X-Git-Url: https://git.netwichtig.de/gitweb/?a=blobdiff_plain;f=src%2Fmodules%2Fextra%2Fm_mssql.cpp;h=43d546819fb26200c6dd58b5bab9e4c254d088ca;hb=7e843c22e16c81054bad18073d24fe1a07026431;hp=65f86e9aca3b448e81f09ce6c5a9231b710f7840;hpb=4ed72f3744b1f78251d66c9556695f6328a3bee0;p=user%2Fhenk%2Fcode%2Finspircd.git diff --git a/src/modules/extra/m_mssql.cpp b/src/modules/extra/m_mssql.cpp index 65f86e9ac..43d546819 100644 --- a/src/modules/extra/m_mssql.cpp +++ b/src/modules/extra/m_mssql.cpp @@ -3,7 +3,7 @@ * +------------------------------------+ * * InspIRCd: (C) 2002-2009 InspIRCd Development Team - * See: http://www.inspircd.org/wiki/index.php/Credits + * See: http://wiki.inspircd.org/Credits * * This program is free but copyrighted software; see * the file COPYING for details. @@ -27,8 +27,6 @@ class SQLConn; class MsSQLResult; -class ResultNotifier; -class MsSQLListener; class ModuleMsSQL; typedef std::map ConnMap; @@ -37,96 +35,29 @@ typedef std::deque ResultQueue; unsigned long count(const char * const str, char a) { unsigned long n = 0; - const char *p = reinterpret_cast(str); - - while ((p = strchr(p, a)) != NULL) + for (const char *p = str; *p; ++p) { - ++p; - ++n; + if (*p == '?') + ++n; } return n; } -ResultNotifier* notifier = NULL; -MsSQLListener* listener = NULL; -int QueueFD = -1; - ConnMap connections; -Mutex* QueueMutex; Mutex* ResultsMutex; Mutex* LoggingMutex; -class QueryThread : public Thread +class QueryThread : public SocketThread { private: - ModuleMsSQL* Parent; - InspIRCd* ServerInstance; + ModuleMsSQL* const Parent; public: - QueryThread(InspIRCd* si, ModuleMsSQL* mod) - : Thread(), Parent(mod), ServerInstance(si) - { - } + QueryThread(ModuleMsSQL* mod) : Parent(mod) { } ~QueryThread() { } virtual void Run(); + virtual void OnNotify(); }; -class ResultNotifier : public BufferedSocket -{ - ModuleMsSQL* mod; - - public: - ResultNotifier(ModuleMsSQL* m, InspIRCd* SI, int newfd, char* ip) : BufferedSocket(SI, newfd, ip), mod(m) - { - } - - virtual bool OnDataReady() - { - char data = 0; - if (ServerInstance->SE->Recv(this, &data, 1, 0) > 0) - { - Dispatch(); - return true; - } - return false; - } - - void Dispatch(); -}; - -class MsSQLListener : public ListenSocketBase -{ - ModuleMsSQL* Parent; - irc::sockets::insp_sockaddr sock_us; - socklen_t uslen; - FileReader* index; - - public: - MsSQLListener(ModuleMsSQL* P, InspIRCd* Instance, int port, const std::string &addr) : ListenSocketBase(Instance, port, addr), Parent(P) - { - uslen = sizeof(sock_us); - if (getsockname(this->fd,(sockaddr*)&sock_us,&uslen)) - { - throw ModuleException("Could not getsockname() to find out port number for ITC port"); - } - } - - virtual void OnAcceptReady(const std::string &ipconnectedto, int nfd, const std::string &incomingip) - { - new ResultNotifier(this->Parent, this->ServerInstance, nfd, (char *)ipconnectedto.c_str()); // XXX unsafe casts suck - } - - /* Using getsockname and ntohs, we can determine which port number we were allocated */ - int GetPort() - { -#ifdef IPV6 - return ntohs(sock_us.sin6_port); -#else - return ntohs(sock_us.sin_port); -#endif - } -}; - - class MsSQLResult : public SQLresult { private: @@ -291,15 +222,12 @@ class MsSQLResult : public SQLresult { delete fl; } - - }; class SQLConn : public classbase { private: ResultQueue results; - InspIRCd* ServerInstance; Module* mod; SQLhost host; TDSLOGIN* login; @@ -309,8 +237,8 @@ class SQLConn : public classbase public: QueryQueue queue; - SQLConn(InspIRCd* SI, Module* m, const SQLhost& hi) - : ServerInstance(SI), mod(m), host(hi), login(NULL), sock(NULL), context(NULL) + SQLConn(Module* m, const SQLhost& hi) + : mod(m), host(hi), login(NULL), sock(NULL), context(NULL) { if (OpenDB()) { @@ -361,9 +289,6 @@ class SQLConn : public classbase /* Total length of the unescaped parameters */ unsigned long maxparamlen, paramcount; - /* Total length of query, used for binary-safety */ - unsigned long querylength = 0; - /* The length of the longest parameter */ maxparamlen = 0; @@ -488,7 +413,6 @@ class SQLConn : public classbase *queryend = req.query.q[i]; queryend++; } - querylength++; } *queryend = 0; req.query.q = query; @@ -575,7 +499,6 @@ class SQLConn : public classbase ResultsMutex->Lock(); results.push_back(res); ResultsMutex->Unlock(); - SendNotify(); return SQLerror(); } @@ -583,7 +506,7 @@ class SQLConn : public classbase { SQLConn* sc = (SQLConn*)pContext->parent; LoggingMutex->Lock(); - sc->ServerInstance->Logs->Log("m_mssql", DEBUG, "Message for DB with id: %s -> %s", sc->host.id.c_str(), pMessage->message); + ServerInstance->Logs->Log("m_mssql", DEBUG, "Message for DB with id: %s -> %s", sc->host.id.c_str(), pMessage->message); LoggingMutex->Unlock(); return 0; } @@ -592,7 +515,7 @@ class SQLConn : public classbase { SQLConn* sc = (SQLConn*)pContext->parent; LoggingMutex->Lock(); - sc->ServerInstance->Logs->Log("m_mssql", DEFAULT, "Error for DB with id: %s -> %s", sc->host.id.c_str(), pMessage->message); + ServerInstance->Logs->Log("m_mssql", DEFAULT, "Error for DB with id: %s -> %s", sc->host.id.c_str(), pMessage->message); LoggingMutex->Unlock(); return 0; } @@ -699,40 +622,6 @@ class SQLConn : public classbase } } - void SendNotify() - { - if (QueueFD < 0) - { - if ((QueueFD = socket(AF_FAMILY, SOCK_STREAM, 0)) == -1) - { - /* crap, we're out of sockets... */ - return; - } - - irc::sockets::insp_sockaddr addr; - -#ifdef IPV6 - irc::sockets::insp_aton("::1", &addr.sin6_addr); - addr.sin6_family = AF_FAMILY; - addr.sin6_port = htons(listener->GetPort()); -#else - irc::sockets::insp_inaddr ia; - irc::sockets::insp_aton("127.0.0.1", &ia); - addr.sin_family = AF_FAMILY; - addr.sin_addr = ia; - addr.sin_port = htons(listener->GetPort()); -#endif - - if (connect(QueueFD, (sockaddr*)&addr,sizeof(addr)) == -1) - { - /* wtf, we cant connect to it, but we just created it! */ - return; - } - } - char id = 0; - send(QueueFD, &id, 1, 0); - } - void DoLeadingQuery() { SQLrequest& req = queue.front(); @@ -749,12 +638,11 @@ class ModuleMsSQL : public Module QueryThread* queryDispatcher; public: - ModuleMsSQL(InspIRCd* Me) - : Module(Me), currid(0) + ModuleMsSQL() + : currid(0) { - LoggingMutex = ServerInstance->Mutexes->CreateMutex(); - ResultsMutex = ServerInstance->Mutexes->CreateMutex(); - QueueMutex = ServerInstance->Mutexes->CreateMutex(); + LoggingMutex = new Mutex(); + ResultsMutex = new Mutex(); ServerInstance->Modules->UseInterface("SQLutils"); @@ -763,29 +651,10 @@ class ModuleMsSQL : public Module throw ModuleException("m_mssql: Unable to publish feature 'SQL'"); } - /* Create a socket on a random port. Let the tcp stack allocate us an available port */ -#ifdef IPV6 - listener = new MsSQLListener(this, ServerInstance, 0, "::1"); -#else - listener = new MsSQLListener(this, ServerInstance, 0, "127.0.0.1"); -#endif - - if (listener->GetFd() == -1) - { - ServerInstance->Modules->DoneWithInterface("SQLutils"); - throw ModuleException("m_mssql: unable to create ITC pipe"); - } - else - { - LoggingMutex->Lock(); - ServerInstance->Logs->Log("m_mssql", DEBUG, "MsSQL: Interthread comms port is %d", listener->GetPort()); - LoggingMutex->Unlock(); - } - ReadConf(); - queryDispatcher = new QueryThread(ServerInstance, this); - ServerInstance->Threads->Create(queryDispatcher); + queryDispatcher = new QueryThread(this); + ServerInstance->Threads->Start(queryDispatcher); ServerInstance->Modules->PublishInterface("SQL", this); Implementation eventlist[] = { I_OnRequest, I_OnRehash }; @@ -794,36 +663,19 @@ class ModuleMsSQL : public Module virtual ~ModuleMsSQL() { + queryDispatcher->join(); delete queryDispatcher; ClearQueue(); ClearAllConnections(); - ServerInstance->SE->DelFd(listener); - ServerInstance->BufferedSocketCull(); - - if (QueueFD >= 0) - { - shutdown(QueueFD, 2); - close(QueueFD); - } - - if (notifier) - { - ServerInstance->SE->DelFd(notifier); - notifier->Close(); - ServerInstance->BufferedSocketCull(); - } - ServerInstance->Modules->UnpublishInterface("SQL", this); ServerInstance->Modules->UnpublishFeature("SQL"); ServerInstance->Modules->DoneWithInterface("SQLutils"); delete LoggingMutex; delete ResultsMutex; - delete QueueMutex; } - void SendQueue() { for (ConnMap::iterator iter = connections.begin(); iter != connections.end(); iter++) @@ -852,7 +704,7 @@ class ModuleMsSQL : public Module bool HostInConf(const SQLhost &h) { - ConfigReader conf(ServerInstance); + ConfigReader conf; for(int i = 0; i < conf.Enumerate("database"); i++) { SQLhost host; @@ -872,7 +724,7 @@ class ModuleMsSQL : public Module { ClearOldConnections(); - ConfigReader conf(ServerInstance); + ConfigReader conf; for(int i = 0; i < conf.Enumerate("database"); i++) { SQLhost host; @@ -903,7 +755,7 @@ class ModuleMsSQL : public Module SQLConn* newconn; - newconn = new SQLConn(ServerInstance, this, hi); + newconn = new SQLConn(this, hi); connections.insert(std::make_pair(hi.id, newconn)); } @@ -933,11 +785,11 @@ class ModuleMsSQL : public Module } } - virtual void OnRehash(User* user, const std::string ¶meter) + virtual void OnRehash(User* user) { - QueueMutex->Lock(); + queryDispatcher->LockQueue(); ReadConf(); - QueueMutex->Unlock(); + queryDispatcher->UnlockQueueWakeup(); } virtual const char* OnRequest(Request* request) @@ -946,7 +798,7 @@ class ModuleMsSQL : public Module { SQLrequest* req = (SQLrequest*)request; - QueueMutex->Lock(); + queryDispatcher->LockQueue(); ConnMap::iterator iter; @@ -963,7 +815,7 @@ class ModuleMsSQL : public Module req->error.Id(SQL_BAD_DBID); } - QueueMutex->Unlock(); + queryDispatcher->UnlockQueueWakeup(); return returnval; } @@ -980,22 +832,22 @@ class ModuleMsSQL : public Module virtual Version GetVersion() { - return Version("$Id$", VF_VENDOR | VF_SERVICEPROVIDER, API_VERSION); + return Version("MsSQL provider", VF_VENDOR | VF_SERVICEPROVIDER, API_VERSION); } }; -void ResultNotifier::Dispatch() +void QueryThread::OnNotify() { - mod->SendQueue(); + Parent->SendQueue(); } void QueryThread::Run() { + this->LockQueue(); while (this->GetExitFlag() == false) { SQLConn* conn = NULL; - QueueMutex->Lock(); for (ConnMap::iterator i = connections.begin(); i != connections.end(); i++) { if (i->second->queue.totalsize()) @@ -1004,16 +856,20 @@ void QueryThread::Run() break; } } - QueueMutex->Unlock(); if (conn) { + this->UnlockQueue(); conn->DoLeadingQuery(); - QueueMutex->Lock(); + this->NotifyParent(); + this->LockQueue(); conn->queue.pop(); - QueueMutex->Unlock(); } - usleep(1000); + else + { + this->WaitForQueue(); + } } + this->UnlockQueue(); } MODULE_INIT(ModuleMsSQL)