]> git.netwichtig.de Git - user/henk/code/inspircd.git/blob - src/modules/extra/m_mssql.cpp
m_spanningtree Remove duplicate code for sending channel messages from RouteCommand()
[user/henk/code/inspircd.git] / src / modules / extra / m_mssql.cpp
1 /*
2  * InspIRCd -- Internet Relay Chat Daemon
3  *
4  *   Copyright (C) 2008-2009 Dennis Friis <peavey@inspircd.org>
5  *   Copyright (C) 2009 Daniel De Graaf <danieldg@inspircd.org>
6  *   Copyright (C) 2008-2009 Craig Edwards <craigedwards@brainbox.cc>
7  *   Copyright (C) 2008 Robin Burchell <robin+git@viroteck.net>
8  *   Copyright (C) 2008 Pippijn van Steenhoven <pip88nl@gmail.com>
9  *
10  * This file is part of InspIRCd.  InspIRCd is free software: you can
11  * redistribute it and/or modify it under the terms of the GNU General Public
12  * License as published by the Free Software Foundation, version 2.
13  *
14  * This program is distributed in the hope that it will be useful, but WITHOUT
15  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
16  * FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
17  * details.
18  *
19  * You should have received a copy of the GNU General Public License
20  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
21  */
22
23
24 #include "inspircd.h"
25 #include <tds.h>
26 #include <tdsconvert.h>
27 #include "users.h"
28 #include "channels.h"
29 #include "modules.h"
30
31 #include "m_sqlv2.h"
32
33 /* $CompileFlags: exec("grep VERSION_NO /usr/include/tdsver.h 2>/dev/null | perl -e 'print "-D_TDSVER=".((<> =~ /freetds v(\d+\.\d+)/i) ? $1*100 : 0);'") */
34 /* $LinkerFlags: -ltds */
35
/* Forward declarations: the container typedefs and QueryThread below refer
 * to these classes before their definitions appear.
 */
class ModuleMsSQL;
class MsSQLResult;
class SQLConn;

/* Queue of results produced by a connection. */
typedef std::deque<MsSQLResult*> ResultQueue;

/* Connection objects looked up by a string key. */
typedef std::map<std::string, SQLConn*> ConnMap;
42
/** Count how often a character occurs in a C string.
 * @param str NUL-terminated string to scan; a NULL pointer is treated as empty.
 * @param a The character to look for.
 * @return The number of occurrences of a in str.
 */
unsigned long count(const char * const str, char a)
{
	unsigned long n = 0;
	if (!str)
		return n;

	for (const char *p = str; *p; ++p)
	{
		/* Compare against the requested character, not a hard-coded '?'. */
		if (*p == a)
			++n;
	}
	return n;
}
53
54 ConnMap connections;
55 Mutex* ResultsMutex;
56 Mutex* LoggingMutex;
57
58 class QueryThread : public SocketThread
59 {
60   private:
61         ModuleMsSQL* const Parent;
62   public:
63         QueryThread(ModuleMsSQL* mod) : Parent(mod) { }
64         ~QueryThread() { }
65         void Run();
66         void OnNotify();
67 };
68
69 class MsSQLResult : public SQLresult
70 {
71  private:
72         int currentrow;
73         int rows;
74         int cols;
75
76         std::vector<std::string> colnames;
77         std::vector<SQLfieldList> fieldlists;
78         SQLfieldList emptyfieldlist;
79
80         SQLfieldList* fieldlist;
81         SQLfieldMap* fieldmap;
82
83  public:
84         MsSQLResult(Module* self, Module* to, unsigned int rid)
85         : SQLresult(self, to, rid), currentrow(0), rows(0), cols(0), fieldlist(NULL), fieldmap(NULL)
86         {
87         }
88
89         void AddRow(int colsnum, char **dat, char **colname)
90         {
91                 colnames.clear();
92                 cols = colsnum;
93                 for (int i = 0; i < colsnum; i++)
94                 {
95                         fieldlists.resize(fieldlists.size()+1);
96                         colnames.push_back(colname[i]);
97                         SQLfield sf(dat[i] ? dat[i] : "", dat[i] ? false : true);
98                         fieldlists[rows].push_back(sf);
99                 }
100                 rows++;
101         }
102
103         void UpdateAffectedCount()
104         {
105                 rows++;
106         }
107
108         int Rows()
109         {
110                 return rows;
111         }
112
113         int Cols()
114         {
115                 return cols;
116         }
117
118         std::string ColName(int column)
119         {
120                 if (column < (int)colnames.size())
121                 {
122                         return colnames[column];
123                 }
124                 else
125                 {
126                         throw SQLbadColName();
127                 }
128                 return "";
129         }
130
131         int ColNum(const std::string &column)
132         {
133                 for (unsigned int i = 0; i < colnames.size(); i++)
134                 {
135                         if (column == colnames[i])
136                                 return i;
137                 }
138                 throw SQLbadColName();
139                 return 0;
140         }
141
142         SQLfield GetValue(int row, int column)
143         {
144                 if ((row >= 0) && (row < rows) && (column >= 0) && (column < Cols()))
145                 {
146                         return fieldlists[row][column];
147                 }
148
149                 throw SQLbadColName();
150
151                 /* XXX: We never actually get here because of the throw */
152                 return SQLfield("",true);
153         }
154
155         SQLfieldList& GetRow()
156         {
157                 if (currentrow < rows)
158                         return fieldlists[currentrow];
159                 else
160                         return emptyfieldlist;
161         }
162
163         SQLfieldMap& GetRowMap()
164         {
165                 /* In an effort to reduce overhead we don't actually allocate the map
166                  * until the first time it's needed...so...
167                  */
168                 if(fieldmap)
169                 {
170                         fieldmap->clear();
171                 }
172                 else
173                 {
174                         fieldmap = new SQLfieldMap;
175                 }
176
177                 if (currentrow < rows)
178                 {
179                         for (int i = 0; i < Cols(); i++)
180                         {
181                                 fieldmap->insert(std::make_pair(ColName(i), GetValue(currentrow, i)));
182                         }
183                         currentrow++;
184                 }
185
186                 return *fieldmap;
187         }
188
189         SQLfieldList* GetRowPtr()
190         {
191                 fieldlist = new SQLfieldList();
192
193                 if (currentrow < rows)
194                 {
195                         for (int i = 0; i < Rows(); i++)
196                         {
197                                 fieldlist->push_back(fieldlists[currentrow][i]);
198                         }
199                         currentrow++;
200                 }
201                 return fieldlist;
202         }
203
204         SQLfieldMap* GetRowMapPtr()
205         {
206                 fieldmap = new SQLfieldMap();
207
208                 if (currentrow < rows)
209                 {
210                         for (int i = 0; i < Cols(); i++)
211                         {
212                                 fieldmap->insert(std::make_pair(colnames[i],GetValue(currentrow, i)));
213                         }
214                         currentrow++;
215                 }
216
217                 return fieldmap;
218         }
219
220         void Free(SQLfieldMap* fm)
221         {
222                 delete fm;
223         }
224
225         void Free(SQLfieldList* fl)
226         {
227                 delete fl;
228         }
229 };
230
231 class SQLConn : public classbase
232 {
233  private:
234         ResultQueue results;
235         Module* mod;
236         SQLhost host;
237         TDSLOGIN* login;
238         TDSSOCKET* sock;
239         TDSCONTEXT* context;
240
241  public:
242         QueryQueue queue;
243
244         SQLConn(Module* m, const SQLhost& hi)
245         : mod(m), host(hi), login(NULL), sock(NULL), context(NULL)
246         {
247                 if (OpenDB())
248                 {
249                         std::string query("USE " + host.name);
250                         if (tds_submit_query(sock, query.c_str()) == TDS_SUCCEED)
251                         {
252                                 if (tds_process_simple_query(sock) != TDS_SUCCEED)
253                                 {
254                                         LoggingMutex->Lock();
255                                         ServerInstance->Logs->Log(MODNAME, LOG_DEFAULT, "WARNING: Could not select database " + host.name + " for DB with id: " + host.id);
256                                         LoggingMutex->Unlock();
257                                         CloseDB();
258                                 }
259                         }
260                         else
261                         {
262                                 LoggingMutex->Lock();
263                                 ServerInstance->Logs->Log(MODNAME, LOG_DEFAULT, "WARNING: Could not select database " + host.name + " for DB with id: " + host.id);
264                                 LoggingMutex->Unlock();
265                                 CloseDB();
266                         }
267                 }
268                 else
269                 {
270                         LoggingMutex->Lock();
271                         ServerInstance->Logs->Log(MODNAME, LOG_DEFAULT, "WARNING: Could not connect to DB with id: " + host.id);
272                         LoggingMutex->Unlock();
273                         CloseDB();
274                 }
275         }
276
277         ~SQLConn()
278         {
279                 CloseDB();
280         }
281
282         SQLerror Query(SQLrequest* req)
283         {
284                 if (!sock)
285                         return SQLerror(SQL_BAD_CONN, "Socket was NULL, check if SQL server is running.");
286
287                 /* Pointer to the buffer we screw around with substitution in */
288                 char* query;
289
290                 /* Pointer to the current end of query, where we append new stuff */
291                 char* queryend;
292
293                 /* Total length of the unescaped parameters */
294                 unsigned long maxparamlen, paramcount;
295
296                 /* The length of the longest parameter */
297                 maxparamlen = 0;
298
299                 for(ParamL::iterator i = req->query.p.begin(); i != req->query.p.end(); i++)
300                 {
301                         if (i->size() > maxparamlen)
302                                 maxparamlen = i->size();
303                 }
304
305                 /* How many params are there in the query? */
306                 paramcount = count(req->query.q.c_str(), '?');
307
308                 /* This stores copy of params to be inserted with using numbered params 1;3B*/
309                 ParamL paramscopy(req->query.p);
310
311                 /* To avoid a lot of allocations, allocate enough memory for the biggest the escaped query could possibly be.
312                  * sizeofquery + (maxtotalparamlength*2) + 1
313                  *
314                  * The +1 is for null-terminating the string
315                  */
316
317                 query = new char[req->query.q.length() + (maxparamlen*paramcount*2) + 1];
318                 queryend = query;
319
320                 for(unsigned long i = 0; i < req->query.q.length(); i++)
321                 {
322                         if(req->query.q[i] == '?')
323                         {
324                                 /* We found a place to substitute..what fun.
325                                  * use mssql calls to escape and write the
326                                  * escaped string onto the end of our query buffer,
327                                  * then we "just" need to make sure queryend is
328                                  * pointing at the right place.
329                                  */
330
331                                 /* Is it numbered parameter?
332                                  */
333
334                                 bool numbered;
335                                 numbered = false;
336
337                                 /* Numbered parameter number :|
338                                  */
339                                 unsigned int paramnum;
340                                 paramnum = 0;
341
342                                 /* Let's check if it's a numbered param. And also calculate it's number.
343                                  */
344
345                                 while ((i < req->query.q.length() - 1) && (req->query.q[i+1] >= '0') && (req->query.q[i+1] <= '9'))
346                                 {
347                                         numbered = true;
348                                         ++i;
349                                         paramnum = paramnum * 10 + req->query.q[i] - '0';
350                                 }
351
352                                 if (paramnum > paramscopy.size() - 1)
353                                 {
354                                         /* index is out of range!
355                                          */
356                                         numbered = false;
357                                 }
358
359                                 if (numbered)
360                                 {
361                                         /* Custom escaping for this one. converting ' to '' should make SQL Server happy. Ugly but fast :]
362                                          */
363                                         char* escaped = new char[(paramscopy[paramnum].length() * 2) + 1];
364                                         char* escend = escaped;
365                                         for (std::string::iterator p = paramscopy[paramnum].begin(); p < paramscopy[paramnum].end(); p++)
366                                         {
367                                                 if (*p == '\'')
368                                                 {
369                                                         *escend = *p;
370                                                         escend++;
371                                                         *escend = *p;
372                                                 }
373                                                 *escend = *p;
374                                                 escend++;
375                                         }
376                                         *escend = 0;
377
378                                         for (char* n = escaped; *n; n++)
379                                         {
380                                                 *queryend = *n;
381                                                 queryend++;
382                                         }
383                                         delete[] escaped;
384                                 }
385                                 else if (req->query.p.size())
386                                 {
387                                         /* Custom escaping for this one. converting ' to '' should make SQL Server happy. Ugly but fast :]
388                                          */
389                                         char* escaped = new char[(req->query.p.front().length() * 2) + 1];
390                                         char* escend = escaped;
391                                         for (std::string::iterator p = req->query.p.front().begin(); p < req->query.p.front().end(); p++)
392                                         {
393                                                 if (*p == '\'')
394                                                 {
395                                                         *escend = *p;
396                                                         escend++;
397                                                         *escend = *p;
398                                                 }
399                                                 *escend = *p;
400                                                 escend++;
401                                         }
402                                         *escend = 0;
403
404                                         for (char* n = escaped; *n; n++)
405                                         {
406                                                 *queryend = *n;
407                                                 queryend++;
408                                         }
409                                         delete[] escaped;
410                                         req->query.p.pop_front();
411                                 }
412                                 else
413                                         break;
414                         }
415                         else
416                         {
417                                 *queryend = req->query.q[i];
418                                 queryend++;
419                         }
420                 }
421                 *queryend = 0;
422                 req->query.q = query;
423
424                 MsSQLResult* res = new MsSQLResult((Module*)mod, req->source, req->id);
425                 res->dbid = host.id;
426                 res->query = req->query.q;
427
428                 char* msquery = strdup(req->query.q.data());
429                 LoggingMutex->Lock();
430                 ServerInstance->Logs->Log(MODNAME, LOG_DEBUG, "doing Query: %s",msquery);
431                 LoggingMutex->Unlock();
432                 if (tds_submit_query(sock, msquery) != TDS_SUCCEED)
433                 {
434                         std::string error("failed to execute: "+std::string(req->query.q.data()));
435                         delete[] query;
436                         delete res;
437                         free(msquery);
438                         return SQLerror(SQL_QSEND_FAIL, error);
439                 }
440                 delete[] query;
441                 free(msquery);
442
443                 int tds_res;
444                 while (tds_process_tokens(sock, &tds_res, NULL, TDS_TOKEN_RESULTS) == TDS_SUCCEED)
445                 {
446                         //ServerInstance->Logs->Log(MODNAME, LOG_DEBUG, "<******> result type: %d", tds_res);
447                         //ServerInstance->Logs->Log(MODNAME, LOG_DEBUG, "AFFECTED ROWS: %d", sock->rows_affected);
448                         switch (tds_res)
449                         {
450                                 case TDS_ROWFMT_RESULT:
451                                         break;
452
453                                 case TDS_DONE_RESULT:
454                                         if (sock->rows_affected > -1)
455                                         {
456                                                 for (int c = 0; c < sock->rows_affected; c++)  res->UpdateAffectedCount();
457                                                 continue;
458                                         }
459                                         break;
460
461                                 case TDS_ROW_RESULT:
462                                         while (tds_process_tokens(sock, &tds_res, NULL, TDS_STOPAT_ROWFMT|TDS_RETURN_DONE|TDS_RETURN_ROW) == TDS_SUCCEED)
463                                         {
464                                                 if (tds_res != TDS_ROW_RESULT)
465                                                         break;
466
467                                                 if (!sock->current_results)
468                                                         continue;
469
470                                                 if (sock->res_info->row_count > 0)
471                                                 {
472                                                         int cols = sock->res_info->num_cols;
473                                                         char** name = new char*[512];
474                                                         char** data = new char*[512];
475                                                         for (int j=0; j<cols; j++)
476                                                         {
477                                                                 TDSCOLUMN* col = sock->current_results->columns[j];
478                                                                 name[j] = col->column_name;
479
480                                                                 int ctype;
481                                                                 int srclen;
482                                                                 unsigned char* src;
483                                                                 CONV_RESULT dres;
484                                                                 ctype = tds_get_conversion_type(col->column_type, col->column_size);
485 #if _TDSVER >= 82
486                                                                         src = col->column_data;
487 #else
488                                                                         src = &(sock->current_results->current_row[col->column_offset]);
489 #endif
490                                                                 srclen = col->column_cur_size;
491                                                                 tds_convert(sock->tds_ctx, ctype, (TDS_CHAR *) src, srclen, SYBCHAR, &dres);
492                                                                 data[j] = (char*)dres.ib;
493                                                         }
494                                                         ResultReady(res, cols, data, name);
495                                                 }
496                                         }
497                                         break;
498
499                                 default:
500                                         break;
501                         }
502                 }
503                 ResultsMutex->Lock();
504                 results.push_back(res);
505                 ResultsMutex->Unlock();
506                 return SQLerror();
507         }
508
509         static int HandleMessage(const TDSCONTEXT * pContext, TDSSOCKET * pTdsSocket, TDSMESSAGE * pMessage)
510         {
511                 SQLConn* sc = (SQLConn*)pContext->parent;
512                 LoggingMutex->Lock();
513                 ServerInstance->Logs->Log(MODNAME, LOG_DEBUG, "Message for DB with id: %s -> %s", sc->host.id.c_str(), pMessage->message);
514                 LoggingMutex->Unlock();
515                 return 0;
516         }
517
518         static int HandleError(const TDSCONTEXT * pContext, TDSSOCKET * pTdsSocket, TDSMESSAGE * pMessage)
519         {
520                 SQLConn* sc = (SQLConn*)pContext->parent;
521                 LoggingMutex->Lock();
522                 ServerInstance->Logs->Log(MODNAME, LOG_DEFAULT, "Error for DB with id: %s -> %s", sc->host.id.c_str(), pMessage->message);
523                 LoggingMutex->Unlock();
524                 return 0;
525         }
526
527         void ResultReady(MsSQLResult *res, int cols, char **data, char **colnames)
528         {
529                 res->AddRow(cols, data, colnames);
530         }
531
532         void AffectedReady(MsSQLResult *res)
533         {
534                 res->UpdateAffectedCount();
535         }
536
537         bool OpenDB()
538         {
539                 CloseDB();
540
541                 TDSCONNECTION* conn = NULL;
542
543                 login = tds_alloc_login();
544                 tds_set_app(login, "TSQL");
545                 tds_set_library(login,"TDS-Library");
546                 tds_set_host(login, "");
547                 tds_set_server(login, host.host.c_str());
548                 tds_set_server_addr(login, host.host.c_str());
549                 tds_set_user(login, host.user.c_str());
550                 tds_set_passwd(login, host.pass.c_str());
551                 tds_set_port(login, host.port);
552                 tds_set_packet(login, 512);
553
554                 context = tds_alloc_context(this);
555                 context->msg_handler = HandleMessage;
556                 context->err_handler = HandleError;
557
558                 sock = tds_alloc_socket(context, 512);
559                 tds_set_parent(sock, NULL);
560
561                 conn = tds_read_config_info(NULL, login, context->locale);
562
563                 if (tds_connect(sock, conn) == TDS_SUCCEED)
564                 {
565                         tds_free_connection(conn);
566                         return 1;
567                 }
568                 tds_free_connection(conn);
569                 return 0;
570         }
571
572         void CloseDB()
573         {
574                 if (sock)
575                 {
576                         tds_free_socket(sock);
577                         sock = NULL;
578                 }
579                 if (context)
580                 {
581                         tds_free_context(context);
582                         context = NULL;
583                 }
584                 if (login)
585                 {
586                         tds_free_login(login);
587                         login = NULL;
588                 }
589         }
590
591         SQLhost GetConfHost()
592         {
593                 return host;
594         }
595
596         void SendResults()
597         {
598                 while (results.size())
599                 {
600                         MsSQLResult* res = results[0];
601                         ResultsMutex->Lock();
602                         if (res->dest)
603                         {
604                                 res->Send();
605                         }
606                         else
607                         {
608                                 /* If the client module is unloaded partway through a query then the provider will set
609                                  * the pointer to NULL. We cannot just cancel the query as the result will still come
610                                  * through at some point...and it could get messy if we play with invalid pointers...
611                                  */
612                                 delete res;
613                         }
614                         results.pop_front();
615                         ResultsMutex->Unlock();
616                 }
617         }
618
619         void ClearResults()
620         {
621                 while (results.size())
622                 {
623                         MsSQLResult* res = results[0];
624                         delete res;
625                         results.pop_front();
626                 }
627         }
628
629         void DoLeadingQuery()
630         {
631                 SQLrequest* req = queue.front();
632                 req->error = Query(req);
633         }
634
635 };
636
637
638 class ModuleMsSQL : public Module
639 {
640  private:
641         unsigned long currid;
642         QueryThread* queryDispatcher;
643         ServiceProvider sqlserv;
644
645  public:
646         ModuleMsSQL()
647         : currid(0), sqlserv(this, "SQL/mssql", SERVICE_DATA)
648         {
649                 LoggingMutex = new Mutex();
650                 ResultsMutex = new Mutex();
651                 queryDispatcher = new QueryThread(this);
652         }
653
654         void init() CXX11_OVERRIDE
655         {
656                 ReadConf();
657
658                 ServerInstance->Threads->Start(queryDispatcher);
659
660                 ServerInstance->Modules->AddService(sqlserv);
661         }
662
663         ~ModuleMsSQL()
664         {
665                 queryDispatcher->join();
666                 delete queryDispatcher;
667                 ClearQueue();
668                 ClearAllConnections();
669
670                 delete LoggingMutex;
671                 delete ResultsMutex;
672         }
673
674         void SendQueue()
675         {
676                 for (ConnMap::iterator iter = connections.begin(); iter != connections.end(); iter++)
677                 {
678                         iter->second->SendResults();
679                 }
680         }
681
682         void ClearQueue()
683         {
684                 for (ConnMap::iterator iter = connections.begin(); iter != connections.end(); iter++)
685                 {
686                         iter->second->ClearResults();
687                 }
688         }
689
690         bool HasHost(const SQLhost &host)
691         {
692                 for (ConnMap::iterator iter = connections.begin(); iter != connections.end(); iter++)
693                 {
694                         if (host == iter->second->GetConfHost())
695                                 return true;
696                 }
697                 return false;
698         }
699
700         bool HostInConf(const SQLhost &h)
701         {
702                 ConfigTagList tags = ServerInstance->Config->ConfTags("database");
703                 for (ConfigIter i = tags.first; i != tags.second; ++i)
704                 {
705                         ConfigTag* tag = i->second;
706                         SQLhost host;
707                         host.id         = tag->getString("id");
708                         host.host       = tag->getString("hostname");
709                         host.port       = tag->getInt("port", 1433);
710                         host.name       = tag->getString("name");
711                         host.user       = tag->getString("username");
712                         host.pass       = tag->getString("password");
713                         if (h == host)
714                                 return true;
715                 }
716                 return false;
717         }
718
719         void ReadConf()
720         {
721                 ClearOldConnections();
722
723                 ConfigTagList tags = ServerInstance->Config->ConfTags("database");
724                 for (ConfigIter i = tags.first; i != tags.second; ++i)
725                 {
726                         ConfigTag* tag = i->second;
727                         SQLhost host;
728
729                         host.id         = tag->getString("id");
730                         host.host       = tag->getString("hostname");
731                         host.port       = tag->getInt("port", 1433);
732                         host.name       = tag->getString("name");
733                         host.user       = tag->getString("username");
734                         host.pass       = tag->getString("password");
735
736                         if (HasHost(host))
737                                 continue;
738
739                         this->AddConn(host);
740                 }
741         }
742
743         void AddConn(const SQLhost& hi)
744         {
745                 if (HasHost(hi))
746                 {
747                         LoggingMutex->Lock();
748                         ServerInstance->Logs->Log(MODNAME, LOG_DEFAULT, "WARNING: A MsSQL connection with id: %s already exists. Aborting database open attempt.", hi.id.c_str());
749                         LoggingMutex->Unlock();
750                         return;
751                 }
752
753                 SQLConn* newconn;
754
755                 newconn = new SQLConn(this, hi);
756
757                 connections.insert(std::make_pair(hi.id, newconn));
758         }
759
760         void ClearOldConnections()
761         {
762                 ConnMap::iterator iter,safei;
763                 for (iter = connections.begin(); iter != connections.end(); iter++)
764                 {
765                         if (!HostInConf(iter->second->GetConfHost()))
766                         {
767                                 delete iter->second;
768                                 safei = iter;
769                                 --iter;
770                                 connections.erase(safei);
771                         }
772                 }
773         }
774
775         void ClearAllConnections()
776         {
777                 for(ConnMap::iterator i = connections.begin(); i != connections.end(); ++i)
778                         delete i->second;
779                 connections.clear();
780         }
781
782         void OnRehash(User* user) CXX11_OVERRIDE
783         {
784                 queryDispatcher->LockQueue();
785                 ReadConf();
786                 queryDispatcher->UnlockQueueWakeup();
787         }
788
789         void OnRequest(Request& request) CXX11_OVERRIDE
790         {
791                 if(strcmp(SQLREQID, request.id) == 0)
792                 {
793                         SQLrequest* req = (SQLrequest*)&request;
794
795                         queryDispatcher->LockQueue();
796
797                         ConnMap::iterator iter;
798
799                         if((iter = connections.find(req->dbid)) != connections.end())
800                         {
801                                 req->id = NewID();
802                                 iter->second->queue.push(new SQLrequest(*req));
803                         }
804                         else
805                         {
806                                 req->error.Id(SQL_BAD_DBID);
807                         }
808                         queryDispatcher->UnlockQueueWakeup();
809                 }
810         }
811
812         unsigned long NewID()
813         {
814                 if (currid+1 == 0)
815                         currid++;
816
817                 return ++currid;
818         }
819
820         Version GetVersion() CXX11_OVERRIDE
821         {
822                 return Version("MsSQL provider", VF_VENDOR);
823         }
824
825 };
826
827 void QueryThread::OnNotify()
828 {
829         Parent->SendQueue();
830 }
831
832 void QueryThread::Run()
833 {
834         this->LockQueue();
835         while (this->GetExitFlag() == false)
836         {
837                 SQLConn* conn = NULL;
838                 for (ConnMap::iterator i = connections.begin(); i != connections.end(); i++)
839                 {
840                         if (i->second->queue.totalsize())
841                         {
842                                 conn = i->second;
843                                 break;
844                         }
845                 }
846                 if (conn)
847                 {
848                         this->UnlockQueue();
849                         conn->DoLeadingQuery();
850                         this->NotifyParent();
851                         this->LockQueue();
852                         conn->queue.pop();
853                 }
854                 else
855                 {
856                         this->WaitForQueue();
857                 }
858         }
859         this->UnlockQueue();
860 }
861
862 MODULE_INIT(ModuleMsSQL)