2 * InspIRCd -- Internet Relay Chat Daemon
4 * Copyright (C) 2009-2010 Daniel De Graaf <danieldg@inspircd.org>
5 * Copyright (C) 2006-2007, 2009 Dennis Friis <peavey@inspircd.org>
6 * Copyright (C) 2006-2009 Craig Edwards <craigedwards@brainbox.cc>
7 * Copyright (C) 2008 Robin Burchell <robin+git@viroteck.net>
9 * This file is part of InspIRCd. InspIRCd is free software: you can
10 * redistribute it and/or modify it under the terms of the GNU General Public
11 * License as published by the Free Software Foundation, version 2.
13 * This program is distributed in the hope that it will be useful, but WITHOUT
14 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
15 * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
18 * You should have received a copy of the GNU General Public License
19 * along with this program. If not, see <http://www.gnu.org/licenses/>.
23 /* Stop mysql wanting to use long long */
24 #define NO_CLIENT_LONG_LONG
28 #include "modules/sql.h"
31 # pragma comment(lib, "mysqlclient.lib")
32 # pragma comment(lib, "advapi32.lib")
33 # pragma comment(linker, "/NODEFAULTLIB:LIBCMT")
36 /* VERSION 3 API: With nonblocking (threaded) requests */
38 /* $ModDesc: SQL Service Provider module for all other m_sql* modules */
39 /* $CompileFlags: exec("mysql_config --include") */
40 /* $LinkerFlags: exec("mysql_config --libs_r") rpath("mysql_config --libs_r") */
42 /* THE NONBLOCKING MYSQL API!
44 * MySQL provides no nonblocking (asyncronous) API of its own, and its developers recommend
45 * that instead, you should thread your program. This is what i've done here to allow for
46 * asyncronous SQL requests via mysql. The way this works is as follows:
48 * The module spawns a thread via class Thread, and performs its mysql queries in this thread,
49 * using a queue with priorities. There is a mutex on either end which prevents two threads
50 * adjusting the queue at the same time, and crashing the ircd. Every 50 milliseconds, the
51 * worker thread wakes up, and checks if there is a request at the head of its queue.
52 * If there is, it processes this request, blocking the worker thread but leaving the ircd
53 * thread to go about its business as usual. During this period, the ircd thread is able
54 * to insert futher pending requests into the queue.
56 * Once the processing of a request is complete, it is removed from the incoming queue to
57 * an outgoing queue, and initialized as a 'response'. The worker thread then signals the
58 * ircd thread (via a loopback socket) of the fact a result is available, by sending the
59 * connection ID through the connection.
61 * The ircd thread then mutexes the queue once more, reads the outbound response off the head
62 * of the queue, and sends it on its way to the original calling module.
64 * XXX: You might be asking "why doesnt he just send the response from within the worker thread?"
65 * The answer to this is simple. The majority of InspIRCd, and in fact most ircd's are not
66 * threadsafe. This module is designed to be threadsafe and is careful with its use of threads,
67 * however, if we were to call a module's OnRequest even from within a thread which was not the
68 * one the module was originally instantiated upon, there is a chance of all hell breaking loose
69 * if a module is ever put in a re-enterant state (stack corruption could occur, crashes, data
70 * corruption, and worse, so DONT think about it until the day comes when InspIRCd is 100%
71 * gauranteed threadsafe!)
73 * For a diagram of this system please see http://wiki.inspircd.org/Mysql2
78 class DispatcherThread;
85 QQueueItem(SQLQuery* Q, const std::string& S, SQLConnection* C) : q(Q), query(S), c(C) {}
92 RQueueItem(SQLQuery* Q, MySQLresult* R) : q(Q), r(R) {}
95 typedef std::map<std::string, SQLConnection*> ConnMap;
96 typedef std::deque<QQueueItem> QueryQueue;
97 typedef std::deque<RQueueItem> ResultQueue;
101 class ModuleSQL : public Module
104 DispatcherThread* Dispatcher;
105 QueryQueue qq; // MUST HOLD MUTEX
106 ResultQueue rq; // MUST HOLD MUTEX
107 ConnMap connections; // main thread only
112 void OnRehash(User* user);
113 void OnUnloadModule(Module* mod);
114 Version GetVersion();
117 class DispatcherThread : public SocketThread
120 ModuleSQL* const Parent;
122 DispatcherThread(ModuleSQL* CreatorModule) : Parent(CreatorModule) { }
123 ~DispatcherThread() { }
125 virtual void OnNotify();
128 #if !defined(MYSQL_VERSION_ID) || MYSQL_VERSION_ID<32224
129 #define mysql_field_count mysql_num_fields
132 /** Represents a mysql result set
134 class MySQLresult : public SQLResult
140 std::vector<std::string> colnames;
141 std::vector<SQLEntries> fieldlists;
143 MySQLresult(MYSQL_RES* res, int affected_rows) : err(SQL_NO_ERROR), currentrow(0), rows(0)
145 if (affected_rows >= 1)
147 rows = affected_rows;
148 fieldlists.resize(rows);
150 unsigned int field_count = 0;
155 while ((row = mysql_fetch_row(res)))
157 if (fieldlists.size() < (unsigned int)rows+1)
159 fieldlists.resize(fieldlists.size()+1);
162 MYSQL_FIELD *fields = mysql_fetch_fields(res);
163 if(mysql_num_fields(res) == 0)
165 if (fields && mysql_num_fields(res))
168 while (field_count < mysql_num_fields(res))
170 std::string a = (fields[field_count].name ? fields[field_count].name : "");
171 if (row[field_count])
172 fieldlists[n].push_back(SQLEntry(row[field_count]));
174 fieldlists[n].push_back(SQLEntry());
175 colnames.push_back(a);
182 mysql_free_result(res);
187 MySQLresult(SQLerror& e) : err(e)
197 virtual void GetCols(std::vector<std::string>& result)
199 result.assign(colnames.begin(), colnames.end());
202 virtual SQLEntry GetValue(int row, int column)
204 if ((row >= 0) && (row < rows) && (column >= 0) && (column < (int)fieldlists[row].size()))
206 return fieldlists[row][column];
211 virtual bool GetRow(SQLEntries& result)
213 if (currentrow < rows)
215 result.assign(fieldlists[currentrow].begin(), fieldlists[currentrow].end());
227 /** Represents a connection to a mysql database
229 class SQLConnection : public SQLProvider
232 reference<ConfigTag> config;
236 // This constructor creates an SQLConnection object with the given credentials, but does not connect yet.
237 SQLConnection(Module* p, ConfigTag* tag) : SQLProvider(p, "SQL/" + tag->getString("id")),
238 config(tag), connection(NULL)
247 // This method connects to the database using the credentials supplied to the constructor, and returns
248 // true upon success.
251 unsigned int timeout = 1;
252 connection = mysql_init(connection);
253 mysql_options(connection,MYSQL_OPT_CONNECT_TIMEOUT,(char*)&timeout);
254 std::string host = config->getString("host");
255 std::string user = config->getString("user");
256 std::string pass = config->getString("pass");
257 std::string dbname = config->getString("name");
258 int port = config->getInt("port");
259 bool rv = mysql_real_connect(connection, host.c_str(), user.c_str(), pass.c_str(), dbname.c_str(), port, NULL, 0);
262 std::string initquery;
263 if (config->readString("initialquery", initquery))
265 mysql_query(connection,initquery.c_str());
272 return (ModuleSQL*)(Module*)creator;
275 MySQLresult* DoBlockingQuery(const std::string& query)
278 /* Parse the command string and dispatch it to mysql */
279 if (CheckConnection() && !mysql_real_query(connection, query.data(), query.length()))
281 /* Successfull query */
282 MYSQL_RES* res = mysql_use_result(connection);
283 unsigned long rows = mysql_affected_rows(connection);
284 return new MySQLresult(res, rows);
288 /* XXX: See /usr/include/mysql/mysqld_error.h for a list of
289 * possible error numbers and error messages */
290 SQLerror e(SQL_QREPLY_FAIL, ConvToStr(mysql_errno(connection)) + ": " + mysql_error(connection));
291 return new MySQLresult(e);
295 bool CheckConnection()
297 if (!connection || mysql_ping(connection) != 0)
302 std::string GetError()
304 return mysql_error(connection);
309 mysql_close(connection);
312 void submit(SQLQuery* q, const std::string& qs)
314 Parent()->Dispatcher->LockQueue();
315 Parent()->qq.push_back(QQueueItem(q, qs, this));
316 Parent()->Dispatcher->UnlockQueueWakeup();
319 void submit(SQLQuery* call, const std::string& q, const ParamL& p)
322 unsigned int param = 0;
323 for(std::string::size_type i = 0; i < q.length(); i++)
329 if (param < p.size())
331 std::string parm = p[param++];
333 mysql_escape_string(buffer, parm.c_str(), parm.length());
334 // mysql_real_escape_string(connection, queryend, paramscopy[paramnum].c_str(), paramscopy[paramnum].length());
342 void submit(SQLQuery* call, const std::string& q, const ParamM& p)
345 for(std::string::size_type i = 0; i < q.length(); i++)
353 while (i < q.length() && isalnum(q[i]))
354 field.push_back(q[i++]);
357 ParamM::const_iterator it = p.find(field);
360 std::string parm = it->second;
362 mysql_escape_string(buffer, parm.c_str(), parm.length());
371 ModuleSQL::ModuleSQL()
376 void ModuleSQL::init()
378 Dispatcher = new DispatcherThread(this);
379 ServerInstance->Threads->Start(Dispatcher);
381 Implementation eventlist[] = { I_OnRehash, I_OnUnloadModule };
382 ServerInstance->Modules->Attach(eventlist, this, sizeof(eventlist)/sizeof(Implementation));
387 ModuleSQL::~ModuleSQL()
392 Dispatcher->OnNotify();
395 for(ConnMap::iterator i = connections.begin(); i != connections.end(); i++)
401 void ModuleSQL::OnRehash(User* user)
404 ConfigTagList tags = ServerInstance->Config->ConfTags("database");
405 for(ConfigIter i = tags.first; i != tags.second; i++)
407 if (i->second->getString("module", "mysql") != "mysql")
409 std::string id = i->second->getString("id");
410 ConnMap::iterator curr = connections.find(id);
411 if (curr == connections.end())
413 SQLConnection* conn = new SQLConnection(this, i->second);
414 conns.insert(std::make_pair(id, conn));
415 ServerInstance->Modules->AddService(*conn);
420 connections.erase(curr);
424 // now clean up the deleted databases
425 Dispatcher->LockQueue();
426 SQLerror err(SQL_BAD_DBID);
427 for(ConnMap::iterator i = connections.begin(); i != connections.end(); i++)
429 ServerInstance->Modules->DelService(*i->second);
430 // it might be running a query on this database. Wait for that to complete
431 i->second->lock.Lock();
432 i->second->lock.Unlock();
433 // now remove all active queries to this DB
434 for(unsigned int j = qq.size() - 1; j >= 0; j--)
436 if (qq[j].c == i->second)
438 qq[j].q->OnError(err);
440 qq.erase(qq.begin() + j);
443 // finally, nuke the connection
446 Dispatcher->UnlockQueue();
447 connections.swap(conns);
450 void ModuleSQL::OnUnloadModule(Module* mod)
452 SQLerror err(SQL_BAD_DBID);
453 Dispatcher->LockQueue();
454 unsigned int i = qq.size();
458 if (qq[i].q->creator == mod)
462 // need to wait until the query is done
463 // (the result will be discarded)
464 qq[i].c->lock.Lock();
465 qq[i].c->lock.Unlock();
467 qq[i].q->OnError(err);
469 qq.erase(qq.begin() + i);
472 Dispatcher->UnlockQueue();
473 // clean up any result queue entries
474 Dispatcher->OnNotify();
477 Version ModuleSQL::GetVersion()
479 return Version("MySQL support", VF_VENDOR);
482 void DispatcherThread::Run()
485 while (!this->GetExitFlag())
487 if (!Parent->qq.empty())
489 QQueueItem i = Parent->qq.front();
492 MySQLresult* res = i.c->DoBlockingQuery(i.query);
496 * At this point, the main thread could be working on:
497 * Rehash - delete i.c out from under us. We don't care about that.
498 * UnloadModule - delete i.q and the qq item. Need to avoid reporting results.
502 if (!Parent->qq.empty() && Parent->qq.front().q == i.q)
504 Parent->qq.pop_front();
505 Parent->rq.push_back(RQueueItem(i.q, res));
510 // UnloadModule ate the query
516 /* We know the queue is empty, we can safely hang this thread until
519 this->WaitForQueue();
525 void DispatcherThread::OnNotify()
527 // this could unlock during the dispatch, but OnResult isn't expected to take that long
529 for(ResultQueue::iterator i = Parent->rq.begin(); i != Parent->rq.end(); i++)
531 MySQLresult* res = i->r;
532 if (res->err.id == SQL_NO_ERROR)
533 i->q->OnResult(*res);
535 i->q->OnError(res->err);
543 MODULE_INIT(ModuleSQL)