X-Git-Url: https://git.netwichtig.de/gitweb/?a=blobdiff_plain;ds=sidebyside;f=src%2Fmodules%2Fextra%2Fm_mysql.cpp;h=ae66e44bb5377eb7ed26b535e3c07adcbc07c004;hb=b6497f2715d1df8ccc1461caa52ec600490e44b3;hp=8b9ad0666ec97012a66f0151d3eb9c80b9d9b201;hpb=bc9aa036ce738f931f26d195adc7d49b04868b6e;p=user%2Fhenk%2Fcode%2Finspircd.git diff --git a/src/modules/extra/m_mysql.cpp b/src/modules/extra/m_mysql.cpp index 8b9ad0666..ae66e44bb 100644 --- a/src/modules/extra/m_mysql.cpp +++ b/src/modules/extra/m_mysql.cpp @@ -68,11 +68,11 @@ class SQLConnection; -class Notifier; +class MySQLListener; typedef std::map ConnMap; -static Notifier* MessagePipe = NULL; +static MySQLListener *MessagePipe = NULL; int QueueFD = -1; class DispatcherThread; @@ -403,9 +403,9 @@ class SQLConnection : public classbase *queryend = 0; - Parent->QueueMutex->Enable(true); + Parent->QueueMutex->Lock(); req.query.q = query; - Parent->QueueMutex->Enable(false); + Parent->QueueMutex->Unlock(); if (!mysql_real_query(&connection, req.query.q.data(), req.query.q.length())) { @@ -418,22 +418,22 @@ class SQLConnection : public classbase /* Put this new result onto the results queue. * XXX: Remember to mutex the queue! */ - Parent->ResultsMutex->Enable(true); + Parent->ResultsMutex->Lock(); rq.push_back(r); - Parent->ResultsMutex->Enable(false); + Parent->ResultsMutex->Unlock(); } else { /* XXX: See /usr/include/mysql/mysqld_error.h for a list of * possible error numbers and error messages */ - SQLerror e(QREPLY_FAIL, ConvToStr(mysql_errno(&connection)) + std::string(": ") + mysql_error(&connection)); + SQLerror e(SQL_QREPLY_FAIL, ConvToStr(mysql_errno(&connection)) + std::string(": ") + mysql_error(&connection)); MySQLresult* r = new MySQLresult(Parent, req.GetSource(), e, req.id); r->dbid = this->GetID(); r->query = req.query.q; - Parent->ResultsMutex->Enable(true); + Parent->ResultsMutex->Lock(); rq.push_back(r); - Parent->ResultsMutex->Enable(false); + Parent->ResultsMutex->Unlock(); } /* Now signal the main thread that we've got a result to process. @@ -564,10 +564,10 @@ void ConnectDatabases(InspIRCd* ServerInstance, ModuleSQL* Parent) if (!i->second->Connect()) { /* XXX: MUTEX */ - Parent->LoggingMutex->Enable(true); + Parent->LoggingMutex->Lock(); ServerInstance->Logs->Log("m_mysql",DEFAULT,"SQL: Failed to connect database "+i->second->GetHost()+": Error: "+i->second->GetError()); i->second->SetEnable(false); - Parent->LoggingMutex->Enable(false); + Parent->LoggingMutex->Unlock(); } } } @@ -661,46 +661,10 @@ class DispatcherThread : public Thread */ class Notifier : public BufferedSocket { - insp_sockaddr sock_us; - socklen_t uslen; ModuleSQL* Parent; public: - - /* Create a socket on a random port. Let the tcp stack allocate us an available port */ -#ifdef IPV6 - Notifier(InspIRCd* SI, ModuleSQL* Creator) : BufferedSocket(SI, "::1", 0, true, 3000), Parent(Creator) -#else - Notifier(InspIRCd* SI, ModuleSQL* Creator) : BufferedSocket(SI, "127.0.0.1", 0, true, 3000), Parent(Creator) -#endif - { - uslen = sizeof(sock_us); - if (getsockname(this->fd,(sockaddr*)&sock_us,&uslen)) - { - throw ModuleException("Could not create random listening port on localhost"); - } - } - - Notifier(InspIRCd* SI, int newfd, char* ip) : BufferedSocket(SI, newfd, ip) - { - } - - /* Using getsockname and ntohs, we can determine which port number we were allocated */ - int GetPort() - { -#ifdef IPV6 - return ntohs(sock_us.sin6_port); -#else - return ntohs(sock_us.sin_port); -#endif - } - - virtual int OnIncomingConnection(int newsock, char* ip) - { - Notifier* n = new Notifier(this->Instance, newsock, ip); - n = n; /* Stop bitching at me, GCC */ - return true; - } + Notifier(InspIRCd* SI, int newfd, char* ip) : BufferedSocket(SI, newfd, ip) { } virtual bool OnDataReady() { @@ -717,12 +681,12 @@ class Notifier : public BufferedSocket if (iter != Connections.end()) { /* Lock the mutex, send back the data */ - Parent->ResultsMutex->Enable(true); + Parent->ResultsMutex->Lock(); ResultQueue::iterator n = iter->second->rq.begin(); (*n)->Send(); delete (*n); iter->second->rq.pop_front(); - Parent->ResultsMutex->Enable(false); + Parent->ResultsMutex->Unlock(); return true; } /* No error, but unknown id */ @@ -734,8 +698,41 @@ class Notifier : public BufferedSocket } }; +/** Spawn sockets from a listener + */ +class MySQLListener : public ListenSocketBase +{ + insp_sockaddr sock_us; + socklen_t uslen; + FileReader* index; + + public: + MySQLListener(InspIRCd* Instance, int port, const std::string &addr) : ListenSocketBase(Instance, port, addr) + { + uslen = sizeof(sock_us); + if (getsockname(this->fd,(sockaddr*)&sock_us,&uslen)) + { + throw ModuleException("Could not getsockname() to find out port number for ITC port"); + } + } + + virtual void OnAcceptReady(const std::string &ipconnectedto, int nfd, const std::string &incomingip) + { + new Notifier(this->ServerInstance, nfd, (char *)ipconnectedto.c_str()); // XXX unsafe casts suck + } + + /* Using getsockname and ntohs, we can determine which port number we were allocated */ + int GetPort() + { +#ifdef IPV6 + return ntohs(sock_us.sin6_port); +#else + return ntohs(sock_us.sin_port); +#endif + } +}; -ModuleSQL::ModuleSQL(InspIRCd* Me) : Module::Module(Me), rehashing(false) +ModuleSQL::ModuleSQL(InspIRCd* Me) : Module(Me), rehashing(false) { ServerInstance->Modules->UseInterface("SQLutils"); @@ -743,7 +740,17 @@ ModuleSQL::ModuleSQL(InspIRCd* Me) : Module::Module(Me), rehashing(false) PublicServerInstance = ServerInstance; currid = 0; - MessagePipe = new Notifier(ServerInstance, this); + /* Create a socket on a random port. Let the tcp stack allocate us an available port */ +#ifdef IPV6 + MessagePipe = new MySQLListener(ServerInstance, 0, "::1"); +#else + MessagePipe = new MySQLListener(ServerInstance, 0, "127.0.0.1"); +#endif + + if (MessagePipe->GetFd() == -1) + throw ModuleException("m_mysql: unable to create ITC pipe"); + else + ServerInstance->Logs->Log("m_mysql", DEBUG, "MySQL: Interthread comms port is %d", MessagePipe->GetPort()); Dispatcher = new DispatcherThread(ServerInstance, this); ServerInstance->Threads->Create(Dispatcher); @@ -792,7 +799,7 @@ const char* ModuleSQL::OnRequest(Request* request) SQLrequest* req = (SQLrequest*)request; /* XXX: Lock */ - QueueMutex->Enable(true); + QueueMutex->Lock(); ConnMap::iterator iter; @@ -806,10 +813,10 @@ const char* ModuleSQL::OnRequest(Request* request) } else { - req->error.Id(BAD_DBID); + req->error.Id(SQL_BAD_DBID); } - QueueMutex->Enable(false); + QueueMutex->Unlock(); /* XXX: Unlock */ return returnval; @@ -865,16 +872,16 @@ void DispatcherThread::Run() if (Parent->rehashing) { /* XXX: Lock */ - Parent->QueueMutex->Enable(true); + Parent->QueueMutex->Lock(); Parent->rehashing = false; LoadDatabases(Parent->Conf, Parent->PublicServerInstance, Parent); - Parent->QueueMutex->Enable(false); + Parent->QueueMutex->Unlock(); /* XXX: Unlock */ } SQLConnection* conn = NULL; /* XXX: Lock here for safety */ - Parent->QueueMutex->Enable(true); + Parent->QueueMutex->Lock(); for (ConnMap::iterator i = Connections.begin(); i != Connections.end(); i++) { if (i->second->queue.totalsize()) @@ -883,7 +890,7 @@ void DispatcherThread::Run() break; } } - Parent->QueueMutex->Enable(false); + Parent->QueueMutex->Unlock(); /* XXX: Unlock */ /* Theres an item! */ @@ -892,9 +899,9 @@ void DispatcherThread::Run() conn->DoLeadingQuery(); /* XXX: Lock */ - Parent->QueueMutex->Enable(true); + Parent->QueueMutex->Lock(); conn->queue.pop(); - Parent->QueueMutex->Enable(false); + Parent->QueueMutex->Unlock(); /* XXX: Unlock */ }