]> git.netwichtig.de Git - user/henk/code/inspircd.git/blob - src/modules/extra/m_pgsql.cpp
sslinfo: use the SSL certificate API to get user SSL certificates.
[user/henk/code/inspircd.git] / src / modules / extra / m_pgsql.cpp
1 /*
2  * InspIRCd -- Internet Relay Chat Daemon
3  *
4  *   Copyright (C) 2009-2010 Daniel De Graaf <danieldg@inspircd.org>
5  *   Copyright (C) 2006-2007, 2009 Dennis Friis <peavey@inspircd.org>
6  *   Copyright (C) 2006-2007, 2009 Craig Edwards <craigedwards@brainbox.cc>
7  *   Copyright (C) 2008 Robin Burchell <robin+git@viroteck.net>
8  *   Copyright (C) 2008 Thomas Stagner <aquanight@inspircd.org>
9  *   Copyright (C) 2006 Oliver Lupton <oliverlupton@gmail.com>
10  *
11  * This file is part of InspIRCd.  InspIRCd is free software: you can
12  * redistribute it and/or modify it under the terms of the GNU General Public
13  * License as published by the Free Software Foundation, version 2.
14  *
15  * This program is distributed in the hope that it will be useful, but WITHOUT
16  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
17  * FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
18  * details.
19  *
20  * You should have received a copy of the GNU General Public License
21  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
22  */
23
24 /// $CompilerFlags: -Iexecute("pg_config --includedir" "POSTGRESQL_INCLUDE_DIR")
25 /// $LinkerFlags: -Lexecute("pg_config --libdir" "POSTGRESQL_LIBRARY_DIR") -lpq
26
27 /// $PackageInfo: require_system("centos") postgresql-devel
28 /// $PackageInfo: require_system("darwin") postgresql
29 /// $PackageInfo: require_system("debian") libpq-dev
30 /// $PackageInfo: require_system("ubuntu") libpq-dev
31
32
33 #include "inspircd.h"
34 #include <cstdlib>
35 #include <libpq-fe.h>
36 #include "modules/sql.h"
37
38 /* SQLConn rewritten by peavey to
39  * use EventHandler instead of
40  * BufferedSocket. This is much neater
41  * and gives total control of destroy
42  * and delete of resources.
43  */
44
45 /* Forward declare, so we can have the typedef neatly at the top */
46 class SQLConn;
47 class ModulePgSQL;
48
49 typedef insp::flat_map<std::string, SQLConn*> ConnMap;
50
/** State of a SQLConn combined with the socket event it is waiting for. */
enum SQLstatus
{
	/** Connecting and wants read event */
	CREAD,

	/** Connecting and wants write event */
	CWRITE,

	/** Connected/Working and wants read event */
	WREAD,

	/** Connected/Working and wants write event */
	WWRITE,

	/** Resetting and wants read event */
	RREAD,

	/** Resetting and wants write event */
	RWRITE
};
59
60 class ReconnectTimer : public Timer
61 {
62  private:
63         ModulePgSQL* mod;
64  public:
65         ReconnectTimer(ModulePgSQL* m) : Timer(5, false), mod(m)
66         {
67         }
68         bool Tick(time_t TIME) CXX11_OVERRIDE;
69 };
70
71 struct QueueItem
72 {
73         SQL::Query* c;
74         std::string q;
75         QueueItem(SQL::Query* C, const std::string& Q) : c(C), q(Q) {}
76 };
77
78 /** PgSQLresult is a subclass of the mostly-pure-virtual class SQLresult.
79  * All SQL providers must create their own subclass and define it's methods using that
80  * database library's data retriveal functions. The aim is to avoid a slow and inefficient process
81  * of converting all data to a common format before it reaches the result structure. This way
82  * data is passes to the module nearly as directly as if it was using the API directly itself.
83  */
84
85 class PgSQLresult : public SQL::Result
86 {
87         PGresult* res;
88         int currentrow;
89         int rows;
90         std::vector<std::string> colnames;
91
92         void getColNames()
93         {
94                 colnames.resize(PQnfields(res));
95                 for(unsigned int i=0; i < colnames.size(); i++)
96                 {
97                         colnames[i] = PQfname(res, i);
98                 }
99         }
100  public:
101         PgSQLresult(PGresult* result) : res(result), currentrow(0)
102         {
103                 rows = PQntuples(res);
104                 if (!rows)
105                         rows = atoi(PQcmdTuples(res));
106         }
107
108         ~PgSQLresult()
109         {
110                 PQclear(res);
111         }
112
113         int Rows() CXX11_OVERRIDE
114         {
115                 return rows;
116         }
117
118         void GetCols(std::vector<std::string>& result) CXX11_OVERRIDE
119         {
120                 if (colnames.empty())
121                         getColNames();
122                 result = colnames;
123         }
124
125         bool HasColumn(const std::string& column, size_t& index) CXX11_OVERRIDE
126         {
127                 if (colnames.empty())
128                         getColNames();
129
130                 for (size_t i = 0; i < colnames.size(); ++i)
131                 {
132                         if (colnames[i] == column)
133                         {
134                                 index = i;
135                                 return true;
136                         }
137                 }
138                 return false;
139         }
140
141         SQL::Field GetValue(int row, int column)
142         {
143                 char* v = PQgetvalue(res, row, column);
144                 if (!v || PQgetisnull(res, row, column))
145                         return SQL::Field();
146
147                 return SQL::Field(std::string(v, PQgetlength(res, row, column)));
148         }
149
150         bool GetRow(SQL::Row& result) CXX11_OVERRIDE
151         {
152                 if (currentrow >= PQntuples(res))
153                         return false;
154                 int ncols = PQnfields(res);
155
156                 for(int i = 0; i < ncols; i++)
157                 {
158                         result.push_back(GetValue(currentrow, i));
159                 }
160                 currentrow++;
161
162                 return true;
163         }
164 };
165
166 /** SQLConn represents one SQL session.
167  */
168 class SQLConn : public SQL::Provider, public EventHandler
169 {
170  public:
171         reference<ConfigTag> conf;      /* The <database> entry */
172         std::deque<QueueItem> queue;
173         PGconn*                 sql;            /* PgSQL database connection handle */
174         SQLstatus               status;         /* PgSQL database connection status */
175         QueueItem               qinprog;        /* If there is currently a query in progress */
176
177         SQLConn(Module* Creator, ConfigTag* tag)
178         : SQL::Provider(Creator, "SQL/" + tag->getString("id")), conf(tag), sql(NULL), status(CWRITE), qinprog(NULL, "")
179         {
180                 if (!DoConnect())
181                 {
182                         ServerInstance->Logs->Log(MODNAME, LOG_DEFAULT, "WARNING: Could not connect to database " + tag->getString("id"));
183                         DelayReconnect();
184                 }
185         }
186
187         CullResult cull() CXX11_OVERRIDE
188         {
189                 this->SQL::Provider::cull();
190                 ServerInstance->Modules->DelService(*this);
191                 return this->EventHandler::cull();
192         }
193
194         ~SQLConn()
195         {
196                 SQL::Error err(SQL::BAD_DBID);
197                 if (qinprog.c)
198                 {
199                         qinprog.c->OnError(err);
200                         delete qinprog.c;
201                 }
202                 for(std::deque<QueueItem>::iterator i = queue.begin(); i != queue.end(); i++)
203                 {
204                         SQL::Query* q = i->c;
205                         q->OnError(err);
206                         delete q;
207                 }
208         }
209
210         void OnEventHandlerRead() CXX11_OVERRIDE
211         {
212                 DoEvent();
213         }
214
215         void OnEventHandlerWrite() CXX11_OVERRIDE
216         {
217                 DoEvent();
218         }
219
220         void OnEventHandlerError(int errornum) CXX11_OVERRIDE
221         {
222                 DelayReconnect();
223         }
224
225         std::string GetDSN()
226         {
227                 std::ostringstream conninfo("connect_timeout = '5'");
228                 std::string item;
229
230                 if (conf->readString("host", item))
231                         conninfo << " host = '" << item << "'";
232
233                 if (conf->readString("port", item))
234                         conninfo << " port = '" << item << "'";
235
236                 if (conf->readString("name", item))
237                         conninfo << " dbname = '" << item << "'";
238
239                 if (conf->readString("user", item))
240                         conninfo << " user = '" << item << "'";
241
242                 if (conf->readString("pass", item))
243                         conninfo << " password = '" << item << "'";
244
245                 if (conf->getBool("ssl"))
246                         conninfo << " sslmode = 'require'";
247                 else
248                         conninfo << " sslmode = 'disable'";
249
250                 return conninfo.str();
251         }
252
253         bool DoConnect()
254         {
255                 sql = PQconnectStart(GetDSN().c_str());
256                 if (!sql)
257                         return false;
258
259                 if(PQstatus(sql) == CONNECTION_BAD)
260                         return false;
261
262                 if(PQsetnonblocking(sql, 1) == -1)
263                         return false;
264
265                 /* OK, we've initalised the connection, now to get it hooked into the socket engine
266                 * and then start polling it.
267                 */
268                 this->fd = PQsocket(sql);
269
270                 if(this->fd <= -1)
271                         return false;
272
273                 if (!SocketEngine::AddFd(this, FD_WANT_NO_WRITE | FD_WANT_NO_READ))
274                 {
275                         ServerInstance->Logs->Log(MODNAME, LOG_DEBUG, "BUG: Couldn't add pgsql socket to socket engine");
276                         return false;
277                 }
278
279                 /* Socket all hooked into the engine, now to tell PgSQL to start connecting */
280                 return DoPoll();
281         }
282
283         bool DoPoll()
284         {
285                 switch(PQconnectPoll(sql))
286                 {
287                         case PGRES_POLLING_WRITING:
288                                 SocketEngine::ChangeEventMask(this, FD_WANT_POLL_WRITE | FD_WANT_NO_READ);
289                                 status = CWRITE;
290                                 return true;
291                         case PGRES_POLLING_READING:
292                                 SocketEngine::ChangeEventMask(this, FD_WANT_POLL_READ | FD_WANT_NO_WRITE);
293                                 status = CREAD;
294                                 return true;
295                         case PGRES_POLLING_FAILED:
296                                 return false;
297                         case PGRES_POLLING_OK:
298                                 SocketEngine::ChangeEventMask(this, FD_WANT_POLL_READ | FD_WANT_NO_WRITE);
299                                 status = WWRITE;
300                                 DoConnectedPoll();
301                                 return true;
302                         default:
303                                 return true;
304                 }
305         }
306
307         void DoConnectedPoll()
308         {
309 restart:
310                 while (qinprog.q.empty() && !queue.empty())
311                 {
312                         /* There's no query currently in progress, and there's queries in the queue. */
313                         DoQuery(queue.front());
314                         queue.pop_front();
315                 }
316
317                 if (PQconsumeInput(sql))
318                 {
319                         if (PQisBusy(sql))
320                         {
321                                 /* Nothing happens here */
322                         }
323                         else if (qinprog.c)
324                         {
325                                 /* Fetch the result.. */
326                                 PGresult* result = PQgetResult(sql);
327
328                                 /* PgSQL would allow a query string to be sent which has multiple
329                                  * queries in it, this isn't portable across database backends and
330                                  * we don't want modules doing it. But just in case we make sure we
331                                  * drain any results there are and just use the last one.
332                                  * If the module devs are behaving there will only be one result.
333                                  */
334                                 while (PGresult* temp = PQgetResult(sql))
335                                 {
336                                         PQclear(result);
337                                         result = temp;
338                                 }
339
340                                 /* ..and the result */
341                                 PgSQLresult reply(result);
342                                 switch(PQresultStatus(result))
343                                 {
344                                         case PGRES_EMPTY_QUERY:
345                                         case PGRES_BAD_RESPONSE:
346                                         case PGRES_FATAL_ERROR:
347                                         {
348                                                 SQL::Error err(SQL::QREPLY_FAIL, PQresultErrorMessage(result));
349                                                 qinprog.c->OnError(err);
350                                                 break;
351                                         }
352                                         default:
353                                                 /* Other values are not errors */
354                                                 qinprog.c->OnResult(reply);
355                                 }
356
357                                 delete qinprog.c;
358                                 qinprog = QueueItem(NULL, "");
359                                 goto restart;
360                         }
361                         else
362                         {
363                                 qinprog.q.clear();
364                         }
365                 }
366                 else
367                 {
368                         /* I think we'll assume this means the server died...it might not,
369                          * but I think that any error serious enough we actually get here
370                          * deserves to reconnect [/excuse]
371                          * Returning true so the core doesn't try and close the connection.
372                          */
373                         DelayReconnect();
374                 }
375         }
376
377         bool DoResetPoll()
378         {
379                 switch(PQresetPoll(sql))
380                 {
381                         case PGRES_POLLING_WRITING:
382                                 SocketEngine::ChangeEventMask(this, FD_WANT_POLL_WRITE | FD_WANT_NO_READ);
383                                 status = CWRITE;
384                                 return DoPoll();
385                         case PGRES_POLLING_READING:
386                                 SocketEngine::ChangeEventMask(this, FD_WANT_POLL_READ | FD_WANT_NO_WRITE);
387                                 status = CREAD;
388                                 return true;
389                         case PGRES_POLLING_FAILED:
390                                 return false;
391                         case PGRES_POLLING_OK:
392                                 SocketEngine::ChangeEventMask(this, FD_WANT_POLL_READ | FD_WANT_NO_WRITE);
393                                 status = WWRITE;
394                                 DoConnectedPoll();
395                                 return true;
396                         default:
397                                 return true;
398                 }
399         }
400
401         void DelayReconnect();
402
403         void DoEvent()
404         {
405                 if((status == CREAD) || (status == CWRITE))
406                 {
407                         DoPoll();
408                 }
409                 else if((status == RREAD) || (status == RWRITE))
410                 {
411                         DoResetPoll();
412                 }
413                 else
414                 {
415                         DoConnectedPoll();
416                 }
417         }
418
419         void Submit(SQL::Query *req, const std::string& q) CXX11_OVERRIDE
420         {
421                 ServerInstance->Logs->Log(MODNAME, LOG_DEBUG, "Executing PostgreSQL query: " + q);
422                 if (qinprog.q.empty())
423                 {
424                         DoQuery(QueueItem(req,q));
425                 }
426                 else
427                 {
428                         // wait your turn.
429                         queue.push_back(QueueItem(req,q));
430                 }
431         }
432
433         void Submit(SQL::Query *req, const std::string& q, const SQL::ParamList& p) CXX11_OVERRIDE
434         {
435                 std::string res;
436                 unsigned int param = 0;
437                 for(std::string::size_type i = 0; i < q.length(); i++)
438                 {
439                         if (q[i] != '?')
440                                 res.push_back(q[i]);
441                         else
442                         {
443                                 if (param < p.size())
444                                 {
445                                         std::string parm = p[param++];
446                                         std::vector<char> buffer(parm.length() * 2 + 1);
447                                         int error;
448                                         size_t escapedsize = PQescapeStringConn(sql, &buffer[0], parm.data(), parm.length(), &error);
449                                         if (error)
450                                                 ServerInstance->Logs->Log(MODNAME, LOG_DEBUG, "BUG: Apparently PQescapeStringConn() failed");
451                                         res.append(&buffer[0], escapedsize);
452                                 }
453                         }
454                 }
455                 Submit(req, res);
456         }
457
458         void Submit(SQL::Query *req, const std::string& q, const SQL::ParamMap& p) CXX11_OVERRIDE
459         {
460                 std::string res;
461                 for(std::string::size_type i = 0; i < q.length(); i++)
462                 {
463                         if (q[i] != '$')
464                                 res.push_back(q[i]);
465                         else
466                         {
467                                 std::string field;
468                                 i++;
469                                 while (i < q.length() && isalnum(q[i]))
470                                         field.push_back(q[i++]);
471                                 i--;
472
473                                 SQL::ParamMap::const_iterator it = p.find(field);
474                                 if (it != p.end())
475                                 {
476                                         std::string parm = it->second;
477                                         std::vector<char> buffer(parm.length() * 2 + 1);
478                                         int error;
479                                         size_t escapedsize = PQescapeStringConn(sql, &buffer[0], parm.data(), parm.length(), &error);
480                                         if (error)
481                                                 ServerInstance->Logs->Log(MODNAME, LOG_DEBUG, "BUG: Apparently PQescapeStringConn() failed");
482                                         res.append(&buffer[0], escapedsize);
483                                 }
484                         }
485                 }
486                 Submit(req, res);
487         }
488
489         void DoQuery(const QueueItem& req)
490         {
491                 if (status != WREAD && status != WWRITE)
492                 {
493                         // whoops, not connected...
494                         SQL::Error err(SQL::BAD_CONN);
495                         req.c->OnError(err);
496                         delete req.c;
497                         return;
498                 }
499
500                 if(PQsendQuery(sql, req.q.c_str()))
501                 {
502                         qinprog = req;
503                 }
504                 else
505                 {
506                         SQL::Error err(SQL::QSEND_FAIL, PQerrorMessage(sql));
507                         req.c->OnError(err);
508                         delete req.c;
509                 }
510         }
511
512         void Close()
513         {
514                 SocketEngine::DelFd(this);
515
516                 if(sql)
517                 {
518                         PQfinish(sql);
519                         sql = NULL;
520                 }
521         }
522 };
523
524 class ModulePgSQL : public Module
525 {
526  public:
527         ConnMap connections;
528         ReconnectTimer* retimer;
529
530         ModulePgSQL()
531                 : retimer(NULL)
532         {
533         }
534
535         ~ModulePgSQL()
536         {
537                 delete retimer;
538                 ClearAllConnections();
539         }
540
541         void ReadConfig(ConfigStatus& status) CXX11_OVERRIDE
542         {
543                 ReadConf();
544         }
545
546         void ReadConf()
547         {
548                 ConnMap conns;
549                 ConfigTagList tags = ServerInstance->Config->ConfTags("database");
550                 for(ConfigIter i = tags.first; i != tags.second; i++)
551                 {
552                         if (!stdalgo::string::equalsci(i->second->getString("module"), "pgsql"))
553                                 continue;
554                         std::string id = i->second->getString("id");
555                         ConnMap::iterator curr = connections.find(id);
556                         if (curr == connections.end())
557                         {
558                                 SQLConn* conn = new SQLConn(this, i->second);
559                                 conns.insert(std::make_pair(id, conn));
560                                 ServerInstance->Modules->AddService(*conn);
561                         }
562                         else
563                         {
564                                 conns.insert(*curr);
565                                 connections.erase(curr);
566                         }
567                 }
568                 ClearAllConnections();
569                 conns.swap(connections);
570         }
571
572         void ClearAllConnections()
573         {
574                 for(ConnMap::iterator i = connections.begin(); i != connections.end(); i++)
575                 {
576                         i->second->cull();
577                         delete i->second;
578                 }
579                 connections.clear();
580         }
581
582         void OnUnloadModule(Module* mod) CXX11_OVERRIDE
583         {
584                 SQL::Error err(SQL::BAD_DBID);
585                 for(ConnMap::iterator i = connections.begin(); i != connections.end(); i++)
586                 {
587                         SQLConn* conn = i->second;
588                         if (conn->qinprog.c && conn->qinprog.c->creator == mod)
589                         {
590                                 conn->qinprog.c->OnError(err);
591                                 delete conn->qinprog.c;
592                                 conn->qinprog.c = NULL;
593                         }
594                         std::deque<QueueItem>::iterator j = conn->queue.begin();
595                         while (j != conn->queue.end())
596                         {
597                                 SQL::Query* q = j->c;
598                                 if (q->creator == mod)
599                                 {
600                                         q->OnError(err);
601                                         delete q;
602                                         j = conn->queue.erase(j);
603                                 }
604                                 else
605                                         j++;
606                         }
607                 }
608         }
609
610         Version GetVersion() CXX11_OVERRIDE
611         {
612                 return Version("PostgreSQL Service Provider module for all other m_sql* modules, uses v2 of the SQL API", VF_VENDOR);
613         }
614 };
615
616 bool ReconnectTimer::Tick(time_t time)
617 {
618         mod->retimer = NULL;
619         mod->ReadConf();
620         delete this;
621         return false;
622 }
623
624 void SQLConn::DelayReconnect()
625 {
626         ModulePgSQL* mod = (ModulePgSQL*)(Module*)creator;
627         ConnMap::iterator it = mod->connections.find(conf->getString("id"));
628         if (it != mod->connections.end())
629         {
630                 mod->connections.erase(it);
631                 ServerInstance->GlobalCulls.AddItem((EventHandler*)this);
632                 if (!mod->retimer)
633                 {
634                         mod->retimer = new ReconnectTimer(mod);
635                         ServerInstance->Timers.AddTimer(mod->retimer);
636                 }
637         }
638 }
639
640 MODULE_INIT(ModulePgSQL)