]> git.netwichtig.de Git - user/henk/code/inspircd.git/blob - src/modules/extra/m_mysql.cpp
Allow Channel::WriteNotice send to other servers and status ranks.
[user/henk/code/inspircd.git] / src / modules / extra / m_mysql.cpp
1 /*
2  * InspIRCd -- Internet Relay Chat Daemon
3  *
4  *   Copyright (C) 2009-2010 Daniel De Graaf <danieldg@inspircd.org>
5  *   Copyright (C) 2006-2007, 2009 Dennis Friis <peavey@inspircd.org>
6  *   Copyright (C) 2006-2009 Craig Edwards <craigedwards@brainbox.cc>
7  *   Copyright (C) 2008 Robin Burchell <robin+git@viroteck.net>
8  *
9  * This file is part of InspIRCd.  InspIRCd is free software: you can
10  * redistribute it and/or modify it under the terms of the GNU General Public
11  * License as published by the Free Software Foundation, version 2.
12  *
13  * This program is distributed in the hope that it will be useful, but WITHOUT
14  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
15  * FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
16  * details.
17  *
18  * You should have received a copy of the GNU General Public License
19  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
20  */
21
22 /// $CompilerFlags: execute("mysql_config --include" "MYSQL_CXXFLAGS")
23 /// $LinkerFlags: execute("mysql_config --libs_r" "MYSQL_LDFLAGS" "-lmysqlclient")
24
25 /// $PackageInfo: require_system("arch") mariadb-libs
26 /// $PackageInfo: require_system("centos" "6.0" "6.99") mysql-devel
27 /// $PackageInfo: require_system("centos" "7.0") mariadb-devel
28 /// $PackageInfo: require_system("darwin") mysql-connector-c
29 /// $PackageInfo: require_system("debian") libmysqlclient-dev
30 /// $PackageInfo: require_system("ubuntu") libmysqlclient-dev
31
32 #ifdef __GNUC__
33 # pragma GCC diagnostic push
34 #endif
35
36 // Fix warnings about the use of `long long` on C++03.
37 #if defined __clang__
38 # pragma clang diagnostic ignored "-Wc++11-long-long"
39 #elif defined __GNUC__
40 # pragma GCC diagnostic ignored "-Wlong-long"
41 #endif
42
43 #include "inspircd.h"
44 #include <mysql.h>
45 #include "modules/sql.h"
46
47 #ifdef __GNUC__
48 # pragma GCC diagnostic pop
49 #endif
50
51 #ifdef _WIN32
52 # pragma comment(lib, "libmysql.lib")
53 #endif
54
55 /* VERSION 3 API: With nonblocking (threaded) requests */
56
/* THE NONBLOCKING MYSQL API!
 *
 * MySQL provides no nonblocking (asynchronous) API of its own, and its developers recommend
 * that you thread your program instead. This is what I've done here to allow for
 * asynchronous SQL requests via MySQL. The way this works is as follows:
 *
 * The module spawns a thread via class Thread, and performs its MySQL queries in this thread,
 * using a queue with priorities. There is a mutex on either end which prevents two threads
 * from adjusting the queue at the same time and crashing the ircd. Every 50 milliseconds, the
 * worker thread wakes up, and checks if there is a request at the head of its queue.
 * If there is, it processes this request, blocking the worker thread but leaving the ircd
 * thread to go about its business as usual. During this period, the ircd thread is able
 * to insert further pending requests into the queue.
 *
 * Once the processing of a request is complete, it is moved from the incoming queue to
 * an outgoing queue, and initialized as a 'response'. The worker thread then signals the
 * ircd thread (via a loopback socket) that a result is available, by sending the
 * connection ID through the connection.
 *
 * The ircd thread then locks the queue once more, reads the outbound response off the head
 * of the queue, and sends it on its way to the original calling module.
 *
 * XXX: You might be asking "why doesn't it just send the response from within the worker thread?"
 * The answer to this is simple. The majority of InspIRCd, and in fact most ircds, are not
 * threadsafe. This module is designed to be threadsafe and is careful with its use of threads.
 * However, if we were to call a module's OnRequest event from within a thread which was not the
 * one the module was originally instantiated upon, there is a chance of all hell breaking loose
 * if a module is ever put in a re-entrant state (stack corruption could occur, crashes, data
 * corruption, and worse), so DON'T think about it until the day comes when InspIRCd is 100%
 * guaranteed threadsafe!
 */
88
89 class SQLConnection;
90 class MySQLresult;
91 class DispatcherThread;
92
93 struct QueryQueueItem
94 {
95         // An SQL database which this query is executed on.
96         SQLConnection* connection;
97
98         // An object which handles the result of the query.
99         SQL::Query* query;
100
101         // The SQL query which is to be executed.
102         std::string querystr;
103
104         QueryQueueItem(SQL::Query* q, const std::string& s, SQLConnection* c)
105                 : connection(c)
106                 , query(q)
107                 , querystr(s)
108         {
109         }
110 };
111
112 struct ResultQueueItem
113 {
114         // An object which handles the result of the query.
115         SQL::Query* query;
116
117         // The result returned from executing the MySQL query.
118         MySQLresult* result;
119
120         ResultQueueItem(SQL::Query* q, MySQLresult* r)
121                 : query(q)
122                 , result(r)
123         {
124         }
125 };
126
127 typedef insp::flat_map<std::string, SQLConnection*> ConnMap;
128 typedef std::deque<QueryQueueItem> QueryQueue;
129 typedef std::deque<ResultQueueItem> ResultQueue;
130
131 /** MySQL module
132  *  */
133 class ModuleSQL : public Module
134 {
135  public:
136         DispatcherThread* Dispatcher;
137         QueryQueue qq;       // MUST HOLD MUTEX
138         ResultQueue rq;      // MUST HOLD MUTEX
139         ConnMap connections; // main thread only
140
141         ModuleSQL();
142         void init() CXX11_OVERRIDE;
143         ~ModuleSQL();
144         void ReadConfig(ConfigStatus& status) CXX11_OVERRIDE;
145         void OnUnloadModule(Module* mod) CXX11_OVERRIDE;
146         Version GetVersion() CXX11_OVERRIDE;
147 };
148
149 class DispatcherThread : public SocketThread
150 {
151  private:
152         ModuleSQL* const Parent;
153  public:
154         DispatcherThread(ModuleSQL* CreatorModule) : Parent(CreatorModule) { }
155         ~DispatcherThread() { }
156         void Run() CXX11_OVERRIDE;
157         void OnNotify() CXX11_OVERRIDE;
158 };
159
160 /** Represents a mysql result set
161  */
162 class MySQLresult : public SQL::Result
163 {
164  public:
165         SQL::Error err;
166         int currentrow;
167         int rows;
168         std::vector<std::string> colnames;
169         std::vector<SQL::Row> fieldlists;
170
171         MySQLresult(MYSQL_RES* res, int affected_rows)
172                 : err(SQL::SUCCESS)
173                 , currentrow(0)
174                 , rows(0)
175         {
176                 if (affected_rows >= 1)
177                 {
178                         rows = affected_rows;
179                         fieldlists.resize(rows);
180                 }
181                 unsigned int field_count = 0;
182                 if (res)
183                 {
184                         MYSQL_ROW row;
185                         int n = 0;
186                         while ((row = mysql_fetch_row(res)))
187                         {
188                                 if (fieldlists.size() < (unsigned int)rows+1)
189                                 {
190                                         fieldlists.resize(fieldlists.size()+1);
191                                 }
192                                 field_count = 0;
193                                 MYSQL_FIELD *fields = mysql_fetch_fields(res);
194                                 if(mysql_num_fields(res) == 0)
195                                         break;
196                                 if (fields && mysql_num_fields(res))
197                                 {
198                                         colnames.clear();
199                                         while (field_count < mysql_num_fields(res))
200                                         {
201                                                 std::string a = (fields[field_count].name ? fields[field_count].name : "");
202                                                 if (row[field_count])
203                                                         fieldlists[n].push_back(SQL::Field(row[field_count]));
204                                                 else
205                                                         fieldlists[n].push_back(SQL::Field());
206                                                 colnames.push_back(a);
207                                                 field_count++;
208                                         }
209                                         n++;
210                                 }
211                                 rows++;
212                         }
213                         mysql_free_result(res);
214                 }
215         }
216
217         MySQLresult(SQL::Error& e)
218                 : err(e)
219                 , currentrow(0)
220                 , rows(0)
221         {
222
223         }
224
225         int Rows() CXX11_OVERRIDE
226         {
227                 return rows;
228         }
229
230         void GetCols(std::vector<std::string>& result) CXX11_OVERRIDE
231         {
232                 result.assign(colnames.begin(), colnames.end());
233         }
234
235         bool HasColumn(const std::string& column, size_t& index) CXX11_OVERRIDE
236         {
237                 for (size_t i = 0; i < colnames.size(); ++i)
238                 {
239                         if (colnames[i] == column)
240                         {
241                                 index = i;
242                                 return true;
243                         }
244                 }
245                 return false;
246         }
247
248         SQL::Field GetValue(int row, int column)
249         {
250                 if ((row >= 0) && (row < rows) && (column >= 0) && (column < (int)fieldlists[row].size()))
251                 {
252                         return fieldlists[row][column];
253                 }
254                 return SQL::Field();
255         }
256
257         bool GetRow(SQL::Row& result) CXX11_OVERRIDE
258         {
259                 if (currentrow < rows)
260                 {
261                         result.assign(fieldlists[currentrow].begin(), fieldlists[currentrow].end());
262                         currentrow++;
263                         return true;
264                 }
265                 else
266                 {
267                         result.clear();
268                         return false;
269                 }
270         }
271 };
272
273 /** Represents a connection to a mysql database
274  */
275 class SQLConnection : public SQL::Provider
276 {
277  private:
278         bool EscapeString(SQL::Query* query, const std::string& in, std::string& out)
279         {
280                 // In the worst case each character may need to be encoded as using two bytes and one
281                 // byte is the NUL terminator.
282                 std::vector<char> buffer(in.length() * 2 + 1);
283
284                 // The return value of mysql_escape_string() is either an error or the length of the
285                 // encoded string not including the NUL terminator.
286                 //
287                 // Unfortunately, someone genius decided that mysql_escape_string should return an
288                 // unsigned type even though -1 is returned on error so checking whether an error
289                 // happened is a bit cursed.
290                 unsigned long escapedsize = mysql_escape_string(&buffer[0], in.c_str(), in.length());
291                 if (escapedsize == static_cast<unsigned long>(-1))
292                 {
293                         SQL::Error err(SQL::QSEND_FAIL, InspIRCd::Format("%u: %s", mysql_errno(connection), mysql_error(connection)));
294                         query->OnError(err);
295                         return false;
296                 }
297
298                 out.append(&buffer[0], escapedsize);
299                 return true;
300         }
301
302  public:
303         reference<ConfigTag> config;
304         MYSQL *connection;
305         Mutex lock;
306
307         // This constructor creates an SQLConnection object with the given credentials, but does not connect yet.
308         SQLConnection(Module* p, ConfigTag* tag)
309                 : SQL::Provider(p, tag->getString("id"))
310                 , config(tag)
311                 , connection(NULL)
312         {
313         }
314
315         ~SQLConnection()
316         {
317                 mysql_close(connection);
318         }
319
320         // This method connects to the database using the credentials supplied to the constructor, and returns
321         // true upon success.
322         bool Connect()
323         {
324                 connection = mysql_init(connection);
325
326                 // Set the connection timeout.
327                 unsigned int timeout = config->getDuration("timeout", 5, 1, 30);
328                 mysql_options(connection, MYSQL_OPT_CONNECT_TIMEOUT, &timeout);
329
330                 // Attempt to connect to the database.
331                 const std::string host = config->getString("host");
332                 const std::string user = config->getString("user");
333                 const std::string pass = config->getString("pass");
334                 const std::string dbname = config->getString("name");
335                 unsigned int port = config->getUInt("port", 3306, 1, 65535);
336                 if (!mysql_real_connect(connection, host.c_str(), user.c_str(), pass.c_str(), dbname.c_str(), port, NULL, CLIENT_IGNORE_SIGPIPE))
337                 {
338                         ServerInstance->Logs->Log(MODNAME, LOG_DEFAULT, "Unable to connect to the %s MySQL server: %s",
339                                 GetId().c_str(), mysql_error(connection));
340                         return false;
341                 }
342
343                 // Set the default character set.
344                 const std::string charset = config->getString("charset");
345                 if (!charset.empty() && mysql_set_character_set(connection, charset.c_str()))
346                 {
347                         ServerInstance->Logs->Log(MODNAME, LOG_DEFAULT, "Could not set character set for %s to \"%s\": %s",
348                                 GetId().c_str(), charset.c_str(), mysql_error(connection));
349                         return false;
350                 }
351
352                 // Execute the initial SQL query.
353                 const std::string initialquery = config->getString("initialquery");
354                 if (!initialquery.empty() && mysql_real_query(connection, initialquery.data(), initialquery.length()))
355                 {
356                         ServerInstance->Logs->Log(MODNAME, LOG_DEFAULT, "Could not execute initial query \"%s\" for %s: %s",
357                                 initialquery.c_str(), name.c_str(), mysql_error(connection));
358                         return false;
359                 }
360
361                 return true;
362         }
363
364         ModuleSQL* Parent()
365         {
366                 return (ModuleSQL*)(Module*)creator;
367         }
368
369         MySQLresult* DoBlockingQuery(const std::string& query)
370         {
371
372                 /* Parse the command string and dispatch it to mysql */
373                 if (CheckConnection() && !mysql_real_query(connection, query.data(), query.length()))
374                 {
375                         /* Successfull query */
376                         MYSQL_RES* res = mysql_use_result(connection);
377                         unsigned long rows = mysql_affected_rows(connection);
378                         return new MySQLresult(res, rows);
379                 }
380                 else
381                 {
382                         /* XXX: See /usr/include/mysql/mysqld_error.h for a list of
383                          * possible error numbers and error messages */
384                         SQL::Error e(SQL::QREPLY_FAIL, InspIRCd::Format("%u: %s", mysql_errno(connection), mysql_error(connection)));
385                         return new MySQLresult(e);
386                 }
387         }
388
389         bool CheckConnection()
390         {
391                 if (!connection || mysql_ping(connection) != 0)
392                         return Connect();
393                 return true;
394         }
395
396         void Submit(SQL::Query* q, const std::string& qs) CXX11_OVERRIDE
397         {
398                 ServerInstance->Logs->Log(MODNAME, LOG_DEBUG, "Executing MySQL query: " + qs);
399                 Parent()->Dispatcher->LockQueue();
400                 Parent()->qq.push_back(QueryQueueItem(q, qs, this));
401                 Parent()->Dispatcher->UnlockQueueWakeup();
402         }
403
404         void Submit(SQL::Query* call, const std::string& q, const SQL::ParamList& p) CXX11_OVERRIDE
405         {
406                 std::string res;
407                 unsigned int param = 0;
408                 for(std::string::size_type i = 0; i < q.length(); i++)
409                 {
410                         if (q[i] != '?')
411                                 res.push_back(q[i]);
412                         else if (param < p.size() && !EscapeString(call, p[param++], res))
413                                 return;
414                 }
415                 Submit(call, res);
416         }
417
418         void Submit(SQL::Query* call, const std::string& q, const SQL::ParamMap& p) CXX11_OVERRIDE
419         {
420                 std::string res;
421                 for(std::string::size_type i = 0; i < q.length(); i++)
422                 {
423                         if (q[i] != '$')
424                                 res.push_back(q[i]);
425                         else
426                         {
427                                 std::string field;
428                                 i++;
429                                 while (i < q.length() && isalnum(q[i]))
430                                         field.push_back(q[i++]);
431                                 i--;
432
433                                 SQL::ParamMap::const_iterator it = p.find(field);
434                                 if (it != p.end() && !EscapeString(call, it->second, res))
435                                         return;
436                         }
437                 }
438                 Submit(call, res);
439         }
440 };
441
442 ModuleSQL::ModuleSQL()
443         : Dispatcher(NULL)
444 {
445 }
446
447 void ModuleSQL::init()
448 {
449         if (mysql_library_init(0, NULL, NULL))
450                 throw ModuleException("Unable to initialise the MySQL library!");
451
452         Dispatcher = new DispatcherThread(this);
453         ServerInstance->Threads.Start(Dispatcher);
454 }
455
456 ModuleSQL::~ModuleSQL()
457 {
458         if (Dispatcher)
459         {
460                 Dispatcher->join();
461                 Dispatcher->OnNotify();
462                 delete Dispatcher;
463         }
464
465         for(ConnMap::iterator i = connections.begin(); i != connections.end(); i++)
466         {
467                 delete i->second;
468         }
469
470         mysql_library_end();
471 }
472
473 void ModuleSQL::ReadConfig(ConfigStatus& status)
474 {
475         ConnMap conns;
476         ConfigTagList tags = ServerInstance->Config->ConfTags("database");
477         for(ConfigIter i = tags.first; i != tags.second; i++)
478         {
479                 if (!stdalgo::string::equalsci(i->second->getString("module"), "mysql"))
480                         continue;
481                 std::string id = i->second->getString("id");
482                 ConnMap::iterator curr = connections.find(id);
483                 if (curr == connections.end())
484                 {
485                         SQLConnection* conn = new SQLConnection(this, i->second);
486                         conns.insert(std::make_pair(id, conn));
487                         ServerInstance->Modules->AddService(*conn);
488                 }
489                 else
490                 {
491                         conns.insert(*curr);
492                         connections.erase(curr);
493                 }
494         }
495
496         // now clean up the deleted databases
497         Dispatcher->LockQueue();
498         SQL::Error err(SQL::BAD_DBID);
499         for(ConnMap::iterator i = connections.begin(); i != connections.end(); i++)
500         {
501                 ServerInstance->Modules->DelService(*i->second);
502                 // it might be running a query on this database. Wait for that to complete
503                 i->second->lock.Lock();
504                 i->second->lock.Unlock();
505                 // now remove all active queries to this DB
506                 for (size_t j = qq.size(); j > 0; j--)
507                 {
508                         size_t k = j - 1;
509                         if (qq[k].connection == i->second)
510                         {
511                                 qq[k].query->OnError(err);
512                                 delete qq[k].query;
513                                 qq.erase(qq.begin() + k);
514                         }
515                 }
516                 // finally, nuke the connection
517                 delete i->second;
518         }
519         Dispatcher->UnlockQueue();
520         connections.swap(conns);
521 }
522
523 void ModuleSQL::OnUnloadModule(Module* mod)
524 {
525         SQL::Error err(SQL::BAD_DBID);
526         Dispatcher->LockQueue();
527         unsigned int i = qq.size();
528         while (i > 0)
529         {
530                 i--;
531                 if (qq[i].query->creator == mod)
532                 {
533                         if (i == 0)
534                         {
535                                 // need to wait until the query is done
536                                 // (the result will be discarded)
537                                 qq[i].connection->lock.Lock();
538                                 qq[i].connection->lock.Unlock();
539                         }
540                         qq[i].query->OnError(err);
541                         delete qq[i].query;
542                         qq.erase(qq.begin() + i);
543                 }
544         }
545         Dispatcher->UnlockQueue();
546         // clean up any result queue entries
547         Dispatcher->OnNotify();
548 }
549
550 Version ModuleSQL::GetVersion()
551 {
552         return Version("Provides MySQL support", VF_VENDOR);
553 }
554
555 void DispatcherThread::Run()
556 {
557         this->LockQueue();
558         while (!this->GetExitFlag())
559         {
560                 if (!Parent->qq.empty())
561                 {
562                         QueryQueueItem i = Parent->qq.front();
563                         i.connection->lock.Lock();
564                         this->UnlockQueue();
565                         MySQLresult* res = i.connection->DoBlockingQuery(i.querystr);
566                         i.connection->lock.Unlock();
567
568                         /*
569                          * At this point, the main thread could be working on:
570                          *  Rehash - delete i.connection out from under us. We don't care about that.
571                          *  UnloadModule - delete i.query and the qq item. Need to avoid reporting results.
572                          */
573
574                         this->LockQueue();
575                         if (!Parent->qq.empty() && Parent->qq.front().query == i.query)
576                         {
577                                 Parent->qq.pop_front();
578                                 Parent->rq.push_back(ResultQueueItem(i.query, res));
579                                 NotifyParent();
580                         }
581                         else
582                         {
583                                 // UnloadModule ate the query
584                                 delete res;
585                         }
586                 }
587                 else
588                 {
589                         /* We know the queue is empty, we can safely hang this thread until
590                          * something happens
591                          */
592                         this->WaitForQueue();
593                 }
594         }
595         this->UnlockQueue();
596 }
597
598 void DispatcherThread::OnNotify()
599 {
600         // this could unlock during the dispatch, but OnResult isn't expected to take that long
601         this->LockQueue();
602         for(ResultQueue::iterator i = Parent->rq.begin(); i != Parent->rq.end(); i++)
603         {
604                 MySQLresult* res = i->result;
605                 if (res->err.code == SQL::SUCCESS)
606                         i->query->OnResult(*res);
607                 else
608                         i->query->OnError(res->err);
609                 delete i->query;
610                 delete i->result;
611         }
612         Parent->rq.clear();
613         this->UnlockQueue();
614 }
615
616 MODULE_INIT(ModuleSQL)