]> git.netwichtig.de Git - user/henk/code/inspircd.git/blob - src/modules/extra/m_pgsql.cpp
Add PackageInfo directives for Debian.
[user/henk/code/inspircd.git] / src / modules / extra / m_pgsql.cpp
1 /*
2  * InspIRCd -- Internet Relay Chat Daemon
3  *
4  *   Copyright (C) 2009-2010 Daniel De Graaf <danieldg@inspircd.org>
5  *   Copyright (C) 2006-2007, 2009 Dennis Friis <peavey@inspircd.org>
6  *   Copyright (C) 2006-2007, 2009 Craig Edwards <craigedwards@brainbox.cc>
7  *   Copyright (C) 2008 Robin Burchell <robin+git@viroteck.net>
8  *   Copyright (C) 2008 Thomas Stagner <aquanight@inspircd.org>
9  *   Copyright (C) 2006 Oliver Lupton <oliverlupton@gmail.com>
10  *
11  * This file is part of InspIRCd.  InspIRCd is free software: you can
12  * redistribute it and/or modify it under the terms of the GNU General Public
13  * License as published by the Free Software Foundation, version 2.
14  *
15  * This program is distributed in the hope that it will be useful, but WITHOUT
16  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
17  * FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
18  * details.
19  *
20  * You should have received a copy of the GNU General Public License
21  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
22  */
23
24 /// $CompilerFlags: -Iexecute("pg_config --includedir" "POSTGRESQL_INCLUDE_DIR")
25 /// $LinkerFlags: -Lexecute("pg_config --libdir" "POSTGRESQL_LIBRARY_DIR") -lpq
26
27 /// $PackageInfo: require_system("centos") postgresql-devel
28 /// $PackageInfo: require_system("darwin") postgresql
29 /// $PackageInfo: require_system("debian") libpq-dev
30 /// $PackageInfo: require_system("ubuntu") libpq-dev
31
32
33 #include "inspircd.h"
34 #include <cstdlib>
35 #include <libpq-fe.h>
36 #include "modules/sql.h"
37
38 /* SQLConn rewritten by peavey to
39  * use EventHandler instead of
40  * BufferedSocket. This is much neater
41  * and gives total control of destroy
42  * and delete of resources.
43  */
44
45 /* Forward declare, so we can have the typedef neatly at the top */
46 class SQLConn;
47 class ModulePgSQL;
48
49 typedef insp::flat_map<std::string, SQLConn*> ConnMap;
50
51 /* CREAD,       Connecting and wants read event
52  * CWRITE,      Connecting and wants write event
53  * WREAD,       Connected/Working and wants read event
54  * WWRITE,      Connected/Working and wants write event
55  * RREAD,       Resetting and wants read event
56  * RWRITE,      Resetting and wants write event
57  */
58 enum SQLstatus { CREAD, CWRITE, WREAD, WWRITE, RREAD, RWRITE };
59
60 class ReconnectTimer : public Timer
61 {
62  private:
63         ModulePgSQL* mod;
64  public:
65         ReconnectTimer(ModulePgSQL* m) : Timer(5, false), mod(m)
66         {
67         }
68         bool Tick(time_t TIME);
69 };
70
71 struct QueueItem
72 {
73         SQLQuery* c;
74         std::string q;
75         QueueItem(SQLQuery* C, const std::string& Q) : c(C), q(Q) {}
76 };
77
78 /** PgSQLresult is a subclass of the mostly-pure-virtual class SQLresult.
79  * All SQL providers must create their own subclass and define it's methods using that
80  * database library's data retriveal functions. The aim is to avoid a slow and inefficient process
81  * of converting all data to a common format before it reaches the result structure. This way
82  * data is passes to the module nearly as directly as if it was using the API directly itself.
83  */
84
85 class PgSQLresult : public SQLResult
86 {
87         PGresult* res;
88         int currentrow;
89         int rows;
90  public:
91         PgSQLresult(PGresult* result) : res(result), currentrow(0)
92         {
93                 rows = PQntuples(res);
94                 if (!rows)
95                         rows = atoi(PQcmdTuples(res));
96         }
97
98         ~PgSQLresult()
99         {
100                 PQclear(res);
101         }
102
103         int Rows()
104         {
105                 return rows;
106         }
107
108         void GetCols(std::vector<std::string>& result)
109         {
110                 result.resize(PQnfields(res));
111                 for(unsigned int i=0; i < result.size(); i++)
112                 {
113                         result[i] = PQfname(res, i);
114                 }
115         }
116
117         SQLEntry GetValue(int row, int column)
118         {
119                 char* v = PQgetvalue(res, row, column);
120                 if (!v || PQgetisnull(res, row, column))
121                         return SQLEntry();
122
123                 return SQLEntry(std::string(v, PQgetlength(res, row, column)));
124         }
125
126         bool GetRow(SQLEntries& result)
127         {
128                 if (currentrow >= PQntuples(res))
129                         return false;
130                 int ncols = PQnfields(res);
131
132                 for(int i = 0; i < ncols; i++)
133                 {
134                         result.push_back(GetValue(currentrow, i));
135                 }
136                 currentrow++;
137
138                 return true;
139         }
140 };
141
142 /** SQLConn represents one SQL session.
143  */
144 class SQLConn : public SQLProvider, public EventHandler
145 {
146  public:
147         reference<ConfigTag> conf;      /* The <database> entry */
148         std::deque<QueueItem> queue;
149         PGconn*                 sql;            /* PgSQL database connection handle */
150         SQLstatus               status;         /* PgSQL database connection status */
151         QueueItem               qinprog;        /* If there is currently a query in progress */
152
153         SQLConn(Module* Creator, ConfigTag* tag)
154         : SQLProvider(Creator, "SQL/" + tag->getString("id")), conf(tag), sql(NULL), status(CWRITE), qinprog(NULL, "")
155         {
156                 if (!DoConnect())
157                 {
158                         ServerInstance->Logs->Log(MODNAME, LOG_DEFAULT, "WARNING: Could not connect to database " + tag->getString("id"));
159                         DelayReconnect();
160                 }
161         }
162
163         CullResult cull() CXX11_OVERRIDE
164         {
165                 this->SQLProvider::cull();
166                 ServerInstance->Modules->DelService(*this);
167                 return this->EventHandler::cull();
168         }
169
170         ~SQLConn()
171         {
172                 SQLerror err(SQL_BAD_DBID);
173                 if (qinprog.c)
174                 {
175                         qinprog.c->OnError(err);
176                         delete qinprog.c;
177                 }
178                 for(std::deque<QueueItem>::iterator i = queue.begin(); i != queue.end(); i++)
179                 {
180                         SQLQuery* q = i->c;
181                         q->OnError(err);
182                         delete q;
183                 }
184         }
185
186         void OnEventHandlerRead() CXX11_OVERRIDE
187         {
188                 DoEvent();
189         }
190
191         void OnEventHandlerWrite() CXX11_OVERRIDE
192         {
193                 DoEvent();
194         }
195
196         void OnEventHandlerError(int errornum) CXX11_OVERRIDE
197         {
198                 DelayReconnect();
199         }
200
201         std::string GetDSN()
202         {
203                 std::ostringstream conninfo("connect_timeout = '5'");
204                 std::string item;
205
206                 if (conf->readString("host", item))
207                         conninfo << " host = '" << item << "'";
208
209                 if (conf->readString("port", item))
210                         conninfo << " port = '" << item << "'";
211
212                 if (conf->readString("name", item))
213                         conninfo << " dbname = '" << item << "'";
214
215                 if (conf->readString("user", item))
216                         conninfo << " user = '" << item << "'";
217
218                 if (conf->readString("pass", item))
219                         conninfo << " password = '" << item << "'";
220
221                 if (conf->getBool("ssl"))
222                         conninfo << " sslmode = 'require'";
223                 else
224                         conninfo << " sslmode = 'disable'";
225
226                 return conninfo.str();
227         }
228
229         bool DoConnect()
230         {
231                 sql = PQconnectStart(GetDSN().c_str());
232                 if (!sql)
233                         return false;
234
235                 if(PQstatus(sql) == CONNECTION_BAD)
236                         return false;
237
238                 if(PQsetnonblocking(sql, 1) == -1)
239                         return false;
240
241                 /* OK, we've initalised the connection, now to get it hooked into the socket engine
242                 * and then start polling it.
243                 */
244                 this->fd = PQsocket(sql);
245
246                 if(this->fd <= -1)
247                         return false;
248
249                 if (!SocketEngine::AddFd(this, FD_WANT_NO_WRITE | FD_WANT_NO_READ))
250                 {
251                         ServerInstance->Logs->Log(MODNAME, LOG_DEBUG, "BUG: Couldn't add pgsql socket to socket engine");
252                         return false;
253                 }
254
255                 /* Socket all hooked into the engine, now to tell PgSQL to start connecting */
256                 return DoPoll();
257         }
258
259         bool DoPoll()
260         {
261                 switch(PQconnectPoll(sql))
262                 {
263                         case PGRES_POLLING_WRITING:
264                                 SocketEngine::ChangeEventMask(this, FD_WANT_POLL_WRITE | FD_WANT_NO_READ);
265                                 status = CWRITE;
266                                 return true;
267                         case PGRES_POLLING_READING:
268                                 SocketEngine::ChangeEventMask(this, FD_WANT_POLL_READ | FD_WANT_NO_WRITE);
269                                 status = CREAD;
270                                 return true;
271                         case PGRES_POLLING_FAILED:
272                                 return false;
273                         case PGRES_POLLING_OK:
274                                 SocketEngine::ChangeEventMask(this, FD_WANT_POLL_READ | FD_WANT_NO_WRITE);
275                                 status = WWRITE;
276                                 DoConnectedPoll();
277                         default:
278                                 return true;
279                 }
280         }
281
282         void DoConnectedPoll()
283         {
284 restart:
285                 while (qinprog.q.empty() && !queue.empty())
286                 {
287                         /* There's no query currently in progress, and there's queries in the queue. */
288                         DoQuery(queue.front());
289                         queue.pop_front();
290                 }
291
292                 if (PQconsumeInput(sql))
293                 {
294                         if (PQisBusy(sql))
295                         {
296                                 /* Nothing happens here */
297                         }
298                         else if (qinprog.c)
299                         {
300                                 /* Fetch the result.. */
301                                 PGresult* result = PQgetResult(sql);
302
303                                 /* PgSQL would allow a query string to be sent which has multiple
304                                  * queries in it, this isn't portable across database backends and
305                                  * we don't want modules doing it. But just in case we make sure we
306                                  * drain any results there are and just use the last one.
307                                  * If the module devs are behaving there will only be one result.
308                                  */
309                                 while (PGresult* temp = PQgetResult(sql))
310                                 {
311                                         PQclear(result);
312                                         result = temp;
313                                 }
314
315                                 /* ..and the result */
316                                 PgSQLresult reply(result);
317                                 switch(PQresultStatus(result))
318                                 {
319                                         case PGRES_EMPTY_QUERY:
320                                         case PGRES_BAD_RESPONSE:
321                                         case PGRES_FATAL_ERROR:
322                                         {
323                                                 SQLerror err(SQL_QREPLY_FAIL, PQresultErrorMessage(result));
324                                                 qinprog.c->OnError(err);
325                                                 break;
326                                         }
327                                         default:
328                                                 /* Other values are not errors */
329                                                 qinprog.c->OnResult(reply);
330                                 }
331
332                                 delete qinprog.c;
333                                 qinprog = QueueItem(NULL, "");
334                                 goto restart;
335                         }
336                         else
337                         {
338                                 qinprog.q.clear();
339                         }
340                 }
341                 else
342                 {
343                         /* I think we'll assume this means the server died...it might not,
344                          * but I think that any error serious enough we actually get here
345                          * deserves to reconnect [/excuse]
346                          * Returning true so the core doesn't try and close the connection.
347                          */
348                         DelayReconnect();
349                 }
350         }
351
352         bool DoResetPoll()
353         {
354                 switch(PQresetPoll(sql))
355                 {
356                         case PGRES_POLLING_WRITING:
357                                 SocketEngine::ChangeEventMask(this, FD_WANT_POLL_WRITE | FD_WANT_NO_READ);
358                                 status = CWRITE;
359                                 return DoPoll();
360                         case PGRES_POLLING_READING:
361                                 SocketEngine::ChangeEventMask(this, FD_WANT_POLL_READ | FD_WANT_NO_WRITE);
362                                 status = CREAD;
363                                 return true;
364                         case PGRES_POLLING_FAILED:
365                                 return false;
366                         case PGRES_POLLING_OK:
367                                 SocketEngine::ChangeEventMask(this, FD_WANT_POLL_READ | FD_WANT_NO_WRITE);
368                                 status = WWRITE;
369                                 DoConnectedPoll();
370                         default:
371                                 return true;
372                 }
373         }
374
375         void DelayReconnect();
376
377         void DoEvent()
378         {
379                 if((status == CREAD) || (status == CWRITE))
380                 {
381                         DoPoll();
382                 }
383                 else if((status == RREAD) || (status == RWRITE))
384                 {
385                         DoResetPoll();
386                 }
387                 else
388                 {
389                         DoConnectedPoll();
390                 }
391         }
392
393         void submit(SQLQuery *req, const std::string& q) CXX11_OVERRIDE
394         {
395                 if (qinprog.q.empty())
396                 {
397                         DoQuery(QueueItem(req,q));
398                 }
399                 else
400                 {
401                         // wait your turn.
402                         queue.push_back(QueueItem(req,q));
403                 }
404         }
405
406         void submit(SQLQuery *req, const std::string& q, const ParamL& p) CXX11_OVERRIDE
407         {
408                 std::string res;
409                 unsigned int param = 0;
410                 for(std::string::size_type i = 0; i < q.length(); i++)
411                 {
412                         if (q[i] != '?')
413                                 res.push_back(q[i]);
414                         else
415                         {
416                                 if (param < p.size())
417                                 {
418                                         std::string parm = p[param++];
419                                         std::vector<char> buffer(parm.length() * 2 + 1);
420                                         int error;
421                                         size_t escapedsize = PQescapeStringConn(sql, &buffer[0], parm.data(), parm.length(), &error);
422                                         if (error)
423                                                 ServerInstance->Logs->Log(MODNAME, LOG_DEBUG, "BUG: Apparently PQescapeStringConn() failed");
424                                         res.append(&buffer[0], escapedsize);
425                                 }
426                         }
427                 }
428                 submit(req, res);
429         }
430
431         void submit(SQLQuery *req, const std::string& q, const ParamM& p) CXX11_OVERRIDE
432         {
433                 std::string res;
434                 for(std::string::size_type i = 0; i < q.length(); i++)
435                 {
436                         if (q[i] != '$')
437                                 res.push_back(q[i]);
438                         else
439                         {
440                                 std::string field;
441                                 i++;
442                                 while (i < q.length() && isalnum(q[i]))
443                                         field.push_back(q[i++]);
444                                 i--;
445
446                                 ParamM::const_iterator it = p.find(field);
447                                 if (it != p.end())
448                                 {
449                                         std::string parm = it->second;
450                                         std::vector<char> buffer(parm.length() * 2 + 1);
451                                         int error;
452                                         size_t escapedsize = PQescapeStringConn(sql, &buffer[0], parm.data(), parm.length(), &error);
453                                         if (error)
454                                                 ServerInstance->Logs->Log(MODNAME, LOG_DEBUG, "BUG: Apparently PQescapeStringConn() failed");
455                                         res.append(&buffer[0], escapedsize);
456                                 }
457                         }
458                 }
459                 submit(req, res);
460         }
461
462         void DoQuery(const QueueItem& req)
463         {
464                 if (status != WREAD && status != WWRITE)
465                 {
466                         // whoops, not connected...
467                         SQLerror err(SQL_BAD_CONN);
468                         req.c->OnError(err);
469                         delete req.c;
470                         return;
471                 }
472
473                 if(PQsendQuery(sql, req.q.c_str()))
474                 {
475                         qinprog = req;
476                 }
477                 else
478                 {
479                         SQLerror err(SQL_QSEND_FAIL, PQerrorMessage(sql));
480                         req.c->OnError(err);
481                         delete req.c;
482                 }
483         }
484
485         void Close()
486         {
487                 SocketEngine::DelFd(this);
488
489                 if(sql)
490                 {
491                         PQfinish(sql);
492                         sql = NULL;
493                 }
494         }
495 };
496
497 class ModulePgSQL : public Module
498 {
499  public:
500         ConnMap connections;
501         ReconnectTimer* retimer;
502
503         ModulePgSQL()
504                 : retimer(NULL)
505         {
506         }
507
508         ~ModulePgSQL()
509         {
510                 delete retimer;
511                 ClearAllConnections();
512         }
513
514         void ReadConfig(ConfigStatus& status) CXX11_OVERRIDE
515         {
516                 ReadConf();
517         }
518
519         void ReadConf()
520         {
521                 ConnMap conns;
522                 ConfigTagList tags = ServerInstance->Config->ConfTags("database");
523                 for(ConfigIter i = tags.first; i != tags.second; i++)
524                 {
525                         if (i->second->getString("module", "pgsql") != "pgsql")
526                                 continue;
527                         std::string id = i->second->getString("id");
528                         ConnMap::iterator curr = connections.find(id);
529                         if (curr == connections.end())
530                         {
531                                 SQLConn* conn = new SQLConn(this, i->second);
532                                 conns.insert(std::make_pair(id, conn));
533                                 ServerInstance->Modules->AddService(*conn);
534                         }
535                         else
536                         {
537                                 conns.insert(*curr);
538                                 connections.erase(curr);
539                         }
540                 }
541                 ClearAllConnections();
542                 conns.swap(connections);
543         }
544
545         void ClearAllConnections()
546         {
547                 for(ConnMap::iterator i = connections.begin(); i != connections.end(); i++)
548                 {
549                         i->second->cull();
550                         delete i->second;
551                 }
552                 connections.clear();
553         }
554
555         void OnUnloadModule(Module* mod) CXX11_OVERRIDE
556         {
557                 SQLerror err(SQL_BAD_DBID);
558                 for(ConnMap::iterator i = connections.begin(); i != connections.end(); i++)
559                 {
560                         SQLConn* conn = i->second;
561                         if (conn->qinprog.c && conn->qinprog.c->creator == mod)
562                         {
563                                 conn->qinprog.c->OnError(err);
564                                 delete conn->qinprog.c;
565                                 conn->qinprog.c = NULL;
566                         }
567                         std::deque<QueueItem>::iterator j = conn->queue.begin();
568                         while (j != conn->queue.end())
569                         {
570                                 SQLQuery* q = j->c;
571                                 if (q->creator == mod)
572                                 {
573                                         q->OnError(err);
574                                         delete q;
575                                         j = conn->queue.erase(j);
576                                 }
577                                 else
578                                         j++;
579                         }
580                 }
581         }
582
583         Version GetVersion() CXX11_OVERRIDE
584         {
585                 return Version("PostgreSQL Service Provider module for all other m_sql* modules, uses v2 of the SQL API", VF_VENDOR);
586         }
587 };
588
589 bool ReconnectTimer::Tick(time_t time)
590 {
591         mod->retimer = NULL;
592         mod->ReadConf();
593         delete this;
594         return false;
595 }
596
597 void SQLConn::DelayReconnect()
598 {
599         ModulePgSQL* mod = (ModulePgSQL*)(Module*)creator;
600         ConnMap::iterator it = mod->connections.find(conf->getString("id"));
601         if (it != mod->connections.end())
602         {
603                 mod->connections.erase(it);
604                 ServerInstance->GlobalCulls.AddItem((EventHandler*)this);
605                 if (!mod->retimer)
606                 {
607                         mod->retimer = new ReconnectTimer(mod);
608                         ServerInstance->Timers.AddTimer(mod->retimer);
609                 }
610         }
611 }
612
613 MODULE_INIT(ModulePgSQL)