]> git.netwichtig.de Git - user/henk/code/inspircd.git/blob - src/modules/extra/m_mysql.cpp
Fix silly oversight discovered by tra26 (thanks!) where the core tries to handle...
[user/henk/code/inspircd.git] / src / modules / extra / m_mysql.cpp
1 /*       +------------------------------------+
2  *       | Inspire Internet Relay Chat Daemon |
3  *       +------------------------------------+
4  *
5  *  InspIRCd: (C) 2002-2008 InspIRCd Development Team
6  * See: http://www.inspircd.org/wiki/index.php/Credits
7  *
8  * This program is free but copyrighted software; see
9  *          the file COPYING for details.
10  *
11  * ---------------------------------------------------
12  */
13
14 /* Stop mysql wanting to use long long */
15 #define NO_CLIENT_LONG_LONG
16
17 #include "inspircd.h"
18 #include <mysql.h>
19 #include "m_sqlv2.h"
20
21 #ifdef WINDOWS
22 #pragma comment(lib, "mysqlclient.lib")
23 #endif
24
25 /* VERSION 2 API: With nonblocking (threaded) requests */
26
27 /* $ModDesc: SQL Service Provider module for all other m_sql* modules */
28 /* $CompileFlags: exec("mysql_config --include") */
29 /* $LinkerFlags: exec("mysql_config --libs_r") rpath("mysql_config --libs_r") */
30 /* $ModDep: m_sqlv2.h */
31
32 /* THE NONBLOCKING MYSQL API!
33  *
34  * MySQL provides no nonblocking (asyncronous) API of its own, and its developers recommend
35  * that instead, you should thread your program. This is what i've done here to allow for
36  * asyncronous SQL requests via mysql. The way this works is as follows:
37  *
38  * The module spawns a thread via class Thread, and performs its mysql queries in this thread,
39  * using a queue with priorities. There is a mutex on either end which prevents two threads
40  * adjusting the queue at the same time, and crashing the ircd. Every 50 milliseconds, the
41  * worker thread wakes up, and checks if there is a request at the head of its queue.
42  * If there is, it processes this request, blocking the worker thread but leaving the ircd
43  * thread to go about its business as usual. During this period, the ircd thread is able
44  * to insert futher pending requests into the queue.
45  *
46  * Once the processing of a request is complete, it is removed from the incoming queue to
47  * an outgoing queue, and initialized as a 'response'. The worker thread then signals the
48  * ircd thread (via a loopback socket) of the fact a result is available, by sending the
49  * connection ID through the connection.
50  *
51  * The ircd thread then mutexes the queue once more, reads the outbound response off the head
52  * of the queue, and sends it on its way to the original calling module.
53  *
54  * XXX: You might be asking "why doesnt he just send the response from within the worker thread?"
55  * The answer to this is simple. The majority of InspIRCd, and in fact most ircd's are not
56  * threadsafe. This module is designed to be threadsafe and is careful with its use of threads,
57  * however, if we were to call a module's OnRequest even from within a thread which was not the
58  * one the module was originally instantiated upon, there is a chance of all hell breaking loose
59  * if a module is ever put in a re-enterant state (stack corruption could occur, crashes, data
60  * corruption, and worse, so DONT think about it until the day comes when InspIRCd is 100%
61  * gauranteed threadsafe!)
62  *
63  * For a diagram of this system please see http://www.inspircd.org/wiki/Mysql2
64  */
65
66
67 class SQLConnection;
68 class MySQLListener;
69
70
71 typedef std::map<std::string, SQLConnection*> ConnMap;
72 static MySQLListener *MessagePipe = NULL;
73 int QueueFD = -1;
74
75 class DispatcherThread;
76
77 /** MySQL module
78  *  */
79 class ModuleSQL : public Module
80 {
81  public:
82
83          ConfigReader *Conf;
84          InspIRCd* PublicServerInstance;
85          int currid;
86          bool rehashing;
87          DispatcherThread* Dispatcher;
88          Mutex* QueueMutex;
89          Mutex* ResultsMutex;
90          Mutex* LoggingMutex;
91          Mutex* ConnMutex;
92
93          ModuleSQL(InspIRCd* Me);
94          ~ModuleSQL();
95          unsigned long NewID();
96          const char* OnRequest(Request* request);
97          void OnRehash(User* user, const std::string &parameter);
98          Version GetVersion();
99 };
100
101
102
103 #if !defined(MYSQL_VERSION_ID) || MYSQL_VERSION_ID<32224
104 #define mysql_field_count mysql_num_fields
105 #endif
106
107 typedef std::deque<SQLresult*> ResultQueue;
108
109 /** Represents a mysql result set
110  */
111 class MySQLresult : public SQLresult
112 {
113         int currentrow;
114         std::vector<std::string> colnames;
115         std::vector<SQLfieldList> fieldlists;
116         SQLfieldMap* fieldmap;
117         SQLfieldMap fieldmap2;
118         SQLfieldList emptyfieldlist;
119         int rows;
120  public:
121
122         MySQLresult(Module* self, Module* to, MYSQL_RES* res, int affected_rows, unsigned int rid) : SQLresult(self, to, rid), currentrow(0), fieldmap(NULL)
123         {
124                 /* A number of affected rows from from mysql_affected_rows.
125                  */
126                 fieldlists.clear();
127                 rows = 0;
128                 if (affected_rows >= 1)
129                 {
130                         rows = affected_rows;
131                         fieldlists.resize(rows);
132                 }
133                 unsigned int field_count = 0;
134                 if (res)
135                 {
136                         MYSQL_ROW row;
137                         int n = 0;
138                         while ((row = mysql_fetch_row(res)))
139                         {
140                                 if (fieldlists.size() < (unsigned int)rows+1)
141                                 {
142                                         fieldlists.resize(fieldlists.size()+1);
143                                 }
144                                 field_count = 0;
145                                 MYSQL_FIELD *fields = mysql_fetch_fields(res);
146                                 if(mysql_num_fields(res) == 0)
147                                         break;
148                                 if (fields && mysql_num_fields(res))
149                                 {
150                                         colnames.clear();
151                                         while (field_count < mysql_num_fields(res))
152                                         {
153                                                 std::string a = (fields[field_count].name ? fields[field_count].name : "");
154                                                 std::string b = (row[field_count] ? row[field_count] : "");
155                                                 SQLfield sqlf(b, !row[field_count]);
156                                                 colnames.push_back(a);
157                                                 fieldlists[n].push_back(sqlf);
158                                                 field_count++;
159                                         }
160                                         n++;
161                                 }
162                                 rows++;
163                         }
164                         mysql_free_result(res);
165                 }
166         }
167
168         MySQLresult(Module* self, Module* to, SQLerror e, unsigned int rid) : SQLresult(self, to, rid), currentrow(0)
169         {
170                 rows = 0;
171                 error = e;
172         }
173
174         ~MySQLresult()
175         {
176         }
177
178         virtual int Rows()
179         {
180                 return rows;
181         }
182
183         virtual int Cols()
184         {
185                 return colnames.size();
186         }
187
188         virtual std::string ColName(int column)
189         {
190                 if (column < (int)colnames.size())
191                 {
192                         return colnames[column];
193                 }
194                 else
195                 {
196                         throw SQLbadColName();
197                 }
198                 return "";
199         }
200
201         virtual int ColNum(const std::string &column)
202         {
203                 for (unsigned int i = 0; i < colnames.size(); i++)
204                 {
205                         if (column == colnames[i])
206                                 return i;
207                 }
208                 throw SQLbadColName();
209                 return 0;
210         }
211
212         virtual SQLfield GetValue(int row, int column)
213         {
214                 if ((row >= 0) && (row < rows) && (column >= 0) && (column < Cols()))
215                 {
216                         return fieldlists[row][column];
217                 }
218
219                 throw SQLbadColName();
220
221                 /* XXX: We never actually get here because of the throw */
222                 return SQLfield("",true);
223         }
224
225         virtual SQLfieldList& GetRow()
226         {
227                 if (currentrow < rows)
228                         return fieldlists[currentrow++];
229                 else
230                         return emptyfieldlist;
231         }
232
233         virtual SQLfieldMap& GetRowMap()
234         {
235                 fieldmap2.clear();
236
237                 if (currentrow < rows)
238                 {
239                         for (int i = 0; i < Cols(); i++)
240                         {
241                                 fieldmap2.insert(std::make_pair(colnames[i],GetValue(currentrow, i)));
242                         }
243                         currentrow++;
244                 }
245
246                 return fieldmap2;
247         }
248
249         virtual SQLfieldList* GetRowPtr()
250         {
251                 SQLfieldList* fieldlist = new SQLfieldList();
252
253                 if (currentrow < rows)
254                 {
255                         for (int i = 0; i < Rows(); i++)
256                         {
257                                 fieldlist->push_back(fieldlists[currentrow][i]);
258                         }
259                         currentrow++;
260                 }
261                 return fieldlist;
262         }
263
264         virtual SQLfieldMap* GetRowMapPtr()
265         {
266                 fieldmap = new SQLfieldMap();
267
268                 if (currentrow < rows)
269                 {
270                         for (int i = 0; i < Cols(); i++)
271                         {
272                                 fieldmap->insert(std::make_pair(colnames[i],GetValue(currentrow, i)));
273                         }
274                         currentrow++;
275                 }
276
277                 return fieldmap;
278         }
279
280         virtual void Free(SQLfieldMap* fm)
281         {
282                 delete fm;
283         }
284
285         virtual void Free(SQLfieldList* fl)
286         {
287                 delete fl;
288         }
289 };
290
291 class SQLConnection;
292
293 void NotifyMainThread(SQLConnection* connection_with_new_result);
294
295 /** Represents a connection to a mysql database
296  */
297 class SQLConnection : public classbase
298 {
299  protected:
300
301         MYSQL connection;
302         MYSQL_RES *res;
303         MYSQL_ROW row;
304         SQLhost host;
305         std::map<std::string,std::string> thisrow;
306         bool Enabled;
307         ModuleSQL* Parent;
308
309  public:
310
311         QueryQueue queue;
312         ResultQueue rq;
313
314         // This constructor creates an SQLConnection object with the given credentials, but does not connect yet.
315         SQLConnection(const SQLhost &hi, ModuleSQL* Creator) : host(hi), Enabled(false), Parent(Creator)
316         {
317         }
318
319         ~SQLConnection()
320         {
321                 Close();
322         }
323
324         // This method connects to the database using the credentials supplied to the constructor, and returns
325         // true upon success.
326         bool Connect()
327         {
328                 unsigned int timeout = 1;
329                 mysql_init(&connection);
330                 mysql_options(&connection,MYSQL_OPT_CONNECT_TIMEOUT,(char*)&timeout);
331                 return mysql_real_connect(&connection, host.host.c_str(), host.user.c_str(), host.pass.c_str(), host.name.c_str(), host.port, NULL, 0);
332         }
333
334         void DoLeadingQuery()
335         {
336                 if (!CheckConnection())
337                         return;
338
339                 /* Parse the command string and dispatch it to mysql */
340                 SQLrequest& req = queue.front();
341
342                 /* Pointer to the buffer we screw around with substitution in */
343                 char* query;
344
345                 /* Pointer to the current end of query, where we append new stuff */
346                 char* queryend;
347
348                 /* Total length of the unescaped parameters */
349                 unsigned long paramlen;
350
351                 /* Total length of query, used for binary-safety in mysql_real_query */
352                 unsigned long querylength = 0;
353
354                 paramlen = 0;
355
356                 for(ParamL::iterator i = req.query.p.begin(); i != req.query.p.end(); i++)
357                 {
358                         paramlen += i->size();
359                 }
360
361                 /* To avoid a lot of allocations, allocate enough memory for the biggest the escaped query could possibly be.
362                  * sizeofquery + (totalparamlength*2) + 1
363                  *
364                  * The +1 is for null-terminating the string for mysql_real_escape_string
365                  */
366
367                 query = new char[req.query.q.length() + (paramlen*2) + 1];
368                 queryend = query;
369
370                 /* Okay, now we have a buffer large enough we need to start copying the query into it and escaping and substituting
371                  * the parameters into it...
372                  */
373
374                 for(unsigned long i = 0; i < req.query.q.length(); i++)
375                 {
376                         if(req.query.q[i] == '?')
377                         {
378                                 /* We found a place to substitute..what fun.
379                                  * use mysql calls to escape and write the
380                                  * escaped string onto the end of our query buffer,
381                                  * then we "just" need to make sure queryend is
382                                  * pointing at the right place.
383                                  */
384                                 if(req.query.p.size())
385                                 {
386                                         unsigned long len = mysql_real_escape_string(&connection, queryend, req.query.p.front().c_str(), req.query.p.front().length());
387
388                                         queryend += len;
389                                         req.query.p.pop_front();
390                                 }
391                                 else
392                                         break;
393                         }
394                         else
395                         {
396                                 *queryend = req.query.q[i];
397                                 queryend++;
398                         }
399                         querylength++;
400                 }
401
402                 *queryend = 0;
403
404                 Parent->QueueMutex->Lock();
405                 req.query.q = query;
406                 Parent->QueueMutex->Unlock();
407
408                 if (!mysql_real_query(&connection, req.query.q.data(), req.query.q.length()))
409                 {
410                         /* Successfull query */
411                         res = mysql_use_result(&connection);
412                         unsigned long rows = mysql_affected_rows(&connection);
413                         MySQLresult* r = new MySQLresult(Parent, req.GetSource(), res, rows, req.id);
414                         r->dbid = this->GetID();
415                         r->query = req.query.q;
416                         /* Put this new result onto the results queue.
417                          * XXX: Remember to mutex the queue!
418                          */
419                         Parent->ResultsMutex->Lock();
420                         rq.push_back(r);
421                         Parent->ResultsMutex->Unlock();
422                 }
423                 else
424                 {
425                         /* XXX: See /usr/include/mysql/mysqld_error.h for a list of
426                          * possible error numbers and error messages */
427                         SQLerror e(SQL_QREPLY_FAIL, ConvToStr(mysql_errno(&connection)) + std::string(": ") + mysql_error(&connection));
428                         MySQLresult* r = new MySQLresult(Parent, req.GetSource(), e, req.id);
429                         r->dbid = this->GetID();
430                         r->query = req.query.q;
431
432                         Parent->ResultsMutex->Lock();
433                         rq.push_back(r);
434                         Parent->ResultsMutex->Unlock();
435                 }
436
437                 /* Now signal the main thread that we've got a result to process.
438                  * Pass them this connection id as what to examine
439                  */
440
441                 delete[] query;
442
443                 NotifyMainThread(this);
444         }
445
446         bool ConnectionLost()
447         {
448                 if (&connection) {
449                         return (mysql_ping(&connection) != 0);
450                 }
451                 else return false;
452         }
453
454         bool CheckConnection()
455         {
456                 if (ConnectionLost()) {
457                         return Connect();
458                 }
459                 else return true;
460         }
461
462         std::string GetError()
463         {
464                 return mysql_error(&connection);
465         }
466
467         const std::string& GetID()
468         {
469                 return host.id;
470         }
471
472         std::string GetHost()
473         {
474                 return host.host;
475         }
476
477         void SetEnable(bool Enable)
478         {
479                 Enabled = Enable;
480         }
481
482         bool IsEnabled()
483         {
484                 return Enabled;
485         }
486
487         void Close()
488         {
489                 mysql_close(&connection);
490         }
491
492         const SQLhost& GetConfHost()
493         {
494                 return host;
495         }
496
497 };
498
499 ConnMap Connections;
500
501 bool HasHost(const SQLhost &host)
502 {
503         for (ConnMap::iterator iter = Connections.begin(); iter != Connections.end(); iter++)
504         {
505                 if (host == iter->second->GetConfHost())
506                         return true;
507         }
508         return false;
509 }
510
511 bool HostInConf(ConfigReader* conf, const SQLhost &h)
512 {
513         for(int i = 0; i < conf->Enumerate("database"); i++)
514         {
515                 SQLhost host;
516                 host.id         = conf->ReadValue("database", "id", i);
517                 host.host       = conf->ReadValue("database", "hostname", i);
518                 host.port       = conf->ReadInteger("database", "port", i, true);
519                 host.name       = conf->ReadValue("database", "name", i);
520                 host.user       = conf->ReadValue("database", "username", i);
521                 host.pass       = conf->ReadValue("database", "password", i);
522                 host.ssl        = conf->ReadFlag("database", "ssl", i);
523                 if (h == host)
524                         return true;
525         }
526         return false;
527 }
528
529 void ClearOldConnections(ConfigReader* conf)
530 {
531         ConnMap::iterator i,safei;
532         for (i = Connections.begin(); i != Connections.end(); i++)
533         {
534                 if (!HostInConf(conf, i->second->GetConfHost()))
535                 {
536                         delete i->second;
537                         safei = i;
538                         --i;
539                         Connections.erase(safei);
540                 }
541         }
542 }
543
544 void ClearAllConnections()
545 {
546         ConnMap::iterator i;
547         while ((i = Connections.begin()) != Connections.end())
548         {
549                 Connections.erase(i);
550                 delete i->second;
551         }
552 }
553
554 void ConnectDatabases(InspIRCd* ServerInstance, ModuleSQL* Parent)
555 {
556         for (ConnMap::iterator i = Connections.begin(); i != Connections.end(); i++)
557         {
558                 if (i->second->IsEnabled())
559                         continue;
560
561                 i->second->SetEnable(true);
562                 if (!i->second->Connect())
563                 {
564                         /* XXX: MUTEX */
565                         Parent->LoggingMutex->Lock();
566                         ServerInstance->Logs->Log("m_mysql",DEFAULT,"SQL: Failed to connect database "+i->second->GetHost()+": Error: "+i->second->GetError());
567                         i->second->SetEnable(false);
568                         Parent->LoggingMutex->Unlock();
569                 }
570         }
571 }
572
573 void LoadDatabases(ConfigReader* conf, InspIRCd* ServerInstance, ModuleSQL* Parent)
574 {
575         Parent->ConnMutex->Lock();
576         ClearOldConnections(conf);
577         for (int j =0; j < conf->Enumerate("database"); j++)
578         {
579                 SQLhost host;
580                 host.id         = conf->ReadValue("database", "id", j);
581                 host.host       = conf->ReadValue("database", "hostname", j);
582                 host.port       = conf->ReadInteger("database", "port", j, true);
583                 host.name       = conf->ReadValue("database", "name", j);
584                 host.user       = conf->ReadValue("database", "username", j);
585                 host.pass       = conf->ReadValue("database", "password", j);
586                 host.ssl        = conf->ReadFlag("database", "ssl", j);
587
588                 if (HasHost(host))
589                         continue;
590
591                 if (!host.id.empty() && !host.host.empty() && !host.name.empty() && !host.user.empty() && !host.pass.empty())
592                 {
593                         SQLConnection* ThisSQL = new SQLConnection(host, Parent);
594                         Connections[host.id] = ThisSQL;
595                 }
596         }
597         ConnectDatabases(ServerInstance, Parent);
598         Parent->ConnMutex->Unlock();
599 }
600
601 char FindCharId(const std::string &id)
602 {
603         char i = 1;
604         for (ConnMap::iterator iter = Connections.begin(); iter != Connections.end(); ++iter, ++i)
605         {
606                 if (iter->first == id)
607                 {
608                         return i;
609                 }
610         }
611         return 0;
612 }
613
614 ConnMap::iterator GetCharId(char id)
615 {
616         char i = 1;
617         for (ConnMap::iterator iter = Connections.begin(); iter != Connections.end(); ++iter, ++i)
618         {
619                 if (i == id)
620                         return iter;
621         }
622         return Connections.end();
623 }
624
625 void NotifyMainThread(SQLConnection* connection_with_new_result)
626 {
627         /* Here we write() to the socket the main thread has open
628          * and we connect()ed back to before our thread became active.
629          * The main thread is using a nonblocking socket tied into
630          * the socket engine, so they wont block and they'll receive
631          * nearly instant notification. Because we're in a seperate
632          * thread, we can just use standard connect(), and we can
633          * block if we like. We just send the connection id of the
634          * connection back.
635          *
636          * NOTE: We only send a single char down the connection, this
637          * way we know it wont get a partial read at the other end if
638          * the system is especially congested (see bug #263).
639          * The function FindCharId translates a connection name into a
640          * one character id, and GetCharId translates a character id
641          * back into an iterator.
642          */
643         char id = FindCharId(connection_with_new_result->GetID());
644         send(QueueFD, &id, 1, 0);
645 }
646
647 class ModuleSQL;
648
649 class DispatcherThread : public Thread
650 {
651  private:
652         ModuleSQL* Parent;
653         InspIRCd* ServerInstance;
654  public:
655         DispatcherThread(InspIRCd* Instance, ModuleSQL* CreatorModule) : Thread(), Parent(CreatorModule), ServerInstance(Instance) { }
656         ~DispatcherThread() { }
657         virtual void Run();
658 };
659
660 /** Used by m_mysql to notify one thread when the other has a result
661  */
662 class Notifier : public BufferedSocket
663 {
664         ModuleSQL* Parent;
665
666  public:
667         Notifier(ModuleSQL* P, InspIRCd* SI, int newfd, char* ip) : BufferedSocket(SI, newfd, ip), Parent(P) { }
668
669         virtual bool OnDataReady()
670         {
671                 char data = 0;
672                 /* NOTE: Only a single character is read so we know we
673                  * cant get a partial read. (We've been told that theres
674                  * data waiting, so we wont ever get EAGAIN)
675                  * The function GetCharId translates a single character
676                  * back into an iterator.
677                  */
678
679                 if (ServerInstance->SE->Recv(this, &data, 1, 0) > 0)
680                 {
681                         Parent->ConnMutex->Lock();
682                         ConnMap::iterator iter = GetCharId(data);
683                         if (iter != Connections.end())
684                         {
685                                 /* Lock the mutex, send back the data */
686                                 Parent->ResultsMutex->Lock();
687                                 ResultQueue::iterator n = iter->second->rq.begin();
688                                 (*n)->Send();
689                                 delete (*n);
690                                 iter->second->rq.pop_front();
691                                 Parent->ResultsMutex->Unlock();
692                                 Parent->ConnMutex->Unlock();
693                                 return true;
694                         }
695                         /* No error, but unknown id */
696                         Parent->ConnMutex->Unlock();
697                         return true;
698                 }
699
700                 /* Erk, error on descriptor! */
701                 return false;
702         }
703 };
704
705 /** Spawn sockets from a listener
706  */
707 class MySQLListener : public ListenSocketBase
708 {
709         ModuleSQL* Parent;
710         irc::sockets::insp_sockaddr sock_us;
711         socklen_t uslen;
712         FileReader* index;
713
714  public:
715         MySQLListener(ModuleSQL* P, InspIRCd* Instance, int port, const std::string &addr) : ListenSocketBase(Instance, port, addr), Parent(P)
716         {
717                 uslen = sizeof(sock_us);
718                 if (getsockname(this->fd,(sockaddr*)&sock_us,&uslen))
719                 {
720                         throw ModuleException("Could not getsockname() to find out port number for ITC port");
721                 }
722         }
723
724         virtual void OnAcceptReady(const std::string &ipconnectedto, int nfd, const std::string &incomingip)
725         {
726                 new Notifier(this->Parent, this->ServerInstance, nfd, (char *)ipconnectedto.c_str()); // XXX unsafe casts suck
727         }
728
729         /* Using getsockname and ntohs, we can determine which port number we were allocated */
730         int GetPort()
731         {
732 #ifdef IPV6
733                 return ntohs(sock_us.sin6_port);
734 #else
735                 return ntohs(sock_us.sin_port);
736 #endif
737         }
738 };
739
740 ModuleSQL::ModuleSQL(InspIRCd* Me) : Module(Me), rehashing(false)
741 {
742         ServerInstance->Modules->UseInterface("SQLutils");
743
744         Conf = new ConfigReader(ServerInstance);
745         PublicServerInstance = ServerInstance;
746         currid = 0;
747
748         /* Create a socket on a random port. Let the tcp stack allocate us an available port */
749 #ifdef IPV6
750         MessagePipe = new MySQLListener(this, ServerInstance, 0, "::1");
751 #else
752         MessagePipe = new MySQLListener(this, ServerInstance, 0, "127.0.0.1");
753 #endif
754
755         LoggingMutex = ServerInstance->Mutexes->CreateMutex();
756         ConnMutex = ServerInstance->Mutexes->CreateMutex();
757
758         if (MessagePipe->GetFd() == -1)
759         {
760                 delete ConnMutex;
761                 ServerInstance->Modules->DoneWithInterface("SQLutils");
762                 throw ModuleException("m_mysql: unable to create ITC pipe");
763         }
764         else
765         {
766                 LoggingMutex->Lock();
767                 ServerInstance->Logs->Log("m_mysql", DEBUG, "MySQL: Interthread comms port is %d", MessagePipe->GetPort());
768                 LoggingMutex->Unlock();
769         }
770
771         Dispatcher = new DispatcherThread(ServerInstance, this);
772         ServerInstance->Threads->Create(Dispatcher);
773
774         ResultsMutex = ServerInstance->Mutexes->CreateMutex();
775         QueueMutex = ServerInstance->Mutexes->CreateMutex();
776
777         if (!ServerInstance->Modules->PublishFeature("SQL", this))
778         {
779                 /* Tell worker thread to exit NOW,
780                  * Automatically joins */
781                 delete Dispatcher;
782                 delete LoggingMutex;
783                 delete ResultsMutex;
784                 delete QueueMutex;
785                 delete ConnMutex;
786                 ServerInstance->Modules->DoneWithInterface("SQLutils");
787                 throw ModuleException("m_mysql: Unable to publish feature 'SQL'");
788         }
789
790         ServerInstance->Modules->PublishInterface("SQL", this);
791         Implementation eventlist[] = { I_OnRehash, I_OnRequest };
792         ServerInstance->Modules->Attach(eventlist, this, 2);
793 }
794
795 ModuleSQL::~ModuleSQL()
796 {
797         delete Dispatcher;
798         ClearAllConnections();
799         delete Conf;
800         ServerInstance->Modules->UnpublishInterface("SQL", this);
801         ServerInstance->Modules->UnpublishFeature("SQL");
802         ServerInstance->Modules->DoneWithInterface("SQLutils");
803         delete LoggingMutex;
804         delete ResultsMutex;
805         delete QueueMutex;
806         delete ConnMutex;
807 }
808
809 unsigned long ModuleSQL::NewID()
810 {
811         if (currid+1 == 0)
812                 currid++;
813         return ++currid;
814 }
815
816 const char* ModuleSQL::OnRequest(Request* request)
817 {
818         if(strcmp(SQLREQID, request->GetId()) == 0)
819         {
820                 SQLrequest* req = (SQLrequest*)request;
821
822                 /* XXX: Lock */
823                 QueueMutex->Lock();
824
825                 ConnMap::iterator iter;
826
827                 const char* returnval = NULL;
828
829                 ConnMutex->Lock();
830                 if((iter = Connections.find(req->dbid)) != Connections.end())
831                 {
832                         req->id = NewID();
833                         iter->second->queue.push(*req);
834                         returnval = SQLSUCCESS;
835                 }
836                 else
837                 {
838                         req->error.Id(SQL_BAD_DBID);
839                 }
840
841                 ConnMutex->Unlock();
842                 QueueMutex->Unlock();
843
844                 return returnval;
845         }
846
847         return NULL;
848 }
849
850 void ModuleSQL::OnRehash(User* user, const std::string &parameter)
851 {
852         rehashing = true;
853 }
854
855 Version ModuleSQL::GetVersion()
856 {
857         return Version("$Id$", VF_VENDOR | VF_SERVICEPROVIDER, API_VERSION);
858 }
859
860 void DispatcherThread::Run()
861 {
862         LoadDatabases(Parent->Conf, Parent->PublicServerInstance, Parent);
863
864         /* Connect back to the Notifier */
865
866         if ((QueueFD = socket(AF_FAMILY, SOCK_STREAM, 0)) == -1)
867         {
868                 /* crap, we're out of sockets... */
869                 return;
870         }
871
872         irc::sockets::insp_sockaddr addr;
873
874 #ifdef IPV6
875         irc::sockets::insp_aton("::1", &addr.sin6_addr);
876         addr.sin6_family = AF_FAMILY;
877         addr.sin6_port = htons(MessagePipe->GetPort());
878 #else
879         irc::sockets::insp_inaddr ia;
880         irc::sockets::insp_aton("127.0.0.1", &ia);
881         addr.sin_family = AF_FAMILY;
882         addr.sin_addr = ia;
883         addr.sin_port = htons(MessagePipe->GetPort());
884 #endif
885
886         if (connect(QueueFD, (sockaddr*)&addr,sizeof(addr)) == -1)
887         {
888                 /* wtf, we cant connect to it, but we just created it! */
889                 return;
890         }
891
892         while (this->GetExitFlag() == false)
893         {
894                 if (Parent->rehashing)
895                 {
896                 /* XXX: Lock */
897                         Parent->QueueMutex->Lock();
898                         Parent->rehashing = false;
899                         LoadDatabases(Parent->Conf, Parent->PublicServerInstance, Parent);
900                         Parent->QueueMutex->Unlock();
901                         /* XXX: Unlock */
902                 }
903
904                 SQLConnection* conn = NULL;
905                 /* XXX: Lock here for safety */
906                 Parent->QueueMutex->Lock();
907                 Parent->ConnMutex->Lock();
908                 for (ConnMap::iterator i = Connections.begin(); i != Connections.end(); i++)
909                 {
910                         if (i->second->queue.totalsize())
911                         {
912                                 conn = i->second;
913                                 break;
914                         }
915                 }
916                 Parent->ConnMutex->Unlock();
917                 Parent->QueueMutex->Unlock();
918                 /* XXX: Unlock */
919
920                 /* Theres an item! */
921                 if (conn)
922                 {
923                         conn->DoLeadingQuery();
924
925                         /* XXX: Lock */
926                         Parent->QueueMutex->Lock();
927                         conn->queue.pop();
928                         Parent->QueueMutex->Unlock();
929                         /* XXX: Unlock */
930                 }
931
932                 usleep(1000);
933         }
934 }
935
936 MODULE_INIT(ModuleSQL)
937