]> git.netwichtig.de Git - user/henk/code/inspircd.git/blob - src/modules/extra/m_pgsql.cpp
Remove current time parameter of the Timer constructor
[user/henk/code/inspircd.git] / src / modules / extra / m_pgsql.cpp
1 /*
2  * InspIRCd -- Internet Relay Chat Daemon
3  *
4  *   Copyright (C) 2009-2010 Daniel De Graaf <danieldg@inspircd.org>
5  *   Copyright (C) 2006-2007, 2009 Dennis Friis <peavey@inspircd.org>
6  *   Copyright (C) 2006-2007, 2009 Craig Edwards <craigedwards@brainbox.cc>
7  *   Copyright (C) 2008 Robin Burchell <robin+git@viroteck.net>
8  *   Copyright (C) 2008 Thomas Stagner <aquanight@inspircd.org>
9  *   Copyright (C) 2006 Oliver Lupton <oliverlupton@gmail.com>
10  *
11  * This file is part of InspIRCd.  InspIRCd is free software: you can
12  * redistribute it and/or modify it under the terms of the GNU General Public
13  * License as published by the Free Software Foundation, version 2.
14  *
15  * This program is distributed in the hope that it will be useful, but WITHOUT
16  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
17  * FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
18  * details.
19  *
20  * You should have received a copy of the GNU General Public License
21  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
22  */
23
24
25 #include "inspircd.h"
26 #include <cstdlib>
27 #include <libpq-fe.h>
28 #include "modules/sql.h"
29
30 /* $CompileFlags: -Iexec("pg_config --includedir") eval("my $s = `pg_config --version`;$s =~ /^.*?(\d+)\.(\d+)\.(\d+).*?$/;my $v = hex(sprintf("0x%02x%02x%02x", $1, $2, $3));print "-DPGSQL_HAS_ESCAPECONN" if(($v >= 0x080104) || ($v >= 0x07030F && $v < 0x070400) || ($v >= 0x07040D && $v < 0x080000) || ($v >= 0x080008 && $v < 0x080100));") */
31 /* $LinkerFlags: -Lexec("pg_config --libdir") -lpq */
32
33 /* SQLConn rewritten by peavey to
34  * use EventHandler instead of
35  * BufferedSocket. This is much neater
36  * and gives total control of destroy
37  * and delete of resources.
38  */
39
/* Forward declarations, so that the typedef can live at the top of the file */
class ModulePgSQL;
class SQLConn;

/* Connections indexed by the "id" of the <database> tag they were built from */
typedef std::map<std::string, SQLConn*> ConnMap;
45
/* State of a connection combined with the socket event it is waiting for. */
enum SQLstatus
{
        CREAD,  /* Connecting and wants read event */
        CWRITE, /* Connecting and wants write event */
        WREAD,  /* Connected/Working and wants read event */
        WWRITE, /* Connected/Working and wants write event */
        RREAD,  /* Resetting and wants read event */
        RWRITE  /* Resetting and wants write event */
};
54
55 class ReconnectTimer : public Timer
56 {
57  private:
58         ModulePgSQL* mod;
59  public:
60         ReconnectTimer(ModulePgSQL* m) : Timer(5, false), mod(m)
61         {
62         }
63         bool Tick(time_t TIME);
64 };
65
66 struct QueueItem
67 {
68         SQLQuery* c;
69         std::string q;
70         QueueItem(SQLQuery* C, const std::string& Q) : c(C), q(Q) {}
71 };
72
73 /** PgSQLresult is a subclass of the mostly-pure-virtual class SQLresult.
74  * All SQL providers must create their own subclass and define it's methods using that
75  * database library's data retriveal functions. The aim is to avoid a slow and inefficient process
76  * of converting all data to a common format before it reaches the result structure. This way
77  * data is passes to the module nearly as directly as if it was using the API directly itself.
78  */
79
80 class PgSQLresult : public SQLResult
81 {
82         PGresult* res;
83         int currentrow;
84         int rows;
85  public:
86         PgSQLresult(PGresult* result) : res(result), currentrow(0)
87         {
88                 rows = PQntuples(res);
89                 if (!rows)
90                         rows = atoi(PQcmdTuples(res));
91         }
92
93         ~PgSQLresult()
94         {
95                 PQclear(res);
96         }
97
98         int Rows()
99         {
100                 return rows;
101         }
102
103         void GetCols(std::vector<std::string>& result)
104         {
105                 result.resize(PQnfields(res));
106                 for(unsigned int i=0; i < result.size(); i++)
107                 {
108                         result[i] = PQfname(res, i);
109                 }
110         }
111
112         SQLEntry GetValue(int row, int column)
113         {
114                 char* v = PQgetvalue(res, row, column);
115                 if (!v || PQgetisnull(res, row, column))
116                         return SQLEntry();
117
118                 return SQLEntry(std::string(v, PQgetlength(res, row, column)));
119         }
120
121         bool GetRow(SQLEntries& result)
122         {
123                 if (currentrow >= PQntuples(res))
124                         return false;
125                 int ncols = PQnfields(res);
126
127                 for(int i = 0; i < ncols; i++)
128                 {
129                         result.push_back(GetValue(currentrow, i));
130                 }
131                 currentrow++;
132
133                 return true;
134         }
135 };
136
137 /** SQLConn represents one SQL session.
138  */
139 class SQLConn : public SQLProvider, public EventHandler
140 {
141  public:
142         reference<ConfigTag> conf;      /* The <database> entry */
143         std::deque<QueueItem> queue;
144         PGconn*                 sql;            /* PgSQL database connection handle */
145         SQLstatus               status;         /* PgSQL database connection status */
146         QueueItem               qinprog;        /* If there is currently a query in progress */
147
148         SQLConn(Module* Creator, ConfigTag* tag)
149         : SQLProvider(Creator, "SQL/" + tag->getString("id")), conf(tag), sql(NULL), status(CWRITE), qinprog(NULL, "")
150         {
151                 if (!DoConnect())
152                 {
153                         ServerInstance->Logs->Log(MODNAME, LOG_DEFAULT, "WARNING: Could not connect to database " + tag->getString("id"));
154                         DelayReconnect();
155                 }
156         }
157
158         CullResult cull()
159         {
160                 this->SQLProvider::cull();
161                 ServerInstance->Modules->DelService(*this);
162                 return this->EventHandler::cull();
163         }
164
165         ~SQLConn()
166         {
167                 SQLerror err(SQL_BAD_DBID);
168                 if (qinprog.c)
169                 {
170                         qinprog.c->OnError(err);
171                         delete qinprog.c;
172                 }
173                 for(std::deque<QueueItem>::iterator i = queue.begin(); i != queue.end(); i++)
174                 {
175                         SQLQuery* q = i->c;
176                         q->OnError(err);
177                         delete q;
178                 }
179         }
180
181         void HandleEvent(EventType et, int errornum)
182         {
183                 switch (et)
184                 {
185                         case EVENT_READ:
186                         case EVENT_WRITE:
187                                 DoEvent();
188                         break;
189
190                         case EVENT_ERROR:
191                                 DelayReconnect();
192                 }
193         }
194
195         std::string GetDSN()
196         {
197                 std::ostringstream conninfo("connect_timeout = '5'");
198                 std::string item;
199
200                 if (conf->readString("host", item))
201                         conninfo << " host = '" << item << "'";
202
203                 if (conf->readString("port", item))
204                         conninfo << " port = '" << item << "'";
205
206                 if (conf->readString("name", item))
207                         conninfo << " dbname = '" << item << "'";
208
209                 if (conf->readString("user", item))
210                         conninfo << " user = '" << item << "'";
211
212                 if (conf->readString("pass", item))
213                         conninfo << " password = '" << item << "'";
214
215                 if (conf->getBool("ssl"))
216                         conninfo << " sslmode = 'require'";
217                 else
218                         conninfo << " sslmode = 'disable'";
219
220                 return conninfo.str();
221         }
222
223         bool DoConnect()
224         {
225                 sql = PQconnectStart(GetDSN().c_str());
226                 if (!sql)
227                         return false;
228
229                 if(PQstatus(sql) == CONNECTION_BAD)
230                         return false;
231
232                 if(PQsetnonblocking(sql, 1) == -1)
233                         return false;
234
235                 /* OK, we've initalised the connection, now to get it hooked into the socket engine
236                 * and then start polling it.
237                 */
238                 this->fd = PQsocket(sql);
239
240                 if(this->fd <= -1)
241                         return false;
242
243                 if (!SocketEngine::AddFd(this, FD_WANT_NO_WRITE | FD_WANT_NO_READ))
244                 {
245                         ServerInstance->Logs->Log(MODNAME, LOG_DEBUG, "BUG: Couldn't add pgsql socket to socket engine");
246                         return false;
247                 }
248
249                 /* Socket all hooked into the engine, now to tell PgSQL to start connecting */
250                 return DoPoll();
251         }
252
253         bool DoPoll()
254         {
255                 switch(PQconnectPoll(sql))
256                 {
257                         case PGRES_POLLING_WRITING:
258                                 SocketEngine::ChangeEventMask(this, FD_WANT_POLL_WRITE | FD_WANT_NO_READ);
259                                 status = CWRITE;
260                                 return true;
261                         case PGRES_POLLING_READING:
262                                 SocketEngine::ChangeEventMask(this, FD_WANT_POLL_READ | FD_WANT_NO_WRITE);
263                                 status = CREAD;
264                                 return true;
265                         case PGRES_POLLING_FAILED:
266                                 return false;
267                         case PGRES_POLLING_OK:
268                                 SocketEngine::ChangeEventMask(this, FD_WANT_POLL_READ | FD_WANT_NO_WRITE);
269                                 status = WWRITE;
270                                 DoConnectedPoll();
271                         default:
272                                 return true;
273                 }
274         }
275
276         void DoConnectedPoll()
277         {
278 restart:
279                 while (qinprog.q.empty() && !queue.empty())
280                 {
281                         /* There's no query currently in progress, and there's queries in the queue. */
282                         DoQuery(queue.front());
283                         queue.pop_front();
284                 }
285
286                 if (PQconsumeInput(sql))
287                 {
288                         if (PQisBusy(sql))
289                         {
290                                 /* Nothing happens here */
291                         }
292                         else if (qinprog.c)
293                         {
294                                 /* Fetch the result.. */
295                                 PGresult* result = PQgetResult(sql);
296
297                                 /* PgSQL would allow a query string to be sent which has multiple
298                                  * queries in it, this isn't portable across database backends and
299                                  * we don't want modules doing it. But just in case we make sure we
300                                  * drain any results there are and just use the last one.
301                                  * If the module devs are behaving there will only be one result.
302                                  */
303                                 while (PGresult* temp = PQgetResult(sql))
304                                 {
305                                         PQclear(result);
306                                         result = temp;
307                                 }
308
309                                 /* ..and the result */
310                                 PgSQLresult reply(result);
311                                 switch(PQresultStatus(result))
312                                 {
313                                         case PGRES_EMPTY_QUERY:
314                                         case PGRES_BAD_RESPONSE:
315                                         case PGRES_FATAL_ERROR:
316                                         {
317                                                 SQLerror err(SQL_QREPLY_FAIL, PQresultErrorMessage(result));
318                                                 qinprog.c->OnError(err);
319                                                 break;
320                                         }
321                                         default:
322                                                 /* Other values are not errors */
323                                                 qinprog.c->OnResult(reply);
324                                 }
325
326                                 delete qinprog.c;
327                                 qinprog = QueueItem(NULL, "");
328                                 goto restart;
329                         }
330                         else
331                         {
332                                 qinprog.q.clear();
333                         }
334                 }
335                 else
336                 {
337                         /* I think we'll assume this means the server died...it might not,
338                          * but I think that any error serious enough we actually get here
339                          * deserves to reconnect [/excuse]
340                          * Returning true so the core doesn't try and close the connection.
341                          */
342                         DelayReconnect();
343                 }
344         }
345
346         bool DoResetPoll()
347         {
348                 switch(PQresetPoll(sql))
349                 {
350                         case PGRES_POLLING_WRITING:
351                                 SocketEngine::ChangeEventMask(this, FD_WANT_POLL_WRITE | FD_WANT_NO_READ);
352                                 status = CWRITE;
353                                 return DoPoll();
354                         case PGRES_POLLING_READING:
355                                 SocketEngine::ChangeEventMask(this, FD_WANT_POLL_READ | FD_WANT_NO_WRITE);
356                                 status = CREAD;
357                                 return true;
358                         case PGRES_POLLING_FAILED:
359                                 return false;
360                         case PGRES_POLLING_OK:
361                                 SocketEngine::ChangeEventMask(this, FD_WANT_POLL_READ | FD_WANT_NO_WRITE);
362                                 status = WWRITE;
363                                 DoConnectedPoll();
364                         default:
365                                 return true;
366                 }
367         }
368
369         void DelayReconnect();
370
371         void DoEvent()
372         {
373                 if((status == CREAD) || (status == CWRITE))
374                 {
375                         DoPoll();
376                 }
377                 else if((status == RREAD) || (status == RWRITE))
378                 {
379                         DoResetPoll();
380                 }
381                 else
382                 {
383                         DoConnectedPoll();
384                 }
385         }
386
387         void submit(SQLQuery *req, const std::string& q)
388         {
389                 if (qinprog.q.empty())
390                 {
391                         DoQuery(QueueItem(req,q));
392                 }
393                 else
394                 {
395                         // wait your turn.
396                         queue.push_back(QueueItem(req,q));
397                 }
398         }
399
400         void submit(SQLQuery *req, const std::string& q, const ParamL& p)
401         {
402                 std::string res;
403                 unsigned int param = 0;
404                 for(std::string::size_type i = 0; i < q.length(); i++)
405                 {
406                         if (q[i] != '?')
407                                 res.push_back(q[i]);
408                         else
409                         {
410                                 if (param < p.size())
411                                 {
412                                         std::string parm = p[param++];
413                                         std::vector<char> buffer(parm.length() * 2 + 1);
414 #ifdef PGSQL_HAS_ESCAPECONN
415                                         int error;
416                                         size_t escapedsize = PQescapeStringConn(sql, &buffer[0], parm.data(), parm.length(), &error);
417                                         if (error)
418                                                 ServerInstance->Logs->Log(MODNAME, LOG_DEBUG, "BUG: Apparently PQescapeStringConn() failed");
419 #else
420                                         size_t escapedsize = PQescapeString(&buffer[0], parm.data(), parm.length());
421 #endif
422                                         res.append(&buffer[0], escapedsize);
423                                 }
424                         }
425                 }
426                 submit(req, res);
427         }
428
429         void submit(SQLQuery *req, const std::string& q, const ParamM& p)
430         {
431                 std::string res;
432                 for(std::string::size_type i = 0; i < q.length(); i++)
433                 {
434                         if (q[i] != '$')
435                                 res.push_back(q[i]);
436                         else
437                         {
438                                 std::string field;
439                                 i++;
440                                 while (i < q.length() && isalnum(q[i]))
441                                         field.push_back(q[i++]);
442                                 i--;
443
444                                 ParamM::const_iterator it = p.find(field);
445                                 if (it != p.end())
446                                 {
447                                         std::string parm = it->second;
448                                         std::vector<char> buffer(parm.length() * 2 + 1);
449 #ifdef PGSQL_HAS_ESCAPECONN
450                                         int error;
451                                         size_t escapedsize = PQescapeStringConn(sql, &buffer[0], parm.data(), parm.length(), &error);
452                                         if (error)
453                                                 ServerInstance->Logs->Log(MODNAME, LOG_DEBUG, "BUG: Apparently PQescapeStringConn() failed");
454 #else
455                                         size_t escapedsize = PQescapeString(&buffer[0], parm.data(), parm.length());
456 #endif
457                                         res.append(&buffer[0], escapedsize);
458                                 }
459                         }
460                 }
461                 submit(req, res);
462         }
463
464         void DoQuery(const QueueItem& req)
465         {
466                 if (status != WREAD && status != WWRITE)
467                 {
468                         // whoops, not connected...
469                         SQLerror err(SQL_BAD_CONN);
470                         req.c->OnError(err);
471                         delete req.c;
472                         return;
473                 }
474
475                 if(PQsendQuery(sql, req.q.c_str()))
476                 {
477                         qinprog = req;
478                 }
479                 else
480                 {
481                         SQLerror err(SQL_QSEND_FAIL, PQerrorMessage(sql));
482                         req.c->OnError(err);
483                         delete req.c;
484                 }
485         }
486
487         void Close()
488         {
489                 SocketEngine::DelFd(this);
490
491                 if(sql)
492                 {
493                         PQfinish(sql);
494                         sql = NULL;
495                 }
496         }
497 };
498
499 class ModulePgSQL : public Module
500 {
501  public:
502         ConnMap connections;
503         ReconnectTimer* retimer;
504
505         ModulePgSQL()
506                 : retimer(NULL)
507         {
508         }
509
510         ~ModulePgSQL()
511         {
512                 delete retimer;
513                 ClearAllConnections();
514         }
515
516         void ReadConfig(ConfigStatus& status) CXX11_OVERRIDE
517         {
518                 ReadConf();
519         }
520
521         void ReadConf()
522         {
523                 ConnMap conns;
524                 ConfigTagList tags = ServerInstance->Config->ConfTags("database");
525                 for(ConfigIter i = tags.first; i != tags.second; i++)
526                 {
527                         if (i->second->getString("module", "pgsql") != "pgsql")
528                                 continue;
529                         std::string id = i->second->getString("id");
530                         ConnMap::iterator curr = connections.find(id);
531                         if (curr == connections.end())
532                         {
533                                 SQLConn* conn = new SQLConn(this, i->second);
534                                 conns.insert(std::make_pair(id, conn));
535                                 ServerInstance->Modules->AddService(*conn);
536                         }
537                         else
538                         {
539                                 conns.insert(*curr);
540                                 connections.erase(curr);
541                         }
542                 }
543                 ClearAllConnections();
544                 conns.swap(connections);
545         }
546
547         void ClearAllConnections()
548         {
549                 for(ConnMap::iterator i = connections.begin(); i != connections.end(); i++)
550                 {
551                         i->second->cull();
552                         delete i->second;
553                 }
554                 connections.clear();
555         }
556
557         void OnUnloadModule(Module* mod) CXX11_OVERRIDE
558         {
559                 SQLerror err(SQL_BAD_DBID);
560                 for(ConnMap::iterator i = connections.begin(); i != connections.end(); i++)
561                 {
562                         SQLConn* conn = i->second;
563                         if (conn->qinprog.c && conn->qinprog.c->creator == mod)
564                         {
565                                 conn->qinprog.c->OnError(err);
566                                 delete conn->qinprog.c;
567                                 conn->qinprog.c = NULL;
568                         }
569                         std::deque<QueueItem>::iterator j = conn->queue.begin();
570                         while (j != conn->queue.end())
571                         {
572                                 SQLQuery* q = j->c;
573                                 if (q->creator == mod)
574                                 {
575                                         q->OnError(err);
576                                         delete q;
577                                         j = conn->queue.erase(j);
578                                 }
579                                 else
580                                         j++;
581                         }
582                 }
583         }
584
585         Version GetVersion() CXX11_OVERRIDE
586         {
587                 return Version("PostgreSQL Service Provider module for all other m_sql* modules, uses v2 of the SQL API", VF_VENDOR);
588         }
589 };
590
591 bool ReconnectTimer::Tick(time_t time)
592 {
593         mod->retimer = NULL;
594         mod->ReadConf();
595         delete this;
596         return false;
597 }
598
599 void SQLConn::DelayReconnect()
600 {
601         ModulePgSQL* mod = (ModulePgSQL*)(Module*)creator;
602         ConnMap::iterator it = mod->connections.find(conf->getString("id"));
603         if (it != mod->connections.end())
604         {
605                 mod->connections.erase(it);
606                 ServerInstance->GlobalCulls.AddItem((EventHandler*)this);
607                 if (!mod->retimer)
608                 {
609                         mod->retimer = new ReconnectTimer(mod);
610                         ServerInstance->Timers.AddTimer(mod->retimer);
611                 }
612         }
613 }
614
615 MODULE_INIT(ModulePgSQL)