]> git.netwichtig.de Git - user/henk/code/inspircd.git/blob - src/modules/extra/m_mssql.cpp
8f9f4b97567748d7a6c41aefff7339fc5980da7a
[user/henk/code/inspircd.git] / src / modules / extra / m_mssql.cpp
1 /*       +------------------------------------+
2  *       | Inspire Internet Relay Chat Daemon |
3  *       +------------------------------------+
4  *
5  *  InspIRCd: (C) 2002-2008 InspIRCd Development Team
6  * See: http://www.inspircd.org/wiki/index.php/Credits
7  *
8  * This program is free but copyrighted software; see
9  *            the file COPYING for details.
10  *
11  * ---------------------------------------------------
12  */
13
14 #include "inspircd.h"
15 #include <tds.h>
16 #include <tdsconvert.h>
17 #include "users.h"
18 #include "channels.h"
19 #include "modules.h"
20
21 #include "m_sqlv2.h"
22
23 /* $ModDesc: MsSQL provider */
24 /* $LinkerFlags: -ltds */
25 /* $ModDep: m_sqlv2.h */
26
27 class SQLConn;
28 class MsSQLResult;
29 class ResultNotifier;
30 class ModuleMsSQL;
31
32 typedef std::map<std::string, SQLConn*> ConnMap;
33 typedef std::deque<MsSQLResult*> ResultQueue;
34
35 ResultNotifier* resultnotify = NULL;
36 ResultNotifier* resultdispatch = NULL;
37 int QueueFD = -1;
38 ConnMap connections;
39 Mutex* QueueMutex;
40 Mutex* ResultsMutex;
41 Mutex* LoggingMutex;
42
43
44 class QueryThread : public Thread
45 {
46   private:
47         ModuleMsSQL* Parent;
48         InspIRCd* Instance;
49   public:
50         QueryThread(InspIRCd* si, ModuleMsSQL* mod)
51         : Thread(), Parent(mod), Instance(si)
52         {
53         }
54         ~QueryThread() { }
55         virtual void Run();
56 };
57
58 class ResultNotifier : public BufferedSocket
59 {
60         Module* mod;
61         insp_sockaddr sock_us;
62         socklen_t uslen;
63
64  public:
65         /* Create a socket on a random port. Let the tcp stack allocate us an available port */
66 #ifdef IPV6
67         ResultNotifier(InspIRCd* SI, Module* m) : BufferedSocket(SI, "::1", 0, true, 3000), mod(m)
68 #else
69         ResultNotifier(InspIRCd* SI, Module* m) : BufferedSocket(SI, "127.0.0.1", 0, true, 3000), mod(m)
70 #endif
71         {
72                 uslen = sizeof(sock_us);
73                 if (getsockname(this->fd,(sockaddr*)&sock_us,&uslen))
74                 {
75                         throw ModuleException("Could not create random listening port on localhost");
76                 }
77         }
78
79         ResultNotifier(InspIRCd* SI, Module* m, int newfd, char* ip) : BufferedSocket(SI, newfd, ip), mod(m)
80         {
81         }
82
83         /* Using getsockname and ntohs, we can determine which port number we were allocated */
84         int GetPort()
85         {
86 #ifdef IPV6
87                 return ntohs(sock_us.sin6_port);
88 #else
89                 return ntohs(sock_us.sin_port);
90 #endif
91         }
92
93         virtual int OnIncomingConnection(int newsock, char* ip)
94         {
95                 resultdispatch = new ResultNotifier(Instance, mod, newsock, ip);
96                 return true;
97         }
98
99         virtual bool OnDataReady()
100         {
101                 char data = 0;
102                 if (Instance->SE->Recv(this, &data, 1, 0) > 0)
103                 {
104                         Dispatch();
105                         return true;
106                 }
107                 return false;
108         }
109
110         void Dispatch();
111 };
112
113
114 class MsSQLResult : public SQLresult
115 {
116  private:
117         int currentrow;
118         int rows;
119         int cols;
120
121         std::vector<std::string> colnames;
122         std::vector<SQLfieldList> fieldlists;
123         SQLfieldList emptyfieldlist;
124
125         SQLfieldList* fieldlist;
126         SQLfieldMap* fieldmap;
127
128  public:
129         MsSQLResult(Module* self, Module* to, unsigned int rid)
130         : SQLresult(self, to, rid), currentrow(0), rows(0), cols(0), fieldlist(NULL), fieldmap(NULL)
131         {
132         }
133
134         ~MsSQLResult()
135         {
136         }
137
138         void AddRow(int colsnum, char **dat, char **colname)
139         {
140                 colnames.clear();
141                 cols = colsnum;
142                 for (int i = 0; i < colsnum; i++)
143                 {
144                         fieldlists.resize(fieldlists.size()+1);
145                         colnames.push_back(colname[i]);
146                         SQLfield sf(dat[i] ? dat[i] : "", dat[i] ? false : true);
147                         fieldlists[rows].push_back(sf);
148                 }
149                 rows++;
150         }
151
152         void UpdateAffectedCount()
153         {
154                 rows++;
155         }
156
157         virtual int Rows()
158         {
159                 return rows;
160         }
161
162         virtual int Cols()
163         {
164                 return cols;
165         }
166
167         virtual std::string ColName(int column)
168         {
169                 if (column < (int)colnames.size())
170                 {
171                         return colnames[column];
172                 }
173                 else
174                 {
175                         throw SQLbadColName();
176                 }
177                 return "";
178         }
179
180         virtual int ColNum(const std::string &column)
181         {
182                 for (unsigned int i = 0; i < colnames.size(); i++)
183                 {
184                         if (column == colnames[i])
185                                 return i;
186                 }
187                 throw SQLbadColName();
188                 return 0;
189         }
190
191         virtual SQLfield GetValue(int row, int column)
192         {
193                 if ((row >= 0) && (row < rows) && (column >= 0) && (column < Cols()))
194                 {
195                         return fieldlists[row][column];
196                 }
197
198                 throw SQLbadColName();
199
200                 /* XXX: We never actually get here because of the throw */
201                 return SQLfield("",true);
202         }
203
204         virtual SQLfieldList& GetRow()
205         {
206                 if (currentrow < rows)
207                         return fieldlists[currentrow];
208                 else
209                         return emptyfieldlist;
210         }
211
212         virtual SQLfieldMap& GetRowMap()
213         {
214                 /* In an effort to reduce overhead we don't actually allocate the map
215                  * until the first time it's needed...so...
216                  */
217                 if(fieldmap)
218                 {
219                         fieldmap->clear();
220                 }
221                 else
222                 {
223                         fieldmap = new SQLfieldMap;
224                 }
225
226                 if (currentrow < rows)
227                 {
228                         for (int i = 0; i < Cols(); i++)
229                         {
230                                 fieldmap->insert(std::make_pair(ColName(i), GetValue(currentrow, i)));
231                         }
232                         currentrow++;
233                 }
234
235                 return *fieldmap;
236         }
237
238         virtual SQLfieldList* GetRowPtr()
239         {
240                 fieldlist = new SQLfieldList();
241
242                 if (currentrow < rows)
243                 {
244                         for (int i = 0; i < Rows(); i++)
245                         {
246                                 fieldlist->push_back(fieldlists[currentrow][i]);
247                         }
248                         currentrow++;
249                 }
250                 return fieldlist;
251         }
252
253         virtual SQLfieldMap* GetRowMapPtr()
254         {
255                 fieldmap = new SQLfieldMap();
256
257                 if (currentrow < rows)
258                 {
259                         for (int i = 0; i < Cols(); i++)
260                         {
261                                 fieldmap->insert(std::make_pair(colnames[i],GetValue(currentrow, i)));
262                         }
263                         currentrow++;
264                 }
265
266                 return fieldmap;
267         }
268
269         virtual void Free(SQLfieldMap* fm)
270         {
271                 delete fm;
272         }
273
274         virtual void Free(SQLfieldList* fl)
275         {
276                 delete fl;
277         }
278
279
280 };
281
282 class SQLConn : public classbase
283 {
284  private:
285         ResultQueue results;
286         InspIRCd* Instance;
287         Module* mod;
288         SQLhost host;
289         TDSLOGIN* login;
290         TDSSOCKET* sock;
291         TDSCONTEXT* context;
292
293  public:
294         QueryQueue queue;
295
296         SQLConn(InspIRCd* SI, Module* m, const SQLhost& hi)
297         : Instance(SI), mod(m), host(hi), login(NULL), sock(NULL), context(NULL)
298         {
299                 if (OpenDB())
300                 {
301                         std::string query("USE " + host.name);
302                         if (tds_submit_query(sock, query.c_str()) == TDS_SUCCEED)
303                         {
304                                 if (tds_process_simple_query(sock) != TDS_SUCCEED)
305                                 {
306                                         LoggingMutex->Lock();
307                                         Instance->Logs->Log("m_mssql",DEFAULT, "WARNING: Could not select database " + host.name + " for DB with id: " + host.id);
308                                         LoggingMutex->Unlock();
309                                         CloseDB();
310                                 }
311                         }
312                         else
313                         {
314                                 LoggingMutex->Lock();
315                                 Instance->Logs->Log("m_mssql",DEFAULT, "WARNING: Could not select database " + host.name + " for DB with id: " + host.id);
316                                 LoggingMutex->Unlock();
317                                 CloseDB();
318                         }
319                 }
320                 else
321                 {
322                         LoggingMutex->Lock();
323                         Instance->Logs->Log("m_mssql",DEFAULT, "WARNING: Could not connect to DB with id: " + host.id);
324                         LoggingMutex->Unlock();
325                         CloseDB();
326                 }
327         }
328
329         ~SQLConn()
330         {
331                 CloseDB();
332         }
333
334         SQLerror Query(SQLrequest &req)
335         {
336                 if (!sock)
337                         return SQLerror(SQL_BAD_CONN, "Socket was NULL, check if SQL server is running.");
338
339                 /* Pointer to the buffer we screw around with substitution in */
340                 char* query;
341
342                 /* Pointer to the current end of query, where we append new stuff */
343                 char* queryend;
344
345                 /* Total length of the unescaped parameters */
346                 unsigned long paramlen;
347
348                 /* Total length of query, used for binary-safety in mysql_real_query */
349                 unsigned long querylength = 0;
350
351                 paramlen = 0;
352                 for(ParamL::iterator i = req.query.p.begin(); i != req.query.p.end(); i++)
353                 {
354                         paramlen += i->size();
355                 }
356
357                 /* To avoid a lot of allocations, allocate enough memory for the biggest the escaped query could possibly be.
358                  * sizeofquery + (totalparamlength*2) + 1
359                  *
360                  * The +1 is for null-terminating the string
361                  */
362                 query = new char[req.query.q.length() + (paramlen*2) + 1];
363                 queryend = query;
364
365                 for(unsigned long i = 0; i < req.query.q.length(); i++)
366                 {
367                         if(req.query.q[i] == '?')
368                         {
369                                 if(req.query.p.size())
370                                 {
371                                         /* Custom escaping for this one. converting ' to '' should make SQL Server happy. Ugly but fast :]
372                                          */
373                                         char* escaped = new char[(req.query.p.front().length() * 2) + 1];
374                                         char* escend = escaped;
375                                         for (std::string::iterator p = req.query.p.front().begin(); p < req.query.p.front().end(); p++)
376                                         {
377                                                 if (*p == '\'')
378                                                 {
379                                                         *escend = *p;
380                                                         escend++;
381                                                         *escend = *p;
382                                                 }
383                                                 *escend = *p;
384                                                 escend++;
385                                         }
386                                         *escend = 0;
387
388                                         for (char* n = escaped; *n; n++)
389                                         {
390                                                 *queryend = *n;
391                                                 queryend++;
392                                         }
393                                         delete[] escaped;
394                                         req.query.p.pop_front();
395                                 }
396                                 else
397                                         break;
398                         }
399                         else
400                         {
401                                 *queryend = req.query.q[i];
402                                 queryend++;
403                         }
404                         querylength++;
405                 }
406                 *queryend = 0;
407                 req.query.q = query;
408
409                 MsSQLResult* res = new MsSQLResult((Module*)mod, req.GetSource(), req.id);
410                 res->dbid = host.id;
411                 res->query = req.query.q;
412
413                 char* msquery = strdup(req.query.q.data());
414                 LoggingMutex->Lock();
415                 Instance->Logs->Log("m_mssql",DEBUG,"doing Query: %s",msquery);
416                 LoggingMutex->Unlock();
417                 if (tds_submit_query(sock, msquery) != TDS_SUCCEED)
418                 {
419                         std::string error("failed to execute: "+std::string(req.query.q.data()));
420                         delete[] query;
421                         delete res;
422                         free(msquery);
423                         return SQLerror(SQL_QSEND_FAIL, error);
424                 }
425                 delete[] query;
426                 free(msquery);
427
428                 int tds_res;
429                 while (tds_process_tokens(sock, &tds_res, NULL, TDS_TOKEN_RESULTS) == TDS_SUCCEED)
430                 {
431                         //Instance->Logs->Log("m_mssql",DEBUG,"<******> result type: %d", tds_res);
432                         //Instance->Logs->Log("m_mssql",DEBUG,"AFFECTED ROWS: %d", sock->rows_affected);
433                         switch (tds_res)
434                         {
435                                 case TDS_ROWFMT_RESULT:
436                                         break;
437
438                                 case TDS_DONE_RESULT:
439                                         if (sock->rows_affected > -1)
440                                         {
441                                                 for (int c = 0; c < sock->rows_affected; c++)  res->UpdateAffectedCount();
442                                                 continue;
443                                         }
444                                         break;
445
446                                 case TDS_ROW_RESULT:
447                                         while (tds_process_tokens(sock, &tds_res, NULL, TDS_STOPAT_ROWFMT|TDS_RETURN_DONE|TDS_RETURN_ROW) == TDS_SUCCEED)
448                                         {
449                                                 if (tds_res != TDS_ROW_RESULT)
450                                                         break;
451
452                                                 if (!sock->current_results)
453                                                         continue;
454
455                                                 if (sock->res_info->row_count > 0)
456                                                 {
457                                                         int cols = sock->res_info->num_cols;
458                                                         char** name = new char*[MAXBUF];
459                                                         char** data = new char*[MAXBUF];
460                                                         for (int j=0; j<cols; j++)
461                                                         {
462                                                                 TDSCOLUMN* col = sock->current_results->columns[j];
463                                                                 name[j] = col->column_name;
464
465                                                                 int ctype;
466                                                                 int srclen;
467                                                                 unsigned char* src;
468                                                                 CONV_RESULT dres;
469                                                                 ctype = tds_get_conversion_type(col->column_type, col->column_size);
470                                                                 src = &(sock->current_results->current_row[col->column_offset]);
471                                                                 srclen = col->column_cur_size;
472                                                                 tds_convert(sock->tds_ctx, ctype, (TDS_CHAR *) src, srclen, SYBCHAR, &dres);
473                                                                 data[j] = (char*)dres.ib;
474                                                         }
475                                                         ResultReady(res, cols, data, name);
476                                                 }
477                                         }
478                                         break;
479
480                                 default:
481                                         break;
482                         }
483                 }
484                 ResultsMutex->Lock();
485                 results.push_back(res);
486                 ResultsMutex->Unlock();
487                 SendNotify();
488                 return SQLerror();
489         }
490
491         static int HandleMessage(const TDSCONTEXT * pContext, TDSSOCKET * pTdsSocket, TDSMESSAGE * pMessage)
492         {
493                 SQLConn* sc = (SQLConn*)pContext->parent;
494                 LoggingMutex->Lock();
495                 sc->Instance->Logs->Log("m_mssql", DEBUG, "Message for DB with id: %s -> %s", sc->host.id.c_str(), pMessage->message);
496                 LoggingMutex->Unlock();
497                 return 0;
498         }
499
500         static int HandleError(const TDSCONTEXT * pContext, TDSSOCKET * pTdsSocket, TDSMESSAGE * pMessage)
501         {
502                 SQLConn* sc = (SQLConn*)pContext->parent;
503                 LoggingMutex->Lock();
504                 sc->Instance->Logs->Log("m_mssql", DEFAULT, "Error for DB with id: %s -> %s", sc->host.id.c_str(), pMessage->message);
505                 LoggingMutex->Unlock();
506                 return 0;
507         }
508
509         void ResultReady(MsSQLResult *res, int cols, char **data, char **colnames)
510         {
511                 res->AddRow(cols, data, colnames);
512         }
513
514         void AffectedReady(MsSQLResult *res)
515         {
516                 res->UpdateAffectedCount();
517         }
518
519         bool OpenDB()
520         {
521                 CloseDB();
522
523                 TDSCONNECTION* conn = NULL;
524
525                 login = tds_alloc_login();
526                 tds_set_app(login, "TSQL");
527                 tds_set_library(login,"TDS-Library");
528                 tds_set_host(login, "");
529                 tds_set_server(login, host.host.c_str());
530                 tds_set_server_addr(login, host.host.c_str());
531                 tds_set_user(login, host.user.c_str());
532                 tds_set_passwd(login, host.pass.c_str());
533                 tds_set_port(login, host.port);
534                 tds_set_packet(login, 512);
535
536                 context = tds_alloc_context(this);
537                 context->msg_handler = HandleMessage;
538                 context->err_handler = HandleError;
539
540                 sock = tds_alloc_socket(context, 512);
541                 tds_set_parent(sock, NULL);
542
543                 conn = tds_read_config_info(NULL, login, context->locale);
544
545                 if (tds_connect(sock, conn) == TDS_SUCCEED)
546                 {
547                         tds_free_connection(conn);
548                         return 1;
549                 }
550                 tds_free_connection(conn);
551                 return 0;
552         }
553
554         void CloseDB()
555         {
556                 if (sock)
557                 {
558                         tds_free_socket(sock);
559                         sock = NULL;
560                 }
561                 if (context)
562                 {
563                         tds_free_context(context);
564                         context = NULL;
565                 }
566                 if (login)
567                 {
568                         tds_free_login(login);
569                         login = NULL;
570                 }
571         }
572
573         SQLhost GetConfHost()
574         {
575                 return host;
576         }
577
578         void SendResults()
579         {
580                 while (results.size())
581                 {
582                         MsSQLResult* res = results[0];
583                         ResultsMutex->Lock();
584                         if (res->GetDest())
585                         {
586                                 res->Send();
587                         }
588                         else
589                         {
590                                 /* If the client module is unloaded partway through a query then the provider will set
591                                  * the pointer to NULL. We cannot just cancel the query as the result will still come
592                                  * through at some point...and it could get messy if we play with invalid pointers...
593                                  */
594                                 delete res;
595                         }
596                         results.pop_front();
597                         ResultsMutex->Unlock();
598                 }
599         }
600
601         void ClearResults()
602         {
603                 while (results.size())
604                 {
605                         MsSQLResult* res = results[0];
606                         delete res;
607                         results.pop_front();
608                 }
609         }
610
611         void SendNotify()
612         {
613                 if (QueueFD < 0)
614                 {
615                         if ((QueueFD = socket(AF_FAMILY, SOCK_STREAM, 0)) == -1)
616                         {
617                                 /* crap, we're out of sockets... */
618                                 return;
619                         }
620
621                         insp_sockaddr addr;
622
623 #ifdef IPV6
624                         insp_aton("::1", &addr.sin6_addr);
625                         addr.sin6_family = AF_FAMILY;
626                         addr.sin6_port = htons(resultnotify->GetPort());
627 #else
628                         insp_inaddr ia;
629                         insp_aton("127.0.0.1", &ia);
630                         addr.sin_family = AF_FAMILY;
631                         addr.sin_addr = ia;
632                         addr.sin_port = htons(resultnotify->GetPort());
633 #endif
634
635                         if (connect(QueueFD, (sockaddr*)&addr,sizeof(addr)) == -1)
636                         {
637                                 /* wtf, we cant connect to it, but we just created it! */
638                                 return;
639                         }
640                 }
641                 char id = 0;
642                 send(QueueFD, &id, 1, 0);
643         }
644
645         void DoLeadingQuery()
646         {
647                 SQLrequest& req = queue.front();
648                 req.error = Query(req);
649         }
650
651 };
652
653
654 class ModuleMsSQL : public Module
655 {
656  private:
657         unsigned long currid;
658         QueryThread* Dispatcher;
659
660  public:
661         ModuleMsSQL(InspIRCd* Me)
662         : Module(Me), currid(0)
663         {
664                 LoggingMutex = ServerInstance->Mutexes->CreateMutex();
665                 ResultsMutex = ServerInstance->Mutexes->CreateMutex();
666                 QueueMutex = ServerInstance->Mutexes->CreateMutex();
667
668                 ServerInstance->Modules->UseInterface("SQLutils");
669
670                 if (!ServerInstance->Modules->PublishFeature("SQL", this))
671                 {
672                         throw ModuleException("m_mssql: Unable to publish feature 'SQL'");
673                 }
674
675                 resultnotify = new ResultNotifier(ServerInstance, this);
676
677                 ReadConf();
678
679                 Dispatcher = new QueryThread(ServerInstance, this);
680                 ServerInstance->Threads->Create(Dispatcher);
681
682                 ServerInstance->Modules->PublishInterface("SQL", this);
683                 Implementation eventlist[] = { I_OnRequest, I_OnRehash };
684                 ServerInstance->Modules->Attach(eventlist, this, 2);
685         }
686
687         virtual ~ModuleMsSQL()
688         {
689                 delete Dispatcher;
690                 ClearQueue();
691                 ClearAllConnections();
692
693                 ServerInstance->SE->DelFd(resultnotify);
694                 resultnotify->Close();
695                 ServerInstance->BufferedSocketCull();
696
697                 if (QueueFD >= 0)
698                 {
699                         shutdown(QueueFD, 2);
700                         close(QueueFD);
701                 }
702
703                 if (resultdispatch)
704                 {
705                         ServerInstance->SE->DelFd(resultdispatch);
706                         resultdispatch->Close();
707                         ServerInstance->BufferedSocketCull();
708                 }
709
710                 ServerInstance->Modules->UnpublishInterface("SQL", this);
711                 ServerInstance->Modules->UnpublishFeature("SQL");
712                 ServerInstance->Modules->DoneWithInterface("SQLutils");
713
714                 delete LoggingMutex;
715                 delete ResultsMutex;
716                 delete QueueMutex;
717         }
718
719
720         void SendQueue()
721         {
722                 for (ConnMap::iterator iter = connections.begin(); iter != connections.end(); iter++)
723                 {
724                         iter->second->SendResults();
725                 }
726         }
727
728         void ClearQueue()
729         {
730                 for (ConnMap::iterator iter = connections.begin(); iter != connections.end(); iter++)
731                 {
732                         iter->second->ClearResults();
733                 }
734         }
735
736         bool HasHost(const SQLhost &host)
737         {
738                 for (ConnMap::iterator iter = connections.begin(); iter != connections.end(); iter++)
739                 {
740                         if (host == iter->second->GetConfHost())
741                                 return true;
742                 }
743                 return false;
744         }
745
746         bool HostInConf(const SQLhost &h)
747         {
748                 ConfigReader conf(ServerInstance);
749                 for(int i = 0; i < conf.Enumerate("database"); i++)
750                 {
751                         SQLhost host;
752                         host.id         = conf.ReadValue("database", "id", i);
753                         host.host       = conf.ReadValue("database", "hostname", i);
754                         host.port       = conf.ReadInteger("database", "port", "1433", i, true);
755                         host.name       = conf.ReadValue("database", "name", i);
756                         host.user       = conf.ReadValue("database", "username", i);
757                         host.pass       = conf.ReadValue("database", "password", i);
758                         if (h == host)
759                                 return true;
760                 }
761                 return false;
762         }
763
764         void ReadConf()
765         {
766                 ClearOldConnections();
767
768                 ConfigReader conf(ServerInstance);
769                 for(int i = 0; i < conf.Enumerate("database"); i++)
770                 {
771                         SQLhost host;
772
773                         host.id         = conf.ReadValue("database", "id", i);
774                         host.host       = conf.ReadValue("database", "hostname", i);
775                         host.port       = conf.ReadInteger("database", "port", "1433", i, true);
776                         host.name       = conf.ReadValue("database", "name", i);
777                         host.user       = conf.ReadValue("database", "username", i);
778                         host.pass       = conf.ReadValue("database", "password", i);
779
780                         if (HasHost(host))
781                                 continue;
782
783                         this->AddConn(host);
784                 }
785         }
786
787         void AddConn(const SQLhost& hi)
788         {
789                 if (HasHost(hi))
790                 {
791                         LoggingMutex->Lock();
792                         ServerInstance->Logs->Log("m_mssql",DEFAULT, "WARNING: A MsSQL connection with id: %s already exists. Aborting database open attempt.", hi.id.c_str());
793                         LoggingMutex->Unlock();
794                         return;
795                 }
796
797                 SQLConn* newconn;
798
799                 newconn = new SQLConn(ServerInstance, this, hi);
800
801                 connections.insert(std::make_pair(hi.id, newconn));
802         }
803
804         void ClearOldConnections()
805         {
806                 ConnMap::iterator iter,safei;
807                 for (iter = connections.begin(); iter != connections.end(); iter++)
808                 {
809                         if (!HostInConf(iter->second->GetConfHost()))
810                         {
811                                 delete iter->second;
812                                 safei = iter;
813                                 --iter;
814                                 connections.erase(safei);
815                         }
816                 }
817         }
818
819         void ClearAllConnections()
820         {
821                 ConnMap::iterator i;
822                 while ((i = connections.begin()) != connections.end())
823                 {
824                         connections.erase(i);
825                         delete i->second;
826                 }
827         }
828
829         virtual void OnRehash(User* user, const std::string &parameter)
830         {
831                 QueueMutex->Lock();
832                 ReadConf();
833                 QueueMutex->Unlock();
834         }
835
836         virtual const char* OnRequest(Request* request)
837         {
838                 if(strcmp(SQLREQID, request->GetId()) == 0)
839                 {
840                         SQLrequest* req = (SQLrequest*)request;
841
842                         QueueMutex->Lock();
843
844                         ConnMap::iterator iter;
845
846                         const char* returnval = NULL;
847
848                         if((iter = connections.find(req->dbid)) != connections.end())
849                         {
850                                 req->id = NewID();
851                                 iter->second->queue.push(*req);
852                                 returnval= SQLSUCCESS;
853                         }
854                         else
855                         {
856                                 req->error.Id(SQL_BAD_DBID);
857                         }
858
859                         QueueMutex->Unlock();
860
861                         return returnval;
862                 }
863                 return NULL;
864         }
865
866         unsigned long NewID()
867         {
868                 if (currid+1 == 0)
869                         currid++;
870
871                 return ++currid;
872         }
873
874         virtual Version GetVersion()
875         {
876                 return Version("$Id$", VF_VENDOR | VF_SERVICEPROVIDER, API_VERSION);
877         }
878
879 };
880
881 void ResultNotifier::Dispatch()
882 {
883         ((ModuleMsSQL*)mod)->SendQueue();
884 }
885
886 void QueryThread::Run()
887 {
888         while (this->GetExitFlag() == false)
889         {
890                 SQLConn* conn = NULL;
891                 QueueMutex->Lock();
892                 for (ConnMap::iterator i = connections.begin(); i != connections.end(); i++)
893                 {
894                         if (i->second->queue.totalsize())
895                         {
896                                 conn = i->second;
897                                 break;
898                         }
899                 }
900                 QueueMutex->Unlock();
901                 if (conn)
902                 {
903                         conn->DoLeadingQuery();
904                         QueueMutex->Lock();
905                         conn->queue.pop();
906                         QueueMutex->Unlock();
907                 }
908                 usleep(1000);
909         }
910 }
911
912 MODULE_INIT(ModuleMsSQL)