]> git.netwichtig.de Git - user/henk/code/inspircd.git/blob - src/modules/extra/m_mssql.cpp
Change httpd modules to use the MODNAME constant in headers.
[user/henk/code/inspircd.git] / src / modules / extra / m_mssql.cpp
1 /*
2  * InspIRCd -- Internet Relay Chat Daemon
3  *
4  *   Copyright (C) 2008-2009 Dennis Friis <peavey@inspircd.org>
5  *   Copyright (C) 2009 Daniel De Graaf <danieldg@inspircd.org>
6  *   Copyright (C) 2008-2009 Craig Edwards <craigedwards@brainbox.cc>
7  *   Copyright (C) 2008 Robin Burchell <robin+git@viroteck.net>
8  *   Copyright (C) 2008 Pippijn van Steenhoven <pip88nl@gmail.com>
9  *
10  * This file is part of InspIRCd.  InspIRCd is free software: you can
11  * redistribute it and/or modify it under the terms of the GNU General Public
12  * License as published by the Free Software Foundation, version 2.
13  *
14  * This program is distributed in the hope that it will be useful, but WITHOUT
15  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
16  * FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
17  * details.
18  *
19  * You should have received a copy of the GNU General Public License
20  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
21  */
22
23
24 #include "inspircd.h"
25 #include <tds.h>
26 #include <tdsconvert.h>
27 #include "users.h"
28 #include "channels.h"
29 #include "modules.h"
30
31 #include "m_sqlv2.h"
32
33 /* $CompileFlags: exec("grep VERSION_NO /usr/include/tdsver.h 2>/dev/null | perl -e 'print "-D_TDSVER=".((<> =~ /freetds v(\d+\.\d+)/i) ? $1*100 : 0);'") */
34 /* $LinkerFlags: -ltds */
35 /* $ModDep: m_sqlv2.h */
36
/* Forward declarations: the container typedefs below and QueryThread refer to
 * these classes before their definitions appear.
 */
37 class SQLConn;
38 class MsSQLResult;
39 class ModuleMsSQL;
40
/* ConnMap maps a string key to a connection; ResultQueue holds pointers to query results. */
41 typedef std::map<std::string, SQLConn*> ConnMap;
42 typedef std::deque<MsSQLResult*> ResultQueue;
43
/** Count how often a character occurs in a C string.
 * @param str NUL-terminated string to scan; must not be NULL.
 * @param a The character to look for.
 * @return The number of occurrences of a in str.
 */
unsigned long count(const char * const str, char a)
{
	unsigned long n = 0;
	for (const char *p = str; *p; ++p)
	{
		/* Compare against the requested character rather than a hard-coded '?'. */
		if (*p == a)
			++n;
	}
	return n;
}
54
/* Every open database connection; ModuleMsSQL::AddConn() inserts them keyed by SQLhost::id. */
55 ConnMap connections;
/* Locked around accesses to a connection's list of finished results.
 * Created and destroyed by ModuleMsSQL.
 */
56 Mutex* ResultsMutex;
/* Locked around calls into the logger. Created and destroyed by ModuleMsSQL. */
57 Mutex* LoggingMutex;
58
59 class QueryThread : public SocketThread
60 {
61   private:
62         ModuleMsSQL* const Parent;
63   public:
64         QueryThread(ModuleMsSQL* mod) : Parent(mod) { }
65         ~QueryThread() { }
66         void Run();
67         void OnNotify();
68 };
69
70 class MsSQLResult : public SQLresult
71 {
72  private:
73         int currentrow;
74         int rows;
75         int cols;
76
77         std::vector<std::string> colnames;
78         std::vector<SQLfieldList> fieldlists;
79         SQLfieldList emptyfieldlist;
80
81         SQLfieldList* fieldlist;
82         SQLfieldMap* fieldmap;
83
84  public:
85         MsSQLResult(Module* self, Module* to, unsigned int rid)
86         : SQLresult(self, to, rid), currentrow(0), rows(0), cols(0), fieldlist(NULL), fieldmap(NULL)
87         {
88         }
89
90         void AddRow(int colsnum, char **dat, char **colname)
91         {
92                 colnames.clear();
93                 cols = colsnum;
94                 for (int i = 0; i < colsnum; i++)
95                 {
96                         fieldlists.resize(fieldlists.size()+1);
97                         colnames.push_back(colname[i]);
98                         SQLfield sf(dat[i] ? dat[i] : "", dat[i] ? false : true);
99                         fieldlists[rows].push_back(sf);
100                 }
101                 rows++;
102         }
103
104         void UpdateAffectedCount()
105         {
106                 rows++;
107         }
108
109         int Rows()
110         {
111                 return rows;
112         }
113
114         int Cols()
115         {
116                 return cols;
117         }
118
119         std::string ColName(int column)
120         {
121                 if (column < (int)colnames.size())
122                 {
123                         return colnames[column];
124                 }
125                 else
126                 {
127                         throw SQLbadColName();
128                 }
129                 return "";
130         }
131
132         int ColNum(const std::string &column)
133         {
134                 for (unsigned int i = 0; i < colnames.size(); i++)
135                 {
136                         if (column == colnames[i])
137                                 return i;
138                 }
139                 throw SQLbadColName();
140                 return 0;
141         }
142
143         SQLfield GetValue(int row, int column)
144         {
145                 if ((row >= 0) && (row < rows) && (column >= 0) && (column < Cols()))
146                 {
147                         return fieldlists[row][column];
148                 }
149
150                 throw SQLbadColName();
151
152                 /* XXX: We never actually get here because of the throw */
153                 return SQLfield("",true);
154         }
155
156         SQLfieldList& GetRow()
157         {
158                 if (currentrow < rows)
159                         return fieldlists[currentrow];
160                 else
161                         return emptyfieldlist;
162         }
163
164         SQLfieldMap& GetRowMap()
165         {
166                 /* In an effort to reduce overhead we don't actually allocate the map
167                  * until the first time it's needed...so...
168                  */
169                 if(fieldmap)
170                 {
171                         fieldmap->clear();
172                 }
173                 else
174                 {
175                         fieldmap = new SQLfieldMap;
176                 }
177
178                 if (currentrow < rows)
179                 {
180                         for (int i = 0; i < Cols(); i++)
181                         {
182                                 fieldmap->insert(std::make_pair(ColName(i), GetValue(currentrow, i)));
183                         }
184                         currentrow++;
185                 }
186
187                 return *fieldmap;
188         }
189
190         SQLfieldList* GetRowPtr()
191         {
192                 fieldlist = new SQLfieldList();
193
194                 if (currentrow < rows)
195                 {
196                         for (int i = 0; i < Rows(); i++)
197                         {
198                                 fieldlist->push_back(fieldlists[currentrow][i]);
199                         }
200                         currentrow++;
201                 }
202                 return fieldlist;
203         }
204
205         SQLfieldMap* GetRowMapPtr()
206         {
207                 fieldmap = new SQLfieldMap();
208
209                 if (currentrow < rows)
210                 {
211                         for (int i = 0; i < Cols(); i++)
212                         {
213                                 fieldmap->insert(std::make_pair(colnames[i],GetValue(currentrow, i)));
214                         }
215                         currentrow++;
216                 }
217
218                 return fieldmap;
219         }
220
221         void Free(SQLfieldMap* fm)
222         {
223                 delete fm;
224         }
225
226         void Free(SQLfieldList* fl)
227         {
228                 delete fl;
229         }
230 };
231
232 class SQLConn : public classbase
233 {
234  private:
235         ResultQueue results;
236         Module* mod;
237         SQLhost host;
238         TDSLOGIN* login;
239         TDSSOCKET* sock;
240         TDSCONTEXT* context;
241
242  public:
243         QueryQueue queue;
244
245         SQLConn(Module* m, const SQLhost& hi)
246         : mod(m), host(hi), login(NULL), sock(NULL), context(NULL)
247         {
248                 if (OpenDB())
249                 {
250                         std::string query("USE " + host.name);
251                         if (tds_submit_query(sock, query.c_str()) == TDS_SUCCEED)
252                         {
253                                 if (tds_process_simple_query(sock) != TDS_SUCCEED)
254                                 {
255                                         LoggingMutex->Lock();
256                                         ServerInstance->Logs->Log(MODNAME, LOG_DEFAULT, "WARNING: Could not select database " + host.name + " for DB with id: " + host.id);
257                                         LoggingMutex->Unlock();
258                                         CloseDB();
259                                 }
260                         }
261                         else
262                         {
263                                 LoggingMutex->Lock();
264                                 ServerInstance->Logs->Log(MODNAME, LOG_DEFAULT, "WARNING: Could not select database " + host.name + " for DB with id: " + host.id);
265                                 LoggingMutex->Unlock();
266                                 CloseDB();
267                         }
268                 }
269                 else
270                 {
271                         LoggingMutex->Lock();
272                         ServerInstance->Logs->Log(MODNAME, LOG_DEFAULT, "WARNING: Could not connect to DB with id: " + host.id);
273                         LoggingMutex->Unlock();
274                         CloseDB();
275                 }
276         }
277
278         ~SQLConn()
279         {
280                 CloseDB();
281         }
282
283         SQLerror Query(SQLrequest* req)
284         {
285                 if (!sock)
286                         return SQLerror(SQL_BAD_CONN, "Socket was NULL, check if SQL server is running.");
287
288                 /* Pointer to the buffer we screw around with substitution in */
289                 char* query;
290
291                 /* Pointer to the current end of query, where we append new stuff */
292                 char* queryend;
293
294                 /* Total length of the unescaped parameters */
295                 unsigned long maxparamlen, paramcount;
296
297                 /* The length of the longest parameter */
298                 maxparamlen = 0;
299
300                 for(ParamL::iterator i = req->query.p.begin(); i != req->query.p.end(); i++)
301                 {
302                         if (i->size() > maxparamlen)
303                                 maxparamlen = i->size();
304                 }
305
306                 /* How many params are there in the query? */
307                 paramcount = count(req->query.q.c_str(), '?');
308
309                 /* This stores copy of params to be inserted with using numbered params 1;3B*/
310                 ParamL paramscopy(req->query.p);
311
312                 /* To avoid a lot of allocations, allocate enough memory for the biggest the escaped query could possibly be.
313                  * sizeofquery + (maxtotalparamlength*2) + 1
314                  *
315                  * The +1 is for null-terminating the string
316                  */
317
318                 query = new char[req->query.q.length() + (maxparamlen*paramcount*2) + 1];
319                 queryend = query;
320
321                 for(unsigned long i = 0; i < req->query.q.length(); i++)
322                 {
323                         if(req->query.q[i] == '?')
324                         {
325                                 /* We found a place to substitute..what fun.
326                                  * use mssql calls to escape and write the
327                                  * escaped string onto the end of our query buffer,
328                                  * then we "just" need to make sure queryend is
329                                  * pointing at the right place.
330                                  */
331
332                                 /* Is it numbered parameter?
333                                  */
334
335                                 bool numbered;
336                                 numbered = false;
337
338                                 /* Numbered parameter number :|
339                                  */
340                                 unsigned int paramnum;
341                                 paramnum = 0;
342
343                                 /* Let's check if it's a numbered param. And also calculate it's number.
344                                  */
345
346                                 while ((i < req->query.q.length() - 1) && (req->query.q[i+1] >= '0') && (req->query.q[i+1] <= '9'))
347                                 {
348                                         numbered = true;
349                                         ++i;
350                                         paramnum = paramnum * 10 + req->query.q[i] - '0';
351                                 }
352
353                                 if (paramnum > paramscopy.size() - 1)
354                                 {
355                                         /* index is out of range!
356                                          */
357                                         numbered = false;
358                                 }
359
360                                 if (numbered)
361                                 {
362                                         /* Custom escaping for this one. converting ' to '' should make SQL Server happy. Ugly but fast :]
363                                          */
364                                         char* escaped = new char[(paramscopy[paramnum].length() * 2) + 1];
365                                         char* escend = escaped;
366                                         for (std::string::iterator p = paramscopy[paramnum].begin(); p < paramscopy[paramnum].end(); p++)
367                                         {
368                                                 if (*p == '\'')
369                                                 {
370                                                         *escend = *p;
371                                                         escend++;
372                                                         *escend = *p;
373                                                 }
374                                                 *escend = *p;
375                                                 escend++;
376                                         }
377                                         *escend = 0;
378
379                                         for (char* n = escaped; *n; n++)
380                                         {
381                                                 *queryend = *n;
382                                                 queryend++;
383                                         }
384                                         delete[] escaped;
385                                 }
386                                 else if (req->query.p.size())
387                                 {
388                                         /* Custom escaping for this one. converting ' to '' should make SQL Server happy. Ugly but fast :]
389                                          */
390                                         char* escaped = new char[(req->query.p.front().length() * 2) + 1];
391                                         char* escend = escaped;
392                                         for (std::string::iterator p = req->query.p.front().begin(); p < req->query.p.front().end(); p++)
393                                         {
394                                                 if (*p == '\'')
395                                                 {
396                                                         *escend = *p;
397                                                         escend++;
398                                                         *escend = *p;
399                                                 }
400                                                 *escend = *p;
401                                                 escend++;
402                                         }
403                                         *escend = 0;
404
405                                         for (char* n = escaped; *n; n++)
406                                         {
407                                                 *queryend = *n;
408                                                 queryend++;
409                                         }
410                                         delete[] escaped;
411                                         req->query.p.pop_front();
412                                 }
413                                 else
414                                         break;
415                         }
416                         else
417                         {
418                                 *queryend = req->query.q[i];
419                                 queryend++;
420                         }
421                 }
422                 *queryend = 0;
423                 req->query.q = query;
424
425                 MsSQLResult* res = new MsSQLResult((Module*)mod, req->source, req->id);
426                 res->dbid = host.id;
427                 res->query = req->query.q;
428
429                 char* msquery = strdup(req->query.q.data());
430                 LoggingMutex->Lock();
431                 ServerInstance->Logs->Log(MODNAME, LOG_DEBUG, "doing Query: %s",msquery);
432                 LoggingMutex->Unlock();
433                 if (tds_submit_query(sock, msquery) != TDS_SUCCEED)
434                 {
435                         std::string error("failed to execute: "+std::string(req->query.q.data()));
436                         delete[] query;
437                         delete res;
438                         free(msquery);
439                         return SQLerror(SQL_QSEND_FAIL, error);
440                 }
441                 delete[] query;
442                 free(msquery);
443
444                 int tds_res;
445                 while (tds_process_tokens(sock, &tds_res, NULL, TDS_TOKEN_RESULTS) == TDS_SUCCEED)
446                 {
447                         //ServerInstance->Logs->Log(MODNAME, LOG_DEBUG, "<******> result type: %d", tds_res);
448                         //ServerInstance->Logs->Log(MODNAME, LOG_DEBUG, "AFFECTED ROWS: %d", sock->rows_affected);
449                         switch (tds_res)
450                         {
451                                 case TDS_ROWFMT_RESULT:
452                                         break;
453
454                                 case TDS_DONE_RESULT:
455                                         if (sock->rows_affected > -1)
456                                         {
457                                                 for (int c = 0; c < sock->rows_affected; c++)  res->UpdateAffectedCount();
458                                                 continue;
459                                         }
460                                         break;
461
462                                 case TDS_ROW_RESULT:
463                                         while (tds_process_tokens(sock, &tds_res, NULL, TDS_STOPAT_ROWFMT|TDS_RETURN_DONE|TDS_RETURN_ROW) == TDS_SUCCEED)
464                                         {
465                                                 if (tds_res != TDS_ROW_RESULT)
466                                                         break;
467
468                                                 if (!sock->current_results)
469                                                         continue;
470
471                                                 if (sock->res_info->row_count > 0)
472                                                 {
473                                                         int cols = sock->res_info->num_cols;
474                                                         char** name = new char*[512];
475                                                         char** data = new char*[512];
476                                                         for (int j=0; j<cols; j++)
477                                                         {
478                                                                 TDSCOLUMN* col = sock->current_results->columns[j];
479                                                                 name[j] = col->column_name;
480
481                                                                 int ctype;
482                                                                 int srclen;
483                                                                 unsigned char* src;
484                                                                 CONV_RESULT dres;
485                                                                 ctype = tds_get_conversion_type(col->column_type, col->column_size);
486 #if _TDSVER >= 82
487                                                                         src = col->column_data;
488 #else
489                                                                         src = &(sock->current_results->current_row[col->column_offset]);
490 #endif
491                                                                 srclen = col->column_cur_size;
492                                                                 tds_convert(sock->tds_ctx, ctype, (TDS_CHAR *) src, srclen, SYBCHAR, &dres);
493                                                                 data[j] = (char*)dres.ib;
494                                                         }
495                                                         ResultReady(res, cols, data, name);
496                                                 }
497                                         }
498                                         break;
499
500                                 default:
501                                         break;
502                         }
503                 }
504                 ResultsMutex->Lock();
505                 results.push_back(res);
506                 ResultsMutex->Unlock();
507                 return SQLerror();
508         }
509
510         static int HandleMessage(const TDSCONTEXT * pContext, TDSSOCKET * pTdsSocket, TDSMESSAGE * pMessage)
511         {
512                 SQLConn* sc = (SQLConn*)pContext->parent;
513                 LoggingMutex->Lock();
514                 ServerInstance->Logs->Log(MODNAME, LOG_DEBUG, "Message for DB with id: %s -> %s", sc->host.id.c_str(), pMessage->message);
515                 LoggingMutex->Unlock();
516                 return 0;
517         }
518
519         static int HandleError(const TDSCONTEXT * pContext, TDSSOCKET * pTdsSocket, TDSMESSAGE * pMessage)
520         {
521                 SQLConn* sc = (SQLConn*)pContext->parent;
522                 LoggingMutex->Lock();
523                 ServerInstance->Logs->Log(MODNAME, LOG_DEFAULT, "Error for DB with id: %s -> %s", sc->host.id.c_str(), pMessage->message);
524                 LoggingMutex->Unlock();
525                 return 0;
526         }
527
528         void ResultReady(MsSQLResult *res, int cols, char **data, char **colnames)
529         {
530                 res->AddRow(cols, data, colnames);
531         }
532
533         void AffectedReady(MsSQLResult *res)
534         {
535                 res->UpdateAffectedCount();
536         }
537
538         bool OpenDB()
539         {
540                 CloseDB();
541
542                 TDSCONNECTION* conn = NULL;
543
544                 login = tds_alloc_login();
545                 tds_set_app(login, "TSQL");
546                 tds_set_library(login,"TDS-Library");
547                 tds_set_host(login, "");
548                 tds_set_server(login, host.host.c_str());
549                 tds_set_server_addr(login, host.host.c_str());
550                 tds_set_user(login, host.user.c_str());
551                 tds_set_passwd(login, host.pass.c_str());
552                 tds_set_port(login, host.port);
553                 tds_set_packet(login, 512);
554
555                 context = tds_alloc_context(this);
556                 context->msg_handler = HandleMessage;
557                 context->err_handler = HandleError;
558
559                 sock = tds_alloc_socket(context, 512);
560                 tds_set_parent(sock, NULL);
561
562                 conn = tds_read_config_info(NULL, login, context->locale);
563
564                 if (tds_connect(sock, conn) == TDS_SUCCEED)
565                 {
566                         tds_free_connection(conn);
567                         return 1;
568                 }
569                 tds_free_connection(conn);
570                 return 0;
571         }
572
573         void CloseDB()
574         {
575                 if (sock)
576                 {
577                         tds_free_socket(sock);
578                         sock = NULL;
579                 }
580                 if (context)
581                 {
582                         tds_free_context(context);
583                         context = NULL;
584                 }
585                 if (login)
586                 {
587                         tds_free_login(login);
588                         login = NULL;
589                 }
590         }
591
592         SQLhost GetConfHost()
593         {
594                 return host;
595         }
596
597         void SendResults()
598         {
599                 while (results.size())
600                 {
601                         MsSQLResult* res = results[0];
602                         ResultsMutex->Lock();
603                         if (res->dest)
604                         {
605                                 res->Send();
606                         }
607                         else
608                         {
609                                 /* If the client module is unloaded partway through a query then the provider will set
610                                  * the pointer to NULL. We cannot just cancel the query as the result will still come
611                                  * through at some point...and it could get messy if we play with invalid pointers...
612                                  */
613                                 delete res;
614                         }
615                         results.pop_front();
616                         ResultsMutex->Unlock();
617                 }
618         }
619
620         void ClearResults()
621         {
622                 while (results.size())
623                 {
624                         MsSQLResult* res = results[0];
625                         delete res;
626                         results.pop_front();
627                 }
628         }
629
630         void DoLeadingQuery()
631         {
632                 SQLrequest* req = queue.front();
633                 req->error = Query(req);
634         }
635
636 };
637
638
639 class ModuleMsSQL : public Module
640 {
641  private:
642         unsigned long currid;
643         QueryThread* queryDispatcher;
644         ServiceProvider sqlserv;
645
646  public:
647         ModuleMsSQL()
648         : currid(0), sqlserv(this, "SQL/mssql", SERVICE_DATA)
649         {
650                 LoggingMutex = new Mutex();
651                 ResultsMutex = new Mutex();
652                 queryDispatcher = new QueryThread(this);
653         }
654
655         void init() CXX11_OVERRIDE
656         {
657                 ReadConf();
658
659                 ServerInstance->Threads->Start(queryDispatcher);
660
661                 Implementation eventlist[] = { I_OnRehash };
662                 ServerInstance->Modules->Attach(eventlist, this, sizeof(eventlist)/sizeof(Implementation));
663                 ServerInstance->Modules->AddService(sqlserv);
664         }
665
666         ~ModuleMsSQL()
667         {
668                 queryDispatcher->join();
669                 delete queryDispatcher;
670                 ClearQueue();
671                 ClearAllConnections();
672
673                 delete LoggingMutex;
674                 delete ResultsMutex;
675         }
676
677         void SendQueue()
678         {
679                 for (ConnMap::iterator iter = connections.begin(); iter != connections.end(); iter++)
680                 {
681                         iter->second->SendResults();
682                 }
683         }
684
685         void ClearQueue()
686         {
687                 for (ConnMap::iterator iter = connections.begin(); iter != connections.end(); iter++)
688                 {
689                         iter->second->ClearResults();
690                 }
691         }
692
693         bool HasHost(const SQLhost &host)
694         {
695                 for (ConnMap::iterator iter = connections.begin(); iter != connections.end(); iter++)
696                 {
697                         if (host == iter->second->GetConfHost())
698                                 return true;
699                 }
700                 return false;
701         }
702
703         bool HostInConf(const SQLhost &h)
704         {
705                 ConfigTagList tags = ServerInstance->Config->ConfTags("database");
706                 for (ConfigIter i = tags.first; i != tags.second; ++i)
707                 {
708                         ConfigTag* tag = i->second;
709                         SQLhost host;
710                         host.id         = tag->getString("id");
711                         host.host       = tag->getString("hostname");
712                         host.port       = tag->getInt("port", 1433);
713                         host.name       = tag->getString("name");
714                         host.user       = tag->getString("username");
715                         host.pass       = tag->getString("password");
716                         if (h == host)
717                                 return true;
718                 }
719                 return false;
720         }
721
722         void ReadConf()
723         {
724                 ClearOldConnections();
725
726                 ConfigTagList tags = ServerInstance->Config->ConfTags("database");
727                 for (ConfigIter i = tags.first; i != tags.second; ++i)
728                 {
729                         ConfigTag* tag = i->second;
730                         SQLhost host;
731
732                         host.id         = tag->getString("id");
733                         host.host       = tag->getString("hostname");
734                         host.port       = tag->getInt("port", 1433);
735                         host.name       = tag->getString("name");
736                         host.user       = tag->getString("username");
737                         host.pass       = tag->getString("password");
738
739                         if (HasHost(host))
740                                 continue;
741
742                         this->AddConn(host);
743                 }
744         }
745
746         void AddConn(const SQLhost& hi)
747         {
748                 if (HasHost(hi))
749                 {
750                         LoggingMutex->Lock();
751                         ServerInstance->Logs->Log(MODNAME, LOG_DEFAULT, "WARNING: A MsSQL connection with id: %s already exists. Aborting database open attempt.", hi.id.c_str());
752                         LoggingMutex->Unlock();
753                         return;
754                 }
755
756                 SQLConn* newconn;
757
758                 newconn = new SQLConn(this, hi);
759
760                 connections.insert(std::make_pair(hi.id, newconn));
761         }
762
763         void ClearOldConnections()
764         {
765                 ConnMap::iterator iter,safei;
766                 for (iter = connections.begin(); iter != connections.end(); iter++)
767                 {
768                         if (!HostInConf(iter->second->GetConfHost()))
769                         {
770                                 delete iter->second;
771                                 safei = iter;
772                                 --iter;
773                                 connections.erase(safei);
774                         }
775                 }
776         }
777
778         void ClearAllConnections()
779         {
780                 for(ConnMap::iterator i = connections.begin(); i != connections.end(); ++i)
781                         delete i->second;
782                 connections.clear();
783         }
784
785         void OnRehash(User* user) CXX11_OVERRIDE
786         {
787                 queryDispatcher->LockQueue();
788                 ReadConf();
789                 queryDispatcher->UnlockQueueWakeup();
790         }
791
792         void OnRequest(Request& request) CXX11_OVERRIDE
793         {
794                 if(strcmp(SQLREQID, request.id) == 0)
795                 {
796                         SQLrequest* req = (SQLrequest*)&request;
797
798                         queryDispatcher->LockQueue();
799
800                         ConnMap::iterator iter;
801
802                         if((iter = connections.find(req->dbid)) != connections.end())
803                         {
804                                 req->id = NewID();
805                                 iter->second->queue.push(new SQLrequest(*req));
806                         }
807                         else
808                         {
809                                 req->error.Id(SQL_BAD_DBID);
810                         }
811                         queryDispatcher->UnlockQueueWakeup();
812                 }
813         }
814
815         unsigned long NewID()
816         {
817                 if (currid+1 == 0)
818                         currid++;
819
820                 return ++currid;
821         }
822
823         Version GetVersion() CXX11_OVERRIDE
824         {
825                 return Version("MsSQL provider", VF_VENDOR);
826         }
827
828 };
829
830 void QueryThread::OnNotify()
831 {
832         Parent->SendQueue();
833 }
834
835 void QueryThread::Run()
836 {
837         this->LockQueue();
838         while (this->GetExitFlag() == false)
839         {
840                 SQLConn* conn = NULL;
841                 for (ConnMap::iterator i = connections.begin(); i != connections.end(); i++)
842                 {
843                         if (i->second->queue.totalsize())
844                         {
845                                 conn = i->second;
846                                 break;
847                         }
848                 }
849                 if (conn)
850                 {
851                         this->UnlockQueue();
852                         conn->DoLeadingQuery();
853                         this->NotifyParent();
854                         this->LockQueue();
855                         conn->queue.pop();
856                 }
857                 else
858                 {
859                         this->WaitForQueue();
860                 }
861         }
862         this->UnlockQueue();
863 }
864
865 MODULE_INIT(ModuleMsSQL)