]> git.netwichtig.de Git - user/henk/code/inspircd.git/blob - src/modules/extra/m_pgsql.cpp
Add GetId() to the SQL::Provider class.
[user/henk/code/inspircd.git] / src / modules / extra / m_pgsql.cpp
1 /*
2  * InspIRCd -- Internet Relay Chat Daemon
3  *
4  *   Copyright (C) 2009-2010 Daniel De Graaf <danieldg@inspircd.org>
5  *   Copyright (C) 2006-2007, 2009 Dennis Friis <peavey@inspircd.org>
6  *   Copyright (C) 2006-2007, 2009 Craig Edwards <craigedwards@brainbox.cc>
7  *   Copyright (C) 2008 Robin Burchell <robin+git@viroteck.net>
8  *   Copyright (C) 2008 Thomas Stagner <aquanight@inspircd.org>
9  *   Copyright (C) 2006 Oliver Lupton <oliverlupton@gmail.com>
10  *
11  * This file is part of InspIRCd.  InspIRCd is free software: you can
12  * redistribute it and/or modify it under the terms of the GNU General Public
13  * License as published by the Free Software Foundation, version 2.
14  *
15  * This program is distributed in the hope that it will be useful, but WITHOUT
16  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
17  * FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
18  * details.
19  *
20  * You should have received a copy of the GNU General Public License
21  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
22  */
23
24 /// $CompilerFlags: -Iexecute("pg_config --includedir" "POSTGRESQL_INCLUDE_DIR")
25 /// $LinkerFlags: -Lexecute("pg_config --libdir" "POSTGRESQL_LIBRARY_DIR") -lpq
26
27 /// $PackageInfo: require_system("arch") postgresql-libs
28 /// $PackageInfo: require_system("centos") postgresql-devel
29 /// $PackageInfo: require_system("darwin") postgresql
30 /// $PackageInfo: require_system("debian") libpq-dev
31 /// $PackageInfo: require_system("ubuntu") libpq-dev
32
33
34 #include "inspircd.h"
35 #include <cstdlib>
36 #include <libpq-fe.h>
37 #include "modules/sql.h"
38
39 /* SQLConn rewritten by peavey to
40  * use EventHandler instead of
41  * BufferedSocket. This is much neater
42  * and gives total control of destroy
43  * and delete of resources.
44  */
45
46 /* Forward declare, so we can have the typedef neatly at the top */
47 class SQLConn;
48 class ModulePgSQL;
49
50 typedef insp::flat_map<std::string, SQLConn*> ConnMap;
51
/** State of a connection: the phase it is in combined with the socket event it is waiting for. */
enum SQLstatus
{
        CREAD,  /* Connecting and wants read event */
        CWRITE, /* Connecting and wants write event */
        WREAD,  /* Connected/Working and wants read event */
        WWRITE, /* Connected/Working and wants write event */
        RREAD,  /* Resetting and wants read event */
        RWRITE  /* Resetting and wants write event */
};
60
61 class ReconnectTimer : public Timer
62 {
63  private:
64         ModulePgSQL* mod;
65  public:
66         ReconnectTimer(ModulePgSQL* m) : Timer(5, false), mod(m)
67         {
68         }
69         bool Tick(time_t TIME) CXX11_OVERRIDE;
70 };
71
72 struct QueueItem
73 {
74         SQL::Query* c;
75         std::string q;
76         QueueItem(SQL::Query* C, const std::string& Q) : c(C), q(Q) {}
77 };
78
/** PgSQLresult is a subclass of the mostly-pure-virtual class SQL::Result.
 * All SQL providers must create their own subclass and define its methods using that
 * database library's data retrieval functions. The aim is to avoid a slow and inefficient process
 * of converting all data to a common format before it reaches the result structure. This way
 * data is passed to the module nearly as directly as if it was using the API directly itself.
 */
85
86 class PgSQLresult : public SQL::Result
87 {
88         PGresult* res;
89         int currentrow;
90         int rows;
91         std::vector<std::string> colnames;
92
93         void getColNames()
94         {
95                 colnames.resize(PQnfields(res));
96                 for(unsigned int i=0; i < colnames.size(); i++)
97                 {
98                         colnames[i] = PQfname(res, i);
99                 }
100         }
101  public:
102         PgSQLresult(PGresult* result) : res(result), currentrow(0)
103         {
104                 rows = PQntuples(res);
105                 if (!rows)
106                         rows = ConvToNum<int>(PQcmdTuples(res));
107         }
108
109         ~PgSQLresult()
110         {
111                 PQclear(res);
112         }
113
114         int Rows() CXX11_OVERRIDE
115         {
116                 return rows;
117         }
118
119         void GetCols(std::vector<std::string>& result) CXX11_OVERRIDE
120         {
121                 if (colnames.empty())
122                         getColNames();
123                 result = colnames;
124         }
125
126         bool HasColumn(const std::string& column, size_t& index) CXX11_OVERRIDE
127         {
128                 if (colnames.empty())
129                         getColNames();
130
131                 for (size_t i = 0; i < colnames.size(); ++i)
132                 {
133                         if (colnames[i] == column)
134                         {
135                                 index = i;
136                                 return true;
137                         }
138                 }
139                 return false;
140         }
141
142         SQL::Field GetValue(int row, int column)
143         {
144                 char* v = PQgetvalue(res, row, column);
145                 if (!v || PQgetisnull(res, row, column))
146                         return SQL::Field();
147
148                 return SQL::Field(std::string(v, PQgetlength(res, row, column)));
149         }
150
151         bool GetRow(SQL::Row& result) CXX11_OVERRIDE
152         {
153                 if (currentrow >= PQntuples(res))
154                         return false;
155                 int ncols = PQnfields(res);
156
157                 for(int i = 0; i < ncols; i++)
158                 {
159                         result.push_back(GetValue(currentrow, i));
160                 }
161                 currentrow++;
162
163                 return true;
164         }
165 };
166
167 /** SQLConn represents one SQL session.
168  */
169 class SQLConn : public SQL::Provider, public EventHandler
170 {
171  public:
172         reference<ConfigTag> conf;      /* The <database> entry */
173         std::deque<QueueItem> queue;
174         PGconn*                 sql;            /* PgSQL database connection handle */
175         SQLstatus               status;         /* PgSQL database connection status */
176         QueueItem               qinprog;        /* If there is currently a query in progress */
177
178         SQLConn(Module* Creator, ConfigTag* tag)
179                 : SQL::Provider(Creator, tag->getString("id"))
180                 , conf(tag)
181                 , sql(NULL)
182                 , status(CWRITE)
183                 , qinprog(NULL, "")
184         {
185                 if (!DoConnect())
186                 {
187                         ServerInstance->Logs->Log(MODNAME, LOG_DEFAULT, "WARNING: Could not connect to database " + tag->getString("id"));
188                         DelayReconnect();
189                 }
190         }
191
192         CullResult cull() CXX11_OVERRIDE
193         {
194                 this->SQL::Provider::cull();
195                 ServerInstance->Modules->DelService(*this);
196                 return this->EventHandler::cull();
197         }
198
199         ~SQLConn()
200         {
201                 SQL::Error err(SQL::BAD_DBID);
202                 if (qinprog.c)
203                 {
204                         qinprog.c->OnError(err);
205                         delete qinprog.c;
206                 }
207                 for(std::deque<QueueItem>::iterator i = queue.begin(); i != queue.end(); i++)
208                 {
209                         SQL::Query* q = i->c;
210                         q->OnError(err);
211                         delete q;
212                 }
213         }
214
215         void OnEventHandlerRead() CXX11_OVERRIDE
216         {
217                 DoEvent();
218         }
219
220         void OnEventHandlerWrite() CXX11_OVERRIDE
221         {
222                 DoEvent();
223         }
224
225         void OnEventHandlerError(int errornum) CXX11_OVERRIDE
226         {
227                 DelayReconnect();
228         }
229
230         std::string GetDSN()
231         {
232                 std::ostringstream conninfo("connect_timeout = '5'");
233                 std::string item;
234
235                 if (conf->readString("host", item))
236                         conninfo << " host = '" << item << "'";
237
238                 if (conf->readString("port", item))
239                         conninfo << " port = '" << item << "'";
240
241                 if (conf->readString("name", item))
242                         conninfo << " dbname = '" << item << "'";
243
244                 if (conf->readString("user", item))
245                         conninfo << " user = '" << item << "'";
246
247                 if (conf->readString("pass", item))
248                         conninfo << " password = '" << item << "'";
249
250                 if (conf->getBool("ssl"))
251                         conninfo << " sslmode = 'require'";
252                 else
253                         conninfo << " sslmode = 'disable'";
254
255                 return conninfo.str();
256         }
257
258         bool DoConnect()
259         {
260                 sql = PQconnectStart(GetDSN().c_str());
261                 if (!sql)
262                         return false;
263
264                 if(PQstatus(sql) == CONNECTION_BAD)
265                         return false;
266
267                 if(PQsetnonblocking(sql, 1) == -1)
268                         return false;
269
270                 /* OK, we've initalised the connection, now to get it hooked into the socket engine
271                 * and then start polling it.
272                 */
273                 this->fd = PQsocket(sql);
274
275                 if(this->fd <= -1)
276                         return false;
277
278                 if (!SocketEngine::AddFd(this, FD_WANT_NO_WRITE | FD_WANT_NO_READ))
279                 {
280                         ServerInstance->Logs->Log(MODNAME, LOG_DEBUG, "BUG: Couldn't add pgsql socket to socket engine");
281                         return false;
282                 }
283
284                 /* Socket all hooked into the engine, now to tell PgSQL to start connecting */
285                 return DoPoll();
286         }
287
288         bool DoPoll()
289         {
290                 switch(PQconnectPoll(sql))
291                 {
292                         case PGRES_POLLING_WRITING:
293                                 SocketEngine::ChangeEventMask(this, FD_WANT_POLL_WRITE | FD_WANT_NO_READ);
294                                 status = CWRITE;
295                                 return true;
296                         case PGRES_POLLING_READING:
297                                 SocketEngine::ChangeEventMask(this, FD_WANT_POLL_READ | FD_WANT_NO_WRITE);
298                                 status = CREAD;
299                                 return true;
300                         case PGRES_POLLING_FAILED:
301                                 return false;
302                         case PGRES_POLLING_OK:
303                                 SocketEngine::ChangeEventMask(this, FD_WANT_POLL_READ | FD_WANT_NO_WRITE);
304                                 status = WWRITE;
305                                 DoConnectedPoll();
306                                 return true;
307                         default:
308                                 return true;
309                 }
310         }
311
312         void DoConnectedPoll()
313         {
314 restart:
315                 while (qinprog.q.empty() && !queue.empty())
316                 {
317                         /* There's no query currently in progress, and there's queries in the queue. */
318                         DoQuery(queue.front());
319                         queue.pop_front();
320                 }
321
322                 if (PQconsumeInput(sql))
323                 {
324                         if (PQisBusy(sql))
325                         {
326                                 /* Nothing happens here */
327                         }
328                         else if (qinprog.c)
329                         {
330                                 /* Fetch the result.. */
331                                 PGresult* result = PQgetResult(sql);
332
333                                 /* PgSQL would allow a query string to be sent which has multiple
334                                  * queries in it, this isn't portable across database backends and
335                                  * we don't want modules doing it. But just in case we make sure we
336                                  * drain any results there are and just use the last one.
337                                  * If the module devs are behaving there will only be one result.
338                                  */
339                                 while (PGresult* temp = PQgetResult(sql))
340                                 {
341                                         PQclear(result);
342                                         result = temp;
343                                 }
344
345                                 /* ..and the result */
346                                 PgSQLresult reply(result);
347                                 switch(PQresultStatus(result))
348                                 {
349                                         case PGRES_EMPTY_QUERY:
350                                         case PGRES_BAD_RESPONSE:
351                                         case PGRES_FATAL_ERROR:
352                                         {
353                                                 SQL::Error err(SQL::QREPLY_FAIL, PQresultErrorMessage(result));
354                                                 qinprog.c->OnError(err);
355                                                 break;
356                                         }
357                                         default:
358                                                 /* Other values are not errors */
359                                                 qinprog.c->OnResult(reply);
360                                 }
361
362                                 delete qinprog.c;
363                                 qinprog = QueueItem(NULL, "");
364                                 goto restart;
365                         }
366                         else
367                         {
368                                 qinprog.q.clear();
369                         }
370                 }
371                 else
372                 {
373                         /* I think we'll assume this means the server died...it might not,
374                          * but I think that any error serious enough we actually get here
375                          * deserves to reconnect [/excuse]
376                          * Returning true so the core doesn't try and close the connection.
377                          */
378                         DelayReconnect();
379                 }
380         }
381
382         bool DoResetPoll()
383         {
384                 switch(PQresetPoll(sql))
385                 {
386                         case PGRES_POLLING_WRITING:
387                                 SocketEngine::ChangeEventMask(this, FD_WANT_POLL_WRITE | FD_WANT_NO_READ);
388                                 status = CWRITE;
389                                 return DoPoll();
390                         case PGRES_POLLING_READING:
391                                 SocketEngine::ChangeEventMask(this, FD_WANT_POLL_READ | FD_WANT_NO_WRITE);
392                                 status = CREAD;
393                                 return true;
394                         case PGRES_POLLING_FAILED:
395                                 return false;
396                         case PGRES_POLLING_OK:
397                                 SocketEngine::ChangeEventMask(this, FD_WANT_POLL_READ | FD_WANT_NO_WRITE);
398                                 status = WWRITE;
399                                 DoConnectedPoll();
400                                 return true;
401                         default:
402                                 return true;
403                 }
404         }
405
406         void DelayReconnect();
407
408         void DoEvent()
409         {
410                 if((status == CREAD) || (status == CWRITE))
411                 {
412                         DoPoll();
413                 }
414                 else if((status == RREAD) || (status == RWRITE))
415                 {
416                         DoResetPoll();
417                 }
418                 else
419                 {
420                         DoConnectedPoll();
421                 }
422         }
423
424         void Submit(SQL::Query *req, const std::string& q) CXX11_OVERRIDE
425         {
426                 ServerInstance->Logs->Log(MODNAME, LOG_DEBUG, "Executing PostgreSQL query: " + q);
427                 if (qinprog.q.empty())
428                 {
429                         DoQuery(QueueItem(req,q));
430                 }
431                 else
432                 {
433                         // wait your turn.
434                         queue.push_back(QueueItem(req,q));
435                 }
436         }
437
438         void Submit(SQL::Query *req, const std::string& q, const SQL::ParamList& p) CXX11_OVERRIDE
439         {
440                 std::string res;
441                 unsigned int param = 0;
442                 for(std::string::size_type i = 0; i < q.length(); i++)
443                 {
444                         if (q[i] != '?')
445                                 res.push_back(q[i]);
446                         else
447                         {
448                                 if (param < p.size())
449                                 {
450                                         std::string parm = p[param++];
451                                         std::vector<char> buffer(parm.length() * 2 + 1);
452                                         int error;
453                                         size_t escapedsize = PQescapeStringConn(sql, &buffer[0], parm.data(), parm.length(), &error);
454                                         if (error)
455                                                 ServerInstance->Logs->Log(MODNAME, LOG_DEBUG, "BUG: Apparently PQescapeStringConn() failed");
456                                         res.append(&buffer[0], escapedsize);
457                                 }
458                         }
459                 }
460                 Submit(req, res);
461         }
462
463         void Submit(SQL::Query *req, const std::string& q, const SQL::ParamMap& p) CXX11_OVERRIDE
464         {
465                 std::string res;
466                 for(std::string::size_type i = 0; i < q.length(); i++)
467                 {
468                         if (q[i] != '$')
469                                 res.push_back(q[i]);
470                         else
471                         {
472                                 std::string field;
473                                 i++;
474                                 while (i < q.length() && isalnum(q[i]))
475                                         field.push_back(q[i++]);
476                                 i--;
477
478                                 SQL::ParamMap::const_iterator it = p.find(field);
479                                 if (it != p.end())
480                                 {
481                                         std::string parm = it->second;
482                                         std::vector<char> buffer(parm.length() * 2 + 1);
483                                         int error;
484                                         size_t escapedsize = PQescapeStringConn(sql, &buffer[0], parm.data(), parm.length(), &error);
485                                         if (error)
486                                                 ServerInstance->Logs->Log(MODNAME, LOG_DEBUG, "BUG: Apparently PQescapeStringConn() failed");
487                                         res.append(&buffer[0], escapedsize);
488                                 }
489                         }
490                 }
491                 Submit(req, res);
492         }
493
494         void DoQuery(const QueueItem& req)
495         {
496                 if (status != WREAD && status != WWRITE)
497                 {
498                         // whoops, not connected...
499                         SQL::Error err(SQL::BAD_CONN);
500                         req.c->OnError(err);
501                         delete req.c;
502                         return;
503                 }
504
505                 if(PQsendQuery(sql, req.q.c_str()))
506                 {
507                         qinprog = req;
508                 }
509                 else
510                 {
511                         SQL::Error err(SQL::QSEND_FAIL, PQerrorMessage(sql));
512                         req.c->OnError(err);
513                         delete req.c;
514                 }
515         }
516
517         void Close()
518         {
519                 SocketEngine::DelFd(this);
520
521                 if(sql)
522                 {
523                         PQfinish(sql);
524                         sql = NULL;
525                 }
526         }
527 };
528
529 class ModulePgSQL : public Module
530 {
531  public:
532         ConnMap connections;
533         ReconnectTimer* retimer;
534
535         ModulePgSQL()
536                 : retimer(NULL)
537         {
538         }
539
540         ~ModulePgSQL()
541         {
542                 delete retimer;
543                 ClearAllConnections();
544         }
545
546         void ReadConfig(ConfigStatus& status) CXX11_OVERRIDE
547         {
548                 ReadConf();
549         }
550
551         void ReadConf()
552         {
553                 ConnMap conns;
554                 ConfigTagList tags = ServerInstance->Config->ConfTags("database");
555                 for(ConfigIter i = tags.first; i != tags.second; i++)
556                 {
557                         if (!stdalgo::string::equalsci(i->second->getString("module"), "pgsql"))
558                                 continue;
559                         std::string id = i->second->getString("id");
560                         ConnMap::iterator curr = connections.find(id);
561                         if (curr == connections.end())
562                         {
563                                 SQLConn* conn = new SQLConn(this, i->second);
564                                 conns.insert(std::make_pair(id, conn));
565                                 ServerInstance->Modules->AddService(*conn);
566                         }
567                         else
568                         {
569                                 conns.insert(*curr);
570                                 connections.erase(curr);
571                         }
572                 }
573                 ClearAllConnections();
574                 conns.swap(connections);
575         }
576
577         void ClearAllConnections()
578         {
579                 for(ConnMap::iterator i = connections.begin(); i != connections.end(); i++)
580                 {
581                         i->second->cull();
582                         delete i->second;
583                 }
584                 connections.clear();
585         }
586
587         void OnUnloadModule(Module* mod) CXX11_OVERRIDE
588         {
589                 SQL::Error err(SQL::BAD_DBID);
590                 for(ConnMap::iterator i = connections.begin(); i != connections.end(); i++)
591                 {
592                         SQLConn* conn = i->second;
593                         if (conn->qinprog.c && conn->qinprog.c->creator == mod)
594                         {
595                                 conn->qinprog.c->OnError(err);
596                                 delete conn->qinprog.c;
597                                 conn->qinprog.c = NULL;
598                         }
599                         std::deque<QueueItem>::iterator j = conn->queue.begin();
600                         while (j != conn->queue.end())
601                         {
602                                 SQL::Query* q = j->c;
603                                 if (q->creator == mod)
604                                 {
605                                         q->OnError(err);
606                                         delete q;
607                                         j = conn->queue.erase(j);
608                                 }
609                                 else
610                                         j++;
611                         }
612                 }
613         }
614
615         Version GetVersion() CXX11_OVERRIDE
616         {
617                 return Version("PostgreSQL Service Provider module for all other m_sql* modules, uses v2 of the SQL API", VF_VENDOR);
618         }
619 };
620
621 bool ReconnectTimer::Tick(time_t time)
622 {
623         mod->retimer = NULL;
624         mod->ReadConf();
625         delete this;
626         return false;
627 }
628
629 void SQLConn::DelayReconnect()
630 {
631         ModulePgSQL* mod = (ModulePgSQL*)(Module*)creator;
632         ConnMap::iterator it = mod->connections.find(conf->getString("id"));
633         if (it != mod->connections.end())
634         {
635                 mod->connections.erase(it);
636                 ServerInstance->GlobalCulls.AddItem((EventHandler*)this);
637                 if (!mod->retimer)
638                 {
639                         mod->retimer = new ReconnectTimer(mod);
640                         ServerInstance->Timers.AddTimer(mod->retimer);
641                 }
642         }
643 }
644
/* Hands ModulePgSQL to the MODULE_INIT macro, which is defined outside this file --
 * presumably this generates the entry point the module loader uses; confirm against the core headers.
 */
MODULE_INIT(ModulePgSQL)