From 328132ad334490db9e73c7f732c0b618090fa5b0 Mon Sep 17 00:00:00 2001 From: brain Date: Fri, 21 Jul 2006 23:27:47 +0000 Subject: Added notification socket git-svn-id: http://svn.inspircd.org/repository/trunk/inspircd@4482 e03df62e-2008-0410-955e-edbf42e46eb7 --- src/modules/extra/m_mysql.cpp | 177 ++++++++++++++++++++++-------------------- 1 file changed, 91 insertions(+), 86 deletions(-) (limited to 'src/modules') diff --git a/src/modules/extra/m_mysql.cpp b/src/modules/extra/m_mysql.cpp index 507702a4a..844e74c30 100644 --- a/src/modules/extra/m_mysql.cpp +++ b/src/modules/extra/m_mysql.cpp @@ -34,16 +34,21 @@ using namespace std; class SQLConnection; +class Notifier; extern InspIRCd* ServerInstance; typedef std::map ConnMap; bool giveup = false; +static Module* SQLModule = NULL; +static Notifier* MessagePipe = NULL; #if !defined(MYSQL_VERSION_ID) || MYSQL_VERSION_ID<32224 #define mysql_field_count mysql_num_fields #endif +typedef std::deque ResultQueue; + class QueryQueue : public classbase { private: @@ -157,6 +162,8 @@ private: /* A mutex to wrap around queue accesses */ pthread_mutex_t queue_mutex = PTHREAD_MUTEX_INITIALIZER; +pthread_mutex_t results_mutex = PTHREAD_MUTEX_INITIALIZER; + class MySQLresult : public SQLresult { int currentrow; @@ -166,6 +173,7 @@ class MySQLresult : public SQLresult SQLfieldMap* fieldmap; int rows; int cols; + public: MySQLresult(Module* self, Module* to, MYSQL_RES* res, int affected_rows) : SQLresult(self, to), currentrow(0), fieldmap(NULL) { @@ -207,6 +215,11 @@ class MySQLresult : public SQLresult log(DEBUG, "Created new MySQL result; %d rows, %d columns", rows, cols); } + MySQLresult(Module* self, Module* to, SQLerror e) : SQLresult(self, to) + { + error = e; + } + ~MySQLresult() { } @@ -310,6 +323,10 @@ class MySQLresult : public SQLresult } }; +class SQLConnection; + +void NotifyMainThread(SQLConnection* connection_with_new_result); + class SQLConnection : public classbase { protected: @@ -323,15 +340,16 @@ class SQLConnection : public classbase std::string db; std::map thisrow; bool Enabled; - long id; + std::string id; public: QueryQueue queue; + ResultQueue rq; // This constructor creates an SQLConnection object with the given credentials, and creates the underlying // MYSQL struct, but does not connect yet. - SQLConnection(std::string thishost, std::string thisuser, std::string thispass, std::string thisdb, long myid) + SQLConnection(std::string thishost, std::string thisuser, std::string thispass, std::string thisdb, const std::string &myid) { this->Enabled = true; this->host = thishost; @@ -367,7 +385,7 @@ class SQLConnection : public classbase unsigned long paramlen; /* Total length of query, used for binary-safety in mysql_real_query */ - unsigned long querylength; + unsigned long querylength = 0; paramlen = 0; @@ -432,83 +450,36 @@ class SQLConnection : public classbase { /* Successfull query */ res = mysql_use_result(&connection); - } - - - } - - // This method issues a query that expects multiple rows of results. Use GetRow() and QueryDone() to retrieve - // multiple rows. - bool QueryResult(std::string query) - { - if (!CheckConnection()) return false; - - int r = mysql_query(&connection, query.c_str()); - if (!r) - { - res = mysql_use_result(&connection); - } - return (!r); - } - - // This method issues a query that just expects a number of 'effected' rows (e.g. UPDATE or DELETE FROM). - // the number of effected rows is returned in the return value. - long QueryCount(std::string query) - { - /* If the connection is down, we return a negative value - New to 1.1 */ - if (!CheckConnection()) return -1; - - int r = mysql_query(&connection, query.c_str()); - if (!r) - { - res = mysql_store_result(&connection); unsigned long rows = mysql_affected_rows(&connection); - mysql_free_result(res); - return rows; + MySQLresult* r = new MySQLresult(SQLModule, req.GetSource(), res, rows); + r->dbid = this->GetID(); + r->query = req.query.q; + /* Put this new result onto the results queue. + * XXX: Remember to mutex the queue! + */ + pthread_mutex_lock(&results_mutex); + rq.push_back(r); + pthread_mutex_unlock(&results_mutex); } - return 0; - } - - // This method fetches a row, if available from the database. You must issue a query - // using QueryResult() first! The row's values are returned as a map of std::string - // where each item is keyed by the column name. - std::map GetRow() - { - thisrow.clear(); - if (res) + else { - row = mysql_fetch_row(res); - if (row) - { - unsigned int field_count = 0; - MYSQL_FIELD *fields = mysql_fetch_fields(res); - if(mysql_field_count(&connection) == 0) - return thisrow; - if (fields && mysql_field_count(&connection)) - { - while (field_count < mysql_field_count(&connection)) - { - std::string a = (fields[field_count].name ? fields[field_count].name : ""); - std::string b = (row[field_count] ? row[field_count] : ""); - thisrow[a] = b; - field_count++; - } - return thisrow; - } - } + /* XXX: See /usr/include/mysql/mysqld_error.h for a list of + * possible error numbers and error messages */ + SQLerror e((SQLerrorNum)mysql_errno(&connection), mysql_error(&connection)); + MySQLresult* r = new MySQLresult(SQLModule, req.GetSource(), e); + r->dbid = this->GetID(); + r->query = req.query.q; + + pthread_mutex_lock(&results_mutex); + rq.push_back(r); + pthread_mutex_unlock(&results_mutex); } - return thisrow; - } - bool QueryDone() - { - if (res) - { - mysql_free_result(res); - res = NULL; - return true; - } - else return false; + /* Now signal the main thread that we've got a result to process. + * Pass them this connection id as what to examine + */ + + NotifyMainThread(this); } bool ConnectionLost() @@ -532,7 +503,7 @@ class SQLConnection : public classbase return mysql_error(&connection); } - long GetID() + const std::string& GetID() { return id; } @@ -542,14 +513,9 @@ class SQLConnection : public classbase return host; } - void Enable() + void SetEnable(bool Enable) { - Enabled = true; - } - - void Disable() - { - Enabled = false; + Enabled = Enable; } bool IsEnabled() @@ -565,7 +531,7 @@ void ConnectDatabases(Server* Srv) { for (ConnMap::iterator i = Connections.begin(); i != Connections.end(); i++) { - i->second->Enable(); + i->second->SetEnable(true); if (i->second->Connect()) { Srv->Log(DEFAULT,"SQL: Successfully connected database "+i->second->GetHost()); @@ -573,7 +539,7 @@ void ConnectDatabases(Server* Srv) else { Srv->Log(DEFAULT,"SQL: Failed to connect database "+i->second->GetHost()+": Error: "+i->second->GetError()); - i->second->Disable(); + i->second->SetEnable(false); } } } @@ -594,7 +560,7 @@ void LoadDatabases(ConfigReader* ThisConf, Server* Srv) Srv->Log(DEBUG,"Read database settings"); if ((db != "") && (host != "") && (user != "") && (id != "") && (pass != "")) { - SQLConnection* ThisSQL = new SQLConnection(host,user,pass,db,atoi(id.c_str())); + SQLConnection* ThisSQL = new SQLConnection(host,user,pass,db,id); Srv->Log(DEFAULT,"Loaded database: "+ThisSQL->GetHost()); Connections[id] = ThisSQL; Srv->Log(DEBUG,"Pushed back connection"); @@ -603,8 +569,42 @@ void LoadDatabases(ConfigReader* ThisConf, Server* Srv) ConnectDatabases(Srv); } +void NotifyMainThread(SQLConnection* connection_with_new_result) +{ + /* Here we connect() to the socket the main thread has open. + * The main thread is using a nonblocking socket tied into + * the socket engine, so they wont block and they'll receive + * nearly instant notification. Because we're in a seperate + * thread, we can just use standard connect(), and we can + * block if we like. We just send the connection id of the + * connection back. + */ + log(DEBUG,"Notify of result on connection: %s",connection_with_new_result->GetID().c_str()); +} + void* DispatcherThread(void* arg); +class Notifier : public InspSocket +{ + sockaddr_in sock_us; + socklen_t uslen; + + public: + + Notifier() : InspSocket("127.0.0.1", 0, true, 3000) + { + if (getsockname(this->fd,(sockaddr*)&sock_us,&uslen)) + { + throw ModuleException("Could not create random listening port on localhost"); + } + } + + std::string GetPort() + { + return ConvToStr(ntohs(sock_us.sin_port)); + } +}; + class ModuleSQL : public Module { public: @@ -668,6 +668,11 @@ class ModuleSQL : public Module Srv = Me; Conf = new ConfigReader(); currid = 0; + SQLModule = this; + + MessagePipe = new Notifier(); + log(DEBUG,"Bound notifier to 127.0.0.1:%s",MessagePipe->GetPort().c_str()); + pthread_attr_t attribs; pthread_attr_init(&attribs); pthread_attr_setdetachstate(&attribs, PTHREAD_CREATE_DETACHED); -- cgit v1.2.3