2 * InspIRCd -- Internet Relay Chat Daemon
4 * Copyright (C) 2009-2010 Daniel De Graaf <danieldg@inspircd.org>
5 * Copyright (C) 2006-2007, 2009 Dennis Friis <peavey@inspircd.org>
6 * Copyright (C) 2006-2009 Craig Edwards <craigedwards@brainbox.cc>
7 * Copyright (C) 2008 Robin Burchell <robin+git@viroteck.net>
9 * This file is part of InspIRCd. InspIRCd is free software: you can
10 * redistribute it and/or modify it under the terms of the GNU General Public
11 * License as published by the Free Software Foundation, version 2.
13 * This program is distributed in the hope that it will be useful, but WITHOUT
14 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
15 * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
18 * You should have received a copy of the GNU General Public License
19 * along with this program. If not, see <http://www.gnu.org/licenses/>.
22 /// $CompilerFlags: execute("mysql_config --include" "MYSQL_CXXFLAGS")
23 /// $LinkerFlags: execute("mysql_config --libs_r" "MYSQL_LDFLAGS" "-lmysqlclient")
25 /// $PackageInfo: require_system("arch") mariadb-libs
26 /// $PackageInfo: require_system("centos" "6.0" "6.99") mysql-devel
27 /// $PackageInfo: require_system("centos" "7.0") mariadb-devel
28 /// $PackageInfo: require_system("darwin") mysql-connector-c
29 /// $PackageInfo: require_system("debian") libmysqlclient-dev
30 /// $PackageInfo: require_system("ubuntu") libmysqlclient-dev
33 # pragma GCC diagnostic push
36 // Fix warnings about the use of `long long` on C++03.
38 # pragma clang diagnostic ignored "-Wc++11-long-long"
39 #elif defined __GNUC__
40 # pragma GCC diagnostic ignored "-Wlong-long"
45 #include "modules/sql.h"
48 # pragma GCC diagnostic pop
52 # pragma comment(lib, "libmysql.lib")
55 /* VERSION 3 API: With nonblocking (threaded) requests */
57 /* THE NONBLOCKING MYSQL API!
59 * MySQL provides no nonblocking (asynchronous) API of its own, and its developers recommend
60 * that instead, you should thread your program. This is what I've done here to allow for
61 * asynchronous SQL requests via MySQL. The way this works is as follows:
63 * The module spawns a thread via class Thread, and performs its mysql queries in this thread,
64 * using a queue with priorities. There is a mutex on either end which prevents two threads
65 * adjusting the queue at the same time, and crashing the ircd. Every 50 milliseconds, the
66 * worker thread wakes up, and checks if there is a request at the head of its queue.
67 * If there is, it processes this request, blocking the worker thread but leaving the ircd
68 * thread to go about its business as usual. During this period, the ircd thread is able
69 * to insert further pending requests into the queue.
71 * Once the processing of a request is complete, it is moved from the incoming queue to
72 * an outgoing queue, and initialized as a 'response'. The worker thread then signals the
73 * ircd thread (via a loopback socket) of the fact a result is available, by sending the
74 * connection ID through the connection.
76 * The ircd thread then mutexes the queue once more, reads the outbound response off the head
77 * of the queue, and sends it on its way to the original calling module.
79 * XXX: You might be asking "why doesn't it just send the response from within the worker thread?"
80 * The answer to this is simple. The majority of InspIRCd, and in fact most ircds, are not
81 * threadsafe. This module is designed to be threadsafe and is careful with its use of threads,
82 * however, if we were to call a module's OnRequest even from within a thread which was not the
83 * one the module was originally instantiated upon, there is a chance of all hell breaking loose
84 * if a module is ever put in a re-entrant state (stack corruption could occur, crashes, data
85 * corruption, and worse, so DON'T think about it until the day comes when InspIRCd is 100%
86 * guaranteed threadsafe!)
// Forward declaration: ModuleSQL stores a DispatcherThread pointer before the class is defined.
91 class DispatcherThread;
// Constructor of a pending-query entry: stores the query object (q), the SQL text (query)
// and the connection the query should run on (c).
98 QQueueItem(SQL::Query* Q, const std::string& S, SQLConnection* C) : q(Q), query(S), c(C) {}
// Constructor of a finished-query entry: pairs the query object (q) with its result (r).
105 RQueueItem(SQL::Query* Q, MySQLresult* R) : q(Q), r(R) {}
// Connections keyed by a string; ReadConfig() uses the <database> tag's "id" value as the key.
108 typedef insp::flat_map<std::string, SQLConnection*> ConnMap;
// Queries waiting to be executed; Submit() appends at the back, the dispatcher reads the front.
109 typedef std::deque<QQueueItem> QueryQueue;
// Results waiting to be handed back to their queries by DispatcherThread::OnNotify().
110 typedef std::deque<RQueueItem> ResultQueue;
// Module class of the MySQL provider: owns the dispatcher thread pointer, the query and
// result queues, and the map of configured connections.
114 class ModuleSQL : public Module
// Worker thread object; created and started in init().
117 DispatcherThread* Dispatcher;
// Both queues are accessed between Dispatcher->LockQueue() and an unlock call elsewhere in this file.
118 QueryQueue qq; // MUST HOLD MUTEX
119 ResultQueue rq; // MUST HOLD MUTEX
120 ConnMap connections; // main thread only
// Initialises the MySQL client library and starts the dispatcher thread.
123 void init() CXX11_OVERRIDE;
// Rebuilds the connection map from the <database> config tags and drops removed databases.
125 void ReadConfig(ConfigStatus& status) CXX11_OVERRIDE;
// Fails queued queries whose creator is the module being unloaded.
126 void OnUnloadModule(Module* mod) CXX11_OVERRIDE;
// Reports the module description and VF_VENDOR flag.
127 Version GetVersion() CXX11_OVERRIDE;
// Thread object that executes queued queries (Run) and hands finished results back to
// their queries (OnNotify). It reaches the queues through its Parent module pointer.
130 class DispatcherThread : public SocketThread
// The owning module; const pointer, set once by the constructor.
133 ModuleSQL* const Parent;
135 DispatcherThread(ModuleSQL* CreatorModule) : Parent(CreatorModule) { }
136 ~DispatcherThread() { }
// Query-processing loop; keeps going until GetExitFlag() reports true.
137 void Run() CXX11_OVERRIDE;
// Delivers every entry of the result queue via OnResult()/OnError(). It is also called
// directly from ~ModuleSQL() and ModuleSQL::OnUnloadModule() in this file.
138 void OnNotify() CXX11_OVERRIDE;
// Compatibility shim: when MYSQL_VERSION_ID is undefined or lower than 32224,
// mysql_field_count is defined as an alias for mysql_num_fields.
141 #if !defined(MYSQL_VERSION_ID) || MYSQL_VERSION_ID<32224
142 #define mysql_field_count mysql_num_fields
// MySQLresult copies the rows and column names of a MYSQL_RES into standard containers
// and then frees the MySQL handle, so the data can be read without the handle.
145 /** Represents a mysql result set
147 class MySQLresult : public SQL::Result
// Column names of the result set, filled in by the row-fetching constructor.
153 std::vector<std::string> colnames;
// One SQL::Row per fetched row; GetValue() indexes it as fieldlists[row][column].
154 std::vector<SQL::Row> fieldlists;
// Builds a successful result from a MySQL result handle and an affected-row count.
// err starts as SQL::SUCCESS, currentrow and rows start at 0.
156 MySQLresult(MYSQL_RES* res, int affected_rows) : err(SQL::SUCCESS), currentrow(0), rows(0)
// A positive affected-row count becomes the initial row count and pre-sizes fieldlists.
158 if (affected_rows >= 1)
160 rows = affected_rows;
161 fieldlists.resize(rows);
163 unsigned int field_count = 0;
// Fetch rows until mysql_fetch_row() returns a null row.
// NOTE(review): `row` and the row index `n` are declared on lines not shown here - confirm.
168 while ((row = mysql_fetch_row(res)))
// Grow fieldlists by one entry whenever it holds fewer than rows+1 entries.
170 if (fieldlists.size() < (unsigned int)rows+1)
172 fieldlists.resize(fieldlists.size()+1);
175 MYSQL_FIELD *fields = mysql_fetch_fields(res);
176 if(mysql_num_fields(res) == 0)
178 if (fields && mysql_num_fields(res))
// Walk the columns of the current row.
181 while (field_count < mysql_num_fields(res))
// A null field name is recorded as an empty string.
183 std::string a = (fields[field_count].name ? fields[field_count].name : "");
// A null cell pointer is stored as a default-constructed SQL::Field, otherwise the cell text is copied.
184 if (row[field_count])
185 fieldlists[n].push_back(SQL::Field(row[field_count]));
187 fieldlists[n].push_back(SQL::Field());
188 colnames.push_back(a);
// The MySQL handle is released here; the copied data stays in colnames/fieldlists.
195 mysql_free_result(res);
// Builds a failed result that carries the given error.
199 MySQLresult(SQL::Error& e) : err(e)
// Row-count accessor. NOTE(review): body not shown; presumably returns `rows` - confirm.
204 int Rows() CXX11_OVERRIDE
// Replaces the contents of `result` with a copy of the column names.
209 void GetCols(std::vector<std::string>& result) CXX11_OVERRIDE
211 result.assign(colnames.begin(), colnames.end());
// Linear, case-sensitive search of colnames for `column`.
// NOTE(review): the lines that set `index` and return are not shown; presumably `index`
// receives the matching position and the function returns true on a match - confirm.
214 bool HasColumn(const std::string& column, size_t& index) CXX11_OVERRIDE
216 for (size_t i = 0; i < colnames.size(); ++i)
218 if (colnames[i] == column)
// Bounds-checked cell lookup; unlike the other accessors it is not marked CXX11_OVERRIDE.
// The value returned for an out-of-range request is not visible here.
227 SQL::Field GetValue(int row, int column)
229 if ((row >= 0) && (row < rows) && (column >= 0) && (column < (int)fieldlists[row].size()))
231 return fieldlists[row][column];
// Cursor-style access: while currentrow is below rows, copies the current row into `result`.
// NOTE(review): advancing currentrow and the return values are on lines not shown - confirm.
236 bool GetRow(SQL::Row& result) CXX11_OVERRIDE
238 if (currentrow < rows)
240 result.assign(fieldlists[currentrow].begin(), fieldlists[currentrow].end());
// SQLConnection is an SQL::Provider registered as "SQL/<id>". It holds one MySQL handle,
// escapes query parameters, queues queries for the dispatcher thread and runs them
// through DoBlockingQuery().
252 /** Represents a connection to a mysql database
254 class SQLConnection : public SQL::Provider
// Escapes `in` and appends the escaped bytes to `out`.
// When mysql_escape_string() reports the error sentinel, an SQL::QSEND_FAIL error is built
// from the connection's errno and error text.
// NOTE(review): how that error is reported and what is returned is on lines not shown;
// presumably the query is notified and false is returned - confirm.
// NOTE(review): mysql_escape_string() is not given the connection handle;
// mysql_real_escape_string() is the connection-aware variant - consider switching.
257 bool EscapeString(SQL::Query* query, const std::string& in, std::string& out)
259 // In the worst case each character may need to be encoded using two bytes, and one
260 // byte is the NUL terminator.
261 std::vector<char> buffer(in.length() * 2 + 1);
263 // The return value of mysql_escape_string() is either an error or the length of the
264 // encoded string not including the NUL terminator.
266 // Unfortunately, someone genius decided that mysql_escape_string should return an
267 // unsigned type even though -1 is returned on error so checking whether an error
268 // happened is a bit cursed.
269 unsigned long escapedsize = mysql_escape_string(&buffer[0], in.c_str(), in.length());
270 if (escapedsize == static_cast<unsigned long>(-1))
272 SQL::Error err(SQL::QSEND_FAIL, InspIRCd::Format("%u: %s", mysql_errno(connection), mysql_error(connection)));
// Only the escaped length is appended, so the NUL terminator is not copied.
277 out.append(&buffer[0], escapedsize);
// The <database> tag this connection was created from; read again when connecting.
282 reference<ConfigTag> config;
286 // This constructor creates an SQLConnection object with the given credentials, but does not connect yet.
// The provider name is "SQL/" followed by the tag's "id" value; the MySQL handle starts as NULL.
287 SQLConnection(Module* p, ConfigTag* tag) : SQL::Provider(p, "SQL/" + tag->getString("id")),
288 config(tag), connection(NULL)
297 // This method connects to the database using the credentials supplied to the constructor, and returns
298 // true upon success.
// Connect timeout value passed to MYSQL_OPT_CONNECT_TIMEOUT (MySQL documents the unit as seconds).
301 unsigned int timeout = 1;
// NOTE(review): the result of mysql_init() is used without a visible null check - confirm.
302 connection = mysql_init(connection);
303 mysql_options(connection,MYSQL_OPT_CONNECT_TIMEOUT,(char*)&timeout);
304 std::string host = config->getString("host");
305 std::string user = config->getString("user");
306 std::string pass = config->getString("pass");
307 std::string dbname = config->getString("name");
// Port falls back to 3306 when the tag has no "port" value.
308 unsigned int port = config->getUInt("port", 3306);
// rv is true when mysql_real_connect() returns a non-null handle.
309 bool rv = mysql_real_connect(connection, host.c_str(), user.c_str(), pass.c_str(), dbname.c_str(), port, NULL, 0);
313 // Enable character set settings
314 std::string charset = config->getString("charset");
// A non-zero return from mysql_set_character_set() only produces a warning in the log.
315 if ((!charset.empty()) && (mysql_set_character_set(connection, charset.c_str())))
316 ServerInstance->Logs->Log(MODNAME, LOG_DEFAULT, "WARNING: Could not set character set to \"%s\"", charset.c_str());
// Optional "initialquery" from the tag is run once after connecting.
// NOTE(review): the return value of mysql_query() is not checked on the visible lines.
318 std::string initquery;
319 if (config->readString("initialquery", initquery))
321 mysql_query(connection,initquery.c_str());
// Returns the creating module viewed as ModuleSQL (the enclosing accessor's signature is not shown).
328 return (ModuleSQL*)(Module*)creator;
// Runs `query` synchronously on this connection and returns a newly allocated MySQLresult:
// a data result on success, or an SQL::QREPLY_FAIL result built from the MySQL error otherwise.
// Called from DispatcherThread::Run().
331 MySQLresult* DoBlockingQuery(const std::string& query)
334 /* Parse the command string and dispatch it to mysql */
// mysql_real_query() is only attempted when CheckConnection() succeeds; zero means success.
335 if (CheckConnection() && !mysql_real_query(connection, query.data(), query.length()))
337 /* Successful query */
338 MYSQL_RES* res = mysql_use_result(connection);
// The affected-row count is narrowed to the int parameter of the MySQLresult constructor.
339 unsigned long rows = mysql_affected_rows(connection);
340 return new MySQLresult(res, rows);
344 /* XXX: See /usr/include/mysql/mysqld_error.h for a list of
345 * possible error numbers and error messages */
346 SQL::Error e(SQL::QREPLY_FAIL, InspIRCd::Format("%u: %s", mysql_errno(connection), mysql_error(connection)));
347 return new MySQLresult(e);
// Checks whether the handle exists and answers mysql_ping().
// NOTE(review): the recovery branch is not shown; presumably it reconnects - confirm.
351 bool CheckConnection()
353 if (!connection || mysql_ping(connection) != 0)
// Returns the current MySQL error text for this connection.
358 std::string GetError()
360 return mysql_error(connection);
// Closes the MySQL handle (the enclosing function's signature is not shown).
365 mysql_close(connection);
// Queues a ready-to-run query: logs it at debug level, then appends it to the module's
// query queue under the dispatcher's queue lock and wakes the dispatcher.
368 void Submit(SQL::Query* q, const std::string& qs) CXX11_OVERRIDE
370 ServerInstance->Logs->Log(MODNAME, LOG_DEBUG, "Executing MySQL query: " + qs);
371 Parent()->Dispatcher->LockQueue();
372 Parent()->qq.push_back(QQueueItem(q, qs, this));
373 Parent()->Dispatcher->UnlockQueueWakeup();
// Builds the final query text from `q` and the positional parameters in `p`, escaping
// each parameter with EscapeString().
// NOTE(review): the placeholder test and the final submit call are on lines not shown - confirm.
376 void Submit(SQL::Query* call, const std::string& q, const SQL::ParamList& p) CXX11_OVERRIDE
// Index of the next unused positional parameter.
379 unsigned int param = 0;
380 for(std::string::size_type i = 0; i < q.length(); i++)
// Parameters are consumed in order; nothing is escaped once `p` is exhausted.
384 else if (param < p.size() && !EscapeString(call, p[param++], res))
// Builds the final query text from `q` and the named parameters in `p`.
// NOTE(review): the marker that introduces a parameter name is tested on lines not shown - confirm.
390 void Submit(SQL::Query* call, const std::string& q, const SQL::ParamMap& p) CXX11_OVERRIDE
393 for(std::string::size_type i = 0; i < q.length(); i++)
// A parameter name is the longest run of alphanumeric characters at this position.
401 while (i < q.length() && isalnum(q[i]))
402 field.push_back(q[i++]);
// Names missing from `p` are not escaped or inserted by this condition.
405 SQL::ParamMap::const_iterator it = p.find(field);
406 if (it != p.end() && !EscapeString(call, it->second, res))
// Module constructor. NOTE(review): initialiser list and body are not shown; confirm that
// Dispatcher is given a defined value here, since init() assigns it only later.
414 ModuleSQL::ModuleSQL()
// Module start-up: initialises the MySQL client library, then creates and starts the
// dispatcher thread. Throws ModuleException when mysql_library_init() returns non-zero.
419 void ModuleSQL::init()
421 if (mysql_library_init(0, NULL, NULL))
422 throw ModuleException("Unable to initialise the MySQL library!");
// Reached only when the library initialised successfully.
424 Dispatcher = new DispatcherThread(this);
425 ServerInstance->Threads.Start(Dispatcher);
// Module teardown: delivers any results still queued, then walks every remaining connection.
// NOTE(review): the statements around these lines (thread shutdown, the loop body) are not
// shown; presumably the dispatcher is stopped and each connection is deleted - confirm.
428 ModuleSQL::~ModuleSQL()
433 Dispatcher->OnNotify();
437 for(ConnMap::iterator i = connections.begin(); i != connections.end(); i++)
// Rebuilds the connection map from the <database> tags whose "module" value is "mysql"
// (compared case-insensitively). New ids get a fresh SQLConnection registered as a service.
// Connections whose id is no longer configured are unregistered, and their queued queries
// are failed with SQL::BAD_DBID.
// NOTE(review): `conns` is declared on a line not shown; it collects the connections that
// survive and is swapped into `connections` at the end.
445 void ModuleSQL::ReadConfig(ConfigStatus& status)
448 ConfigTagList tags = ServerInstance->Config->ConfTags("database");
449 for(ConfigIter i = tags.first; i != tags.second; i++)
// Tags that belong to other SQL backends are ignored.
451 if (!stdalgo::string::equalsci(i->second->getString("module"), "mysql"))
453 std::string id = i->second->getString("id");
454 ConnMap::iterator curr = connections.find(id);
// Unknown id: create the connection and publish it as a service provider.
455 if (curr == connections.end())
457 SQLConnection* conn = new SQLConnection(this, i->second);
458 conns.insert(std::make_pair(id, conn));
459 ServerInstance->Modules->AddService(*conn);
// Known id: taken out of the old map so that only removed databases remain in it.
// NOTE(review): the line that carries the entry over to `conns` is not shown - confirm.
464 connections.erase(curr);
468 // now clean up the deleted databases
469 Dispatcher->LockQueue();
470 SQL::Error err(SQL::BAD_DBID);
471 for(ConnMap::iterator i = connections.begin(); i != connections.end(); i++)
473 ServerInstance->Modules->DelService(*i->second);
474 // it might be running a query on this database. Wait for that to complete
// Taking and immediately releasing the connection's lock acts as that wait.
475 i->second->lock.Lock();
476 i->second->lock.Unlock();
477 // now remove all active queries to this DB
// Counts down from the back of the queue so erasing does not disturb entries still to visit.
// NOTE(review): `k` is defined on a line not shown; presumably j - 1 - confirm.
478 for (size_t j = qq.size(); j > 0; j--)
481 if (qq[k].c == i->second)
483 qq[k].q->OnError(err);
485 qq.erase(qq.begin() + k);
488 // finally, nuke the connection
491 Dispatcher->UnlockQueue();
// `connections` now holds exactly the set built in `conns`.
492 connections.swap(conns);
// Called when a module is unloaded: every queued query created by `mod` is failed with
// SQL::BAD_DBID and removed from the query queue; afterwards queued results are delivered.
495 void ModuleSQL::OnUnloadModule(Module* mod)
497 SQL::Error err(SQL::BAD_DBID);
498 Dispatcher->LockQueue();
// Starts at the queue size. NOTE(review): the loop header that decrements `i` is not shown;
// presumably the queue is walked from back to front - confirm.
499 unsigned int i = qq.size();
503 if (qq[i].q->creator == mod)
507 // need to wait until the query is done
508 // (the result will be discarded)
// Taking and releasing the connection's lock acts as that wait.
// NOTE(review): the condition guarding this wait is on a line not shown.
509 qq[i].c->lock.Lock();
510 qq[i].c->lock.Unlock();
512 qq[i].q->OnError(err);
514 qq.erase(qq.begin() + i);
517 Dispatcher->UnlockQueue();
518 // clean up any result queue entries
519 Dispatcher->OnNotify();
// Returns the module description string together with the VF_VENDOR flag.
522 Version ModuleSQL::GetVersion()
524 return Version("Provides MySQL support", VF_VENDOR);
// Worker loop: until the exit flag is set, takes the query at the front of the parent's
// query queue, executes it with DoBlockingQuery(), and moves it to the result queue if it
// is still at the front afterwards. With an empty queue the thread waits in WaitForQueue().
// NOTE(review): the lock/unlock calls around the queue accesses are on lines not shown.
527 void DispatcherThread::Run()
530 while (!this->GetExitFlag())
532 if (!Parent->qq.empty())
// A copy of the front item is taken; the item stays in the queue while the query runs.
534 QQueueItem i = Parent->qq.front();
537 MySQLresult* res = i.c->DoBlockingQuery(i.query);
541 * At this point, the main thread could be working on:
542 * Rehash - delete i.c out from under us. We don't care about that.
543 * UnloadModule - delete i.q and the qq item. Need to avoid reporting results.
// The result is reported only if the same query object is still at the head of the queue.
547 if (!Parent->qq.empty() && Parent->qq.front().q == i.q)
549 Parent->qq.pop_front();
550 Parent->rq.push_back(RQueueItem(i.q, res));
555 // UnloadModule ate the query
561 /* We know the queue is empty, we can safely hang this thread until
564 this->WaitForQueue();
// Delivers every queued result to its query: OnResult() when the result's error code is
// SQL::SUCCESS, OnError() otherwise.
// NOTE(review): locking, deletion of the result/query objects and clearing of the queue
// are on lines not shown - confirm.
570 void DispatcherThread::OnNotify()
572 // this could unlock during the dispatch, but OnResult isn't expected to take that long
574 for(ResultQueue::iterator i = Parent->rq.begin(); i != Parent->rq.end(); i++)
576 MySQLresult* res = i->r;
577 if (res->err.code == SQL::SUCCESS)
578 i->q->OnResult(*res);
580 i->q->OnError(res->err);
// Registers ModuleSQL as this file's module class through the MODULE_INIT macro.
588 MODULE_INIT(ModuleSQL)