From 9ad665b632bddaeb1d5fc055087504aef760af9f Mon Sep 17 00:00:00 2001 From: brain Date: Thu, 4 Sep 2008 21:39:24 +0000 Subject: Now uses Mutex class. No need for pthreads use directly in this lib. Needs testing. git-svn-id: http://svn.inspircd.org/repository/trunk/inspircd@10388 e03df62e-2008-0410-955e-edbf42e46eb7 --- src/modules/extra/m_mysql.cpp | 242 +++++++++++++++++++++--------------------- 1 file changed, 121 insertions(+), 121 deletions(-) (limited to 'src/modules/extra') diff --git a/src/modules/extra/m_mysql.cpp b/src/modules/extra/m_mysql.cpp index 121566d9d..1a3e17ed8 100644 --- a/src/modules/extra/m_mysql.cpp +++ b/src/modules/extra/m_mysql.cpp @@ -6,7 +6,7 @@ * See: http://www.inspircd.org/wiki/index.php/Credits * * This program is free but copyrighted software; see - * the file COPYING for details. + * the file COPYING for details. * * --------------------------------------------------- */ @@ -16,7 +16,6 @@ #include "inspircd.h" #include -#include #include "users.h" #include "channels.h" #include "modules.h" @@ -69,12 +68,35 @@ class Notifier; typedef std::map ConnMap; -bool giveup = false; -bool threadfinished = false; -static Module* SQLModule = NULL; static Notifier* MessagePipe = NULL; int QueueFD = -1; +class DispatcherThread; + +/** MySQL module + * */ +class ModuleSQL : public Module +{ + public: + + ConfigReader *Conf; + InspIRCd* PublicServerInstance; + int currid; + bool rehashing; + DispatcherThread* Dispatcher; + Mutex* QueueMutex; + Mutex* ResultsMutex; + Mutex* LoggingMutex; + + ModuleSQL(InspIRCd* Me); + ~ModuleSQL(); + unsigned long NewID(); + const char* OnRequest(Request* request); + void OnRehash(User* user, const std::string ¶meter); + Version GetVersion(); +}; + + #if !defined(MYSQL_VERSION_ID) || MYSQL_VERSION_ID<32224 #define mysql_field_count mysql_num_fields @@ -82,13 +104,6 @@ int QueueFD = -1; typedef std::deque ResultQueue; -/* A mutex to wrap around queue accesses */ -pthread_mutex_t queue_mutex = PTHREAD_MUTEX_INITIALIZER; - -pthread_mutex_t results_mutex = PTHREAD_MUTEX_INITIALIZER; - -pthread_mutex_t logging_mutex = PTHREAD_MUTEX_INITIALIZER; - /** Represents a mysql result set */ class MySQLresult : public SQLresult @@ -287,6 +302,7 @@ class SQLConnection : public classbase SQLhost host; std::map thisrow; bool Enabled; + ModuleSQL* Parent; public: @@ -294,7 +310,7 @@ class SQLConnection : public classbase ResultQueue rq; // This constructor creates an SQLConnection object with the given credentials, but does not connect yet. - SQLConnection(const SQLhost &hi) : host(hi), Enabled(false) + SQLConnection(const SQLhost &hi, ModuleSQL* Creator) : host(hi), Enabled(false), Parent(Creator) { } @@ -383,37 +399,37 @@ class SQLConnection : public classbase *queryend = 0; - pthread_mutex_lock(&queue_mutex); + Parent->QueueMutex->Enable(true); req.query.q = query; - pthread_mutex_unlock(&queue_mutex); + Parent->QueueMutex->Enable(false); if (!mysql_real_query(&connection, req.query.q.data(), req.query.q.length())) { /* Successfull query */ res = mysql_use_result(&connection); unsigned long rows = mysql_affected_rows(&connection); - MySQLresult* r = new MySQLresult(SQLModule, req.GetSource(), res, rows, req.id); + MySQLresult* r = new MySQLresult(Parent, req.GetSource(), res, rows, req.id); r->dbid = this->GetID(); r->query = req.query.q; /* Put this new result onto the results queue. * XXX: Remember to mutex the queue! */ - pthread_mutex_lock(&results_mutex); + Parent->ResultsMutex->Enable(true); rq.push_back(r); - pthread_mutex_unlock(&results_mutex); + Parent->ResultsMutex->Enable(false); } else { /* XXX: See /usr/include/mysql/mysqld_error.h for a list of * possible error numbers and error messages */ SQLerror e(QREPLY_FAIL, ConvToStr(mysql_errno(&connection)) + std::string(": ") + mysql_error(&connection)); - MySQLresult* r = new MySQLresult(SQLModule, req.GetSource(), e, req.id); + MySQLresult* r = new MySQLresult(Parent, req.GetSource(), e, req.id); r->dbid = this->GetID(); r->query = req.query.q; - pthread_mutex_lock(&results_mutex); + Parent->ResultsMutex->Enable(true); rq.push_back(r); - pthread_mutex_unlock(&results_mutex); + Parent->ResultsMutex->Enable(false); } /* Now signal the main thread that we've got a result to process. @@ -533,7 +549,7 @@ void ClearAllConnections() } } -void ConnectDatabases(InspIRCd* ServerInstance) +void ConnectDatabases(InspIRCd* ServerInstance, ModuleSQL* Parent) { for (ConnMap::iterator i = Connections.begin(); i != Connections.end(); i++) { @@ -544,15 +560,15 @@ void ConnectDatabases(InspIRCd* ServerInstance) if (!i->second->Connect()) { /* XXX: MUTEX */ - pthread_mutex_lock(&logging_mutex); + Parent->LoggingMutex->Enable(true); ServerInstance->Logs->Log("m_mysql",DEFAULT,"SQL: Failed to connect database "+i->second->GetHost()+": Error: "+i->second->GetError()); i->second->SetEnable(false); - pthread_mutex_unlock(&logging_mutex); + Parent->LoggingMutex->Enable(false); } } } -void LoadDatabases(ConfigReader* conf, InspIRCd* ServerInstance) +void LoadDatabases(ConfigReader* conf, InspIRCd* ServerInstance, ModuleSQL* Parent) { ClearOldConnections(conf); for (int j =0; j < conf->Enumerate("database"); j++) @@ -571,11 +587,11 @@ void LoadDatabases(ConfigReader* conf, InspIRCd* ServerInstance) if (!host.id.empty() && !host.host.empty() && !host.name.empty() && !host.user.empty() && !host.pass.empty()) { - SQLConnection* ThisSQL = new SQLConnection(host); + SQLConnection* ThisSQL = new SQLConnection(host, Parent); Connections[host.id] = ThisSQL; } } - ConnectDatabases(ServerInstance); + ConnectDatabases(ServerInstance, Parent); } char FindCharId(const std::string &id) @@ -643,15 +659,15 @@ class Notifier : public BufferedSocket { insp_sockaddr sock_us; socklen_t uslen; - + ModuleSQL* Parent; public: /* Create a socket on a random port. Let the tcp stack allocate us an available port */ #ifdef IPV6 - Notifier(InspIRCd* SI) : BufferedSocket(SI, "::1", 0, true, 3000) + Notifier(InspIRCd* SI, ModuleSQL* Creator) : BufferedSocket(SI, "::1", 0, true, 3000), Parent(Creator) #else - Notifier(InspIRCd* SI) : BufferedSocket(SI, "127.0.0.1", 0, true, 3000) + Notifier(InspIRCd* SI, ModuleSQL* Creator) : BufferedSocket(SI, "127.0.0.1", 0, true, 3000), Parent(Creator) #endif { uslen = sizeof(sock_us); @@ -697,12 +713,12 @@ class Notifier : public BufferedSocket if (iter != Connections.end()) { /* Lock the mutex, send back the data */ - pthread_mutex_lock(&results_mutex); + Parent->ResultsMutex->Enable(true); ResultQueue::iterator n = iter->second->rq.begin(); (*n)->Send(); delete (*n); iter->second->rq.pop_front(); - pthread_mutex_unlock(&results_mutex); + Parent->ResultsMutex->Enable(false); return true; } /* No error, but unknown id */ @@ -714,114 +730,98 @@ class Notifier : public BufferedSocket } }; -/** MySQL module - */ -class ModuleSQL : public Module -{ - public: - - ConfigReader *Conf; - InspIRCd* PublicServerInstance; - int currid; - bool rehashing; - DispatcherThread* Dispatcher; - - ModuleSQL(InspIRCd* Me) - : Module::Module(Me), rehashing(false) - { - ServerInstance->Modules->UseInterface("SQLutils"); - - Conf = new ConfigReader(ServerInstance); - PublicServerInstance = ServerInstance; - currid = 0; - SQLModule = this; - MessagePipe = new Notifier(ServerInstance); - - Dispatcher = new DispatcherThread(ServerInstance, this); - ServerInstance->Threads->Create(Dispatcher); +ModuleSQL::ModuleSQL(InspIRCd* Me) : Module::Module(Me), rehashing(false) +{ + ServerInstance->Modules->UseInterface("SQLutils"); - if (!ServerInstance->Modules->PublishFeature("SQL", this)) - { - /* Tell worker thread to exit NOW, - * Automatically joins */ - delete Dispatcher; + Conf = new ConfigReader(ServerInstance); + PublicServerInstance = ServerInstance; + currid = 0; - throw ModuleException("m_mysql: Unable to publish feature 'SQL'"); - } + MessagePipe = new Notifier(ServerInstance, this); - ServerInstance->Modules->PublishInterface("SQL", this); - Implementation eventlist[] = { I_OnRehash, I_OnRequest }; - ServerInstance->Modules->Attach(eventlist, this, 2); - } + Dispatcher = new DispatcherThread(ServerInstance, this); + ServerInstance->Threads->Create(Dispatcher); - virtual ~ModuleSQL() + if (!ServerInstance->Modules->PublishFeature("SQL", this)) { + /* Tell worker thread to exit NOW, + * Automatically joins */ delete Dispatcher; - ClearAllConnections(); - delete Conf; - ServerInstance->Modules->UnpublishInterface("SQL", this); - ServerInstance->Modules->UnpublishFeature("SQL"); - ServerInstance->Modules->DoneWithInterface("SQLutils"); + throw ModuleException("m_mysql: Unable to publish feature 'SQL'"); } + ServerInstance->Modules->PublishInterface("SQL", this); + Implementation eventlist[] = { I_OnRehash, I_OnRequest }; + ServerInstance->Modules->Attach(eventlist, this, 2); +} +ModuleSQL::~ModuleSQL() +{ + delete Dispatcher; + ClearAllConnections(); + delete Conf; + ServerInstance->Modules->UnpublishInterface("SQL", this); + ServerInstance->Modules->UnpublishFeature("SQL"); + ServerInstance->Modules->DoneWithInterface("SQLutils"); +} - unsigned long NewID() - { - if (currid+1 == 0) - currid++; - return ++currid; - } - virtual const char* OnRequest(Request* request) - { - if(strcmp(SQLREQID, request->GetId()) == 0) - { - SQLrequest* req = (SQLrequest*)request; - /* XXX: Lock */ - pthread_mutex_lock(&queue_mutex); +unsigned long ModuleSQL::NewID() +{ + if (currid+1 == 0) + currid++; + return ++currid; +} - ConnMap::iterator iter; +const char* ModuleSQL::OnRequest(Request* request) +{ + if(strcmp(SQLREQID, request->GetId()) == 0) + { + SQLrequest* req = (SQLrequest*)request; - const char* returnval = NULL; + /* XXX: Lock */ + QueueMutex->Enable(true); - if((iter = Connections.find(req->dbid)) != Connections.end()) - { - req->id = NewID(); - iter->second->queue.push(*req); - returnval = SQLSUCCESS; - } - else - { - req->error.Id(BAD_DBID); - } + ConnMap::iterator iter; - pthread_mutex_unlock(&queue_mutex); - /* XXX: Unlock */ + const char* returnval = NULL; - return returnval; + if((iter = Connections.find(req->dbid)) != Connections.end()) + { + req->id = NewID(); + iter->second->queue.push(*req); + returnval = SQLSUCCESS; + } + else + { + req->error.Id(BAD_DBID); } - return NULL; - } + QueueMutex->Enable(false); + /* XXX: Unlock */ - virtual void OnRehash(User* user, const std::string ¶meter) - { - rehashing = true; + return returnval; } - virtual Version GetVersion() - { - return Version("$Id$", VF_VENDOR | VF_SERVICEPROVIDER, API_VERSION); - } + return NULL; +} -}; +void ModuleSQL::OnRehash(User* user, const std::string ¶meter) +{ + rehashing = true; +} + +Version ModuleSQL::GetVersion() +{ + return Version("$Id$", VF_VENDOR | VF_SERVICEPROVIDER, API_VERSION); +} -void DispatcherThread::Run(void) +void DispatcherThread::Run() { - LoadDatabases(Parent->Conf, Parent->PublicServerInstance); + LoadDatabases(Parent->Conf, Parent->PublicServerInstance, Parent); /* Connect back to the Notifier */ @@ -856,16 +856,16 @@ void DispatcherThread::Run(void) if (Parent->rehashing) { /* XXX: Lock */ - pthread_mutex_lock(&queue_mutex); + Parent->QueueMutex->Enable(true); Parent->rehashing = false; - LoadDatabases(Parent->Conf, Parent->PublicServerInstance); - pthread_mutex_unlock(&queue_mutex); + LoadDatabases(Parent->Conf, Parent->PublicServerInstance, Parent); + Parent->QueueMutex->Enable(false); /* XXX: Unlock */ } SQLConnection* conn = NULL; /* XXX: Lock here for safety */ - pthread_mutex_lock(&queue_mutex); + Parent->QueueMutex->Enable(true); for (ConnMap::iterator i = Connections.begin(); i != Connections.end(); i++) { if (i->second->queue.totalsize()) @@ -874,7 +874,7 @@ void DispatcherThread::Run(void) break; } } - pthread_mutex_unlock(&queue_mutex); + Parent->QueueMutex->Enable(false); /* XXX: Unlock */ /* Theres an item! */ @@ -883,9 +883,9 @@ void DispatcherThread::Run(void) conn->DoLeadingQuery(); /* XXX: Lock */ - pthread_mutex_lock(&queue_mutex); + Parent->QueueMutex->Enable(true); conn->queue.pop(); - pthread_mutex_unlock(&queue_mutex); + Parent->QueueMutex->Enable(false); /* XXX: Unlock */ } -- cgit v1.2.3