diff options
-rw-r--r-- | src/modules/extra/m_mysql.cpp | 87 |
1 files changed, 81 insertions, 6 deletions
diff --git a/src/modules/extra/m_mysql.cpp b/src/modules/extra/m_mysql.cpp index 844e74c30..a6105c33d 100644 --- a/src/modules/extra/m_mysql.cpp +++ b/src/modules/extra/m_mysql.cpp @@ -41,6 +41,7 @@ typedef std::map<std::string, SQLConnection*> ConnMap; bool giveup = false; static Module* SQLModule = NULL; static Notifier* MessagePipe = NULL; +int QueueFD = -1; #if !defined(MYSQL_VERSION_ID) || MYSQL_VERSION_ID<32224 @@ -571,7 +572,8 @@ void LoadDatabases(ConfigReader* ThisConf, Server* Srv) void NotifyMainThread(SQLConnection* connection_with_new_result) { - /* Here we connect() to the socket the main thread has open. + /* Here we write() to the socket the main thread has open + * and we connect()ed back to before our thread became active. * The main thread is using a nonblocking socket tied into * the socket engine, so they wont block and they'll receive * nearly instant notification. Because we're in a seperate @@ -580,6 +582,8 @@ void NotifyMainThread(SQLConnection* connection_with_new_result) * connection back. */ log(DEBUG,"Notify of result on connection: %s",connection_with_new_result->GetID().c_str()); + write(QueueFD, connection_with_new_result->GetID().c_str(), connection_with_new_result->GetID().length()+1); // add one for null terminator + log(DEBUG,"Sent it on its way via fd=%d",QueueFD); } void* DispatcherThread(void* arg); @@ -588,20 +592,64 @@ class Notifier : public InspSocket { sockaddr_in sock_us; socklen_t uslen; + Server* Srv; public: - Notifier() : InspSocket("127.0.0.1", 0, true, 3000) + /* Create a socket on a random port. Let the tcp stack allocate us an available port */ + Notifier(Server* S) : InspSocket("127.0.0.1", 0, true, 3000), Srv(S) { + uslen = sizeof(sock_us); if (getsockname(this->fd,(sockaddr*)&sock_us,&uslen)) { throw ModuleException("Could not create random listening port on localhost"); } } - std::string GetPort() + Notifier(int newfd, char* ip, Server* S) : Srv(S) { - return ConvToStr(ntohs(sock_us.sin_port)); + log(DEBUG,"Constructor of new socket"); + } + + /* Using getsockname and ntohs, we can determine which port number we were allocated */ + int GetPort() + { + return ntohs(sock_us.sin_port); + } + + virtual int OnIncomingConnection(int newsock, char* ip) + { + log(DEBUG,"Inbound connection!"); + Notifier* n = new Notifier(newsock, ip, Srv); + Srv->AddSocket(n); + return true; + } + + virtual bool OnDataReady() + { + log(DEBUG,"Inbound data!"); + char* data = this->Read(); + ConnMap::iterator iter; + + if (data && *data) + { + log(DEBUG,"Looking for connection %s",data); + /* We expect to be sent a null terminated string */ + if((iter = Connections.find(data)) != Connections.end()) + { + log(DEBUG,"Found it!"); + + /* Lock the mutex, send back the data */ + pthread_mutex_lock(&results_mutex); + ResultQueue::iterator n = iter->second->rq.begin(); + (*n)->Send(); + iter->second->rq.pop_front(); + pthread_mutex_unlock(&results_mutex); + return true; + } + } + + return false; } }; @@ -670,8 +718,9 @@ class ModuleSQL : public Module currid = 0; SQLModule = this; - MessagePipe = new Notifier(); - log(DEBUG,"Bound notifier to 127.0.0.1:%s",MessagePipe->GetPort().c_str()); + MessagePipe = new Notifier(Srv); + Srv->AddSocket(MessagePipe); + log(DEBUG,"Bound notifier to 127.0.0.1:%d",MessagePipe->GetPort()); pthread_attr_t attribs; pthread_attr_init(&attribs); @@ -706,6 +755,31 @@ void* DispatcherThread(void* arg) ModuleSQL* thismodule = (ModuleSQL*)arg; LoadDatabases(thismodule->Conf, thismodule->Srv); + /* Connect back to the Notifier */ + + if ((QueueFD = socket(AF_INET, SOCK_STREAM, 0)) == -1) + { + /* crap, we're out of sockets... */ + log(DEBUG,"QueueFD cant be created"); + return NULL; + } + + log(DEBUG,"Initialize QueueFD to %d",QueueFD); + + sockaddr_in addr; + in_addr ia; + inet_aton("127.0.0.1", &ia); + addr.sin_family = AF_INET; + addr.sin_addr = ia; + addr.sin_port = htons(MessagePipe->GetPort()); + + if (connect(QueueFD, (sockaddr*)&addr,sizeof(addr)) == -1) + { + /* wtf, we cant connect to it, but we just created it! */ + log(DEBUG,"QueueFD cant connect!"); + return NULL; + } + while (!giveup) { SQLConnection* conn = NULL; @@ -725,6 +799,7 @@ void* DispatcherThread(void* arg) /* Theres an item! */ if (conn) { + log(DEBUG,"Process Leading query"); conn->DoLeadingQuery(); /* XXX: Lock */ |