]> git.netwichtig.de Git - user/henk/code/inspircd.git/blob - src/modules/extra/m_pgsql.cpp
Rewrite the build system directive parser.
[user/henk/code/inspircd.git] / src / modules / extra / m_pgsql.cpp
1 /*
2  * InspIRCd -- Internet Relay Chat Daemon
3  *
4  *   Copyright (C) 2009-2010 Daniel De Graaf <danieldg@inspircd.org>
5  *   Copyright (C) 2006-2007, 2009 Dennis Friis <peavey@inspircd.org>
6  *   Copyright (C) 2006-2007, 2009 Craig Edwards <craigedwards@brainbox.cc>
7  *   Copyright (C) 2008 Robin Burchell <robin+git@viroteck.net>
8  *   Copyright (C) 2008 Thomas Stagner <aquanight@inspircd.org>
9  *   Copyright (C) 2006 Oliver Lupton <oliverlupton@gmail.com>
10  *
11  * This file is part of InspIRCd.  InspIRCd is free software: you can
12  * redistribute it and/or modify it under the terms of the GNU General Public
13  * License as published by the Free Software Foundation, version 2.
14  *
15  * This program is distributed in the hope that it will be useful, but WITHOUT
16  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
17  * FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
18  * details.
19  *
20  * You should have received a copy of the GNU General Public License
21  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
22  */
23
24 /// $CompilerFlags: -Iexecute("pg_config --includedir" "POSTGRESQL_INCLUDE_DIR")
25 /// $LinkerFlags: -Lexecute("pg_config --libdir" "POSTGRESQL_LIBRARY_DIR") -lpq
26
27 /// $PackageInfo: require_system("darwin") postgresql
28 /// $PackageInfo: require_system("ubuntu") libpq-dev
29
30
31 #include "inspircd.h"
32 #include <cstdlib>
33 #include <libpq-fe.h>
34 #include "modules/sql.h"
35
36 /* SQLConn rewritten by peavey to
37  * use EventHandler instead of
38  * BufferedSocket. This is much neater
39  * and gives total control of destroy
40  * and delete of resources.
41  */
42
43 /* Forward declare, so we can have the typedef neatly at the top */
44 class SQLConn;
45 class ModulePgSQL;
46
47 typedef insp::flat_map<std::string, SQLConn*> ConnMap;
48
/** The phase a connection is in, paired with the socket event it is waiting for. */
enum SQLstatus
{
	/** Connecting and wants read event */
	CREAD,
	/** Connecting and wants write event */
	CWRITE,
	/** Connected/Working and wants read event */
	WREAD,
	/** Connected/Working and wants write event */
	WWRITE,
	/** Resetting and wants read event */
	RREAD,
	/** Resetting and wants write event */
	RWRITE
};
57
58 class ReconnectTimer : public Timer
59 {
60  private:
61         ModulePgSQL* mod;
62  public:
63         ReconnectTimer(ModulePgSQL* m) : Timer(5, false), mod(m)
64         {
65         }
66         bool Tick(time_t TIME);
67 };
68
69 struct QueueItem
70 {
71         SQLQuery* c;
72         std::string q;
73         QueueItem(SQLQuery* C, const std::string& Q) : c(C), q(Q) {}
74 };
75
76 /** PgSQLresult is a subclass of the mostly-pure-virtual class SQLresult.
77  * All SQL providers must create their own subclass and define it's methods using that
78  * database library's data retriveal functions. The aim is to avoid a slow and inefficient process
79  * of converting all data to a common format before it reaches the result structure. This way
80  * data is passes to the module nearly as directly as if it was using the API directly itself.
81  */
82
83 class PgSQLresult : public SQLResult
84 {
85         PGresult* res;
86         int currentrow;
87         int rows;
88  public:
89         PgSQLresult(PGresult* result) : res(result), currentrow(0)
90         {
91                 rows = PQntuples(res);
92                 if (!rows)
93                         rows = atoi(PQcmdTuples(res));
94         }
95
96         ~PgSQLresult()
97         {
98                 PQclear(res);
99         }
100
101         int Rows()
102         {
103                 return rows;
104         }
105
106         void GetCols(std::vector<std::string>& result)
107         {
108                 result.resize(PQnfields(res));
109                 for(unsigned int i=0; i < result.size(); i++)
110                 {
111                         result[i] = PQfname(res, i);
112                 }
113         }
114
115         SQLEntry GetValue(int row, int column)
116         {
117                 char* v = PQgetvalue(res, row, column);
118                 if (!v || PQgetisnull(res, row, column))
119                         return SQLEntry();
120
121                 return SQLEntry(std::string(v, PQgetlength(res, row, column)));
122         }
123
124         bool GetRow(SQLEntries& result)
125         {
126                 if (currentrow >= PQntuples(res))
127                         return false;
128                 int ncols = PQnfields(res);
129
130                 for(int i = 0; i < ncols; i++)
131                 {
132                         result.push_back(GetValue(currentrow, i));
133                 }
134                 currentrow++;
135
136                 return true;
137         }
138 };
139
140 /** SQLConn represents one SQL session.
141  */
142 class SQLConn : public SQLProvider, public EventHandler
143 {
144  public:
145         reference<ConfigTag> conf;      /* The <database> entry */
146         std::deque<QueueItem> queue;
147         PGconn*                 sql;            /* PgSQL database connection handle */
148         SQLstatus               status;         /* PgSQL database connection status */
149         QueueItem               qinprog;        /* If there is currently a query in progress */
150
151         SQLConn(Module* Creator, ConfigTag* tag)
152         : SQLProvider(Creator, "SQL/" + tag->getString("id")), conf(tag), sql(NULL), status(CWRITE), qinprog(NULL, "")
153         {
154                 if (!DoConnect())
155                 {
156                         ServerInstance->Logs->Log(MODNAME, LOG_DEFAULT, "WARNING: Could not connect to database " + tag->getString("id"));
157                         DelayReconnect();
158                 }
159         }
160
161         CullResult cull()
162         {
163                 this->SQLProvider::cull();
164                 ServerInstance->Modules->DelService(*this);
165                 return this->EventHandler::cull();
166         }
167
168         ~SQLConn()
169         {
170                 SQLerror err(SQL_BAD_DBID);
171                 if (qinprog.c)
172                 {
173                         qinprog.c->OnError(err);
174                         delete qinprog.c;
175                 }
176                 for(std::deque<QueueItem>::iterator i = queue.begin(); i != queue.end(); i++)
177                 {
178                         SQLQuery* q = i->c;
179                         q->OnError(err);
180                         delete q;
181                 }
182         }
183
184         void OnEventHandlerRead() CXX11_OVERRIDE
185         {
186                 DoEvent();
187         }
188
189         void OnEventHandlerWrite() CXX11_OVERRIDE
190         {
191                 DoEvent();
192         }
193
194         void OnEventHandlerError(int errornum) CXX11_OVERRIDE
195         {
196                 DelayReconnect();
197         }
198
199         std::string GetDSN()
200         {
201                 std::ostringstream conninfo("connect_timeout = '5'");
202                 std::string item;
203
204                 if (conf->readString("host", item))
205                         conninfo << " host = '" << item << "'";
206
207                 if (conf->readString("port", item))
208                         conninfo << " port = '" << item << "'";
209
210                 if (conf->readString("name", item))
211                         conninfo << " dbname = '" << item << "'";
212
213                 if (conf->readString("user", item))
214                         conninfo << " user = '" << item << "'";
215
216                 if (conf->readString("pass", item))
217                         conninfo << " password = '" << item << "'";
218
219                 if (conf->getBool("ssl"))
220                         conninfo << " sslmode = 'require'";
221                 else
222                         conninfo << " sslmode = 'disable'";
223
224                 return conninfo.str();
225         }
226
227         bool DoConnect()
228         {
229                 sql = PQconnectStart(GetDSN().c_str());
230                 if (!sql)
231                         return false;
232
233                 if(PQstatus(sql) == CONNECTION_BAD)
234                         return false;
235
236                 if(PQsetnonblocking(sql, 1) == -1)
237                         return false;
238
239                 /* OK, we've initalised the connection, now to get it hooked into the socket engine
240                 * and then start polling it.
241                 */
242                 this->fd = PQsocket(sql);
243
244                 if(this->fd <= -1)
245                         return false;
246
247                 if (!SocketEngine::AddFd(this, FD_WANT_NO_WRITE | FD_WANT_NO_READ))
248                 {
249                         ServerInstance->Logs->Log(MODNAME, LOG_DEBUG, "BUG: Couldn't add pgsql socket to socket engine");
250                         return false;
251                 }
252
253                 /* Socket all hooked into the engine, now to tell PgSQL to start connecting */
254                 return DoPoll();
255         }
256
257         bool DoPoll()
258         {
259                 switch(PQconnectPoll(sql))
260                 {
261                         case PGRES_POLLING_WRITING:
262                                 SocketEngine::ChangeEventMask(this, FD_WANT_POLL_WRITE | FD_WANT_NO_READ);
263                                 status = CWRITE;
264                                 return true;
265                         case PGRES_POLLING_READING:
266                                 SocketEngine::ChangeEventMask(this, FD_WANT_POLL_READ | FD_WANT_NO_WRITE);
267                                 status = CREAD;
268                                 return true;
269                         case PGRES_POLLING_FAILED:
270                                 return false;
271                         case PGRES_POLLING_OK:
272                                 SocketEngine::ChangeEventMask(this, FD_WANT_POLL_READ | FD_WANT_NO_WRITE);
273                                 status = WWRITE;
274                                 DoConnectedPoll();
275                         default:
276                                 return true;
277                 }
278         }
279
280         void DoConnectedPoll()
281         {
282 restart:
283                 while (qinprog.q.empty() && !queue.empty())
284                 {
285                         /* There's no query currently in progress, and there's queries in the queue. */
286                         DoQuery(queue.front());
287                         queue.pop_front();
288                 }
289
290                 if (PQconsumeInput(sql))
291                 {
292                         if (PQisBusy(sql))
293                         {
294                                 /* Nothing happens here */
295                         }
296                         else if (qinprog.c)
297                         {
298                                 /* Fetch the result.. */
299                                 PGresult* result = PQgetResult(sql);
300
301                                 /* PgSQL would allow a query string to be sent which has multiple
302                                  * queries in it, this isn't portable across database backends and
303                                  * we don't want modules doing it. But just in case we make sure we
304                                  * drain any results there are and just use the last one.
305                                  * If the module devs are behaving there will only be one result.
306                                  */
307                                 while (PGresult* temp = PQgetResult(sql))
308                                 {
309                                         PQclear(result);
310                                         result = temp;
311                                 }
312
313                                 /* ..and the result */
314                                 PgSQLresult reply(result);
315                                 switch(PQresultStatus(result))
316                                 {
317                                         case PGRES_EMPTY_QUERY:
318                                         case PGRES_BAD_RESPONSE:
319                                         case PGRES_FATAL_ERROR:
320                                         {
321                                                 SQLerror err(SQL_QREPLY_FAIL, PQresultErrorMessage(result));
322                                                 qinprog.c->OnError(err);
323                                                 break;
324                                         }
325                                         default:
326                                                 /* Other values are not errors */
327                                                 qinprog.c->OnResult(reply);
328                                 }
329
330                                 delete qinprog.c;
331                                 qinprog = QueueItem(NULL, "");
332                                 goto restart;
333                         }
334                         else
335                         {
336                                 qinprog.q.clear();
337                         }
338                 }
339                 else
340                 {
341                         /* I think we'll assume this means the server died...it might not,
342                          * but I think that any error serious enough we actually get here
343                          * deserves to reconnect [/excuse]
344                          * Returning true so the core doesn't try and close the connection.
345                          */
346                         DelayReconnect();
347                 }
348         }
349
350         bool DoResetPoll()
351         {
352                 switch(PQresetPoll(sql))
353                 {
354                         case PGRES_POLLING_WRITING:
355                                 SocketEngine::ChangeEventMask(this, FD_WANT_POLL_WRITE | FD_WANT_NO_READ);
356                                 status = CWRITE;
357                                 return DoPoll();
358                         case PGRES_POLLING_READING:
359                                 SocketEngine::ChangeEventMask(this, FD_WANT_POLL_READ | FD_WANT_NO_WRITE);
360                                 status = CREAD;
361                                 return true;
362                         case PGRES_POLLING_FAILED:
363                                 return false;
364                         case PGRES_POLLING_OK:
365                                 SocketEngine::ChangeEventMask(this, FD_WANT_POLL_READ | FD_WANT_NO_WRITE);
366                                 status = WWRITE;
367                                 DoConnectedPoll();
368                         default:
369                                 return true;
370                 }
371         }
372
373         void DelayReconnect();
374
375         void DoEvent()
376         {
377                 if((status == CREAD) || (status == CWRITE))
378                 {
379                         DoPoll();
380                 }
381                 else if((status == RREAD) || (status == RWRITE))
382                 {
383                         DoResetPoll();
384                 }
385                 else
386                 {
387                         DoConnectedPoll();
388                 }
389         }
390
391         void submit(SQLQuery *req, const std::string& q)
392         {
393                 if (qinprog.q.empty())
394                 {
395                         DoQuery(QueueItem(req,q));
396                 }
397                 else
398                 {
399                         // wait your turn.
400                         queue.push_back(QueueItem(req,q));
401                 }
402         }
403
404         void submit(SQLQuery *req, const std::string& q, const ParamL& p)
405         {
406                 std::string res;
407                 unsigned int param = 0;
408                 for(std::string::size_type i = 0; i < q.length(); i++)
409                 {
410                         if (q[i] != '?')
411                                 res.push_back(q[i]);
412                         else
413                         {
414                                 if (param < p.size())
415                                 {
416                                         std::string parm = p[param++];
417                                         std::vector<char> buffer(parm.length() * 2 + 1);
418                                         int error;
419                                         size_t escapedsize = PQescapeStringConn(sql, &buffer[0], parm.data(), parm.length(), &error);
420                                         if (error)
421                                                 ServerInstance->Logs->Log(MODNAME, LOG_DEBUG, "BUG: Apparently PQescapeStringConn() failed");
422                                         res.append(&buffer[0], escapedsize);
423                                 }
424                         }
425                 }
426                 submit(req, res);
427         }
428
429         void submit(SQLQuery *req, const std::string& q, const ParamM& p)
430         {
431                 std::string res;
432                 for(std::string::size_type i = 0; i < q.length(); i++)
433                 {
434                         if (q[i] != '$')
435                                 res.push_back(q[i]);
436                         else
437                         {
438                                 std::string field;
439                                 i++;
440                                 while (i < q.length() && isalnum(q[i]))
441                                         field.push_back(q[i++]);
442                                 i--;
443
444                                 ParamM::const_iterator it = p.find(field);
445                                 if (it != p.end())
446                                 {
447                                         std::string parm = it->second;
448                                         std::vector<char> buffer(parm.length() * 2 + 1);
449                                         int error;
450                                         size_t escapedsize = PQescapeStringConn(sql, &buffer[0], parm.data(), parm.length(), &error);
451                                         if (error)
452                                                 ServerInstance->Logs->Log(MODNAME, LOG_DEBUG, "BUG: Apparently PQescapeStringConn() failed");
453                                         res.append(&buffer[0], escapedsize);
454                                 }
455                         }
456                 }
457                 submit(req, res);
458         }
459
460         void DoQuery(const QueueItem& req)
461         {
462                 if (status != WREAD && status != WWRITE)
463                 {
464                         // whoops, not connected...
465                         SQLerror err(SQL_BAD_CONN);
466                         req.c->OnError(err);
467                         delete req.c;
468                         return;
469                 }
470
471                 if(PQsendQuery(sql, req.q.c_str()))
472                 {
473                         qinprog = req;
474                 }
475                 else
476                 {
477                         SQLerror err(SQL_QSEND_FAIL, PQerrorMessage(sql));
478                         req.c->OnError(err);
479                         delete req.c;
480                 }
481         }
482
483         void Close()
484         {
485                 SocketEngine::DelFd(this);
486
487                 if(sql)
488                 {
489                         PQfinish(sql);
490                         sql = NULL;
491                 }
492         }
493 };
494
495 class ModulePgSQL : public Module
496 {
497  public:
498         ConnMap connections;
499         ReconnectTimer* retimer;
500
501         ModulePgSQL()
502                 : retimer(NULL)
503         {
504         }
505
506         ~ModulePgSQL()
507         {
508                 delete retimer;
509                 ClearAllConnections();
510         }
511
512         void ReadConfig(ConfigStatus& status) CXX11_OVERRIDE
513         {
514                 ReadConf();
515         }
516
517         void ReadConf()
518         {
519                 ConnMap conns;
520                 ConfigTagList tags = ServerInstance->Config->ConfTags("database");
521                 for(ConfigIter i = tags.first; i != tags.second; i++)
522                 {
523                         if (i->second->getString("module", "pgsql") != "pgsql")
524                                 continue;
525                         std::string id = i->second->getString("id");
526                         ConnMap::iterator curr = connections.find(id);
527                         if (curr == connections.end())
528                         {
529                                 SQLConn* conn = new SQLConn(this, i->second);
530                                 conns.insert(std::make_pair(id, conn));
531                                 ServerInstance->Modules->AddService(*conn);
532                         }
533                         else
534                         {
535                                 conns.insert(*curr);
536                                 connections.erase(curr);
537                         }
538                 }
539                 ClearAllConnections();
540                 conns.swap(connections);
541         }
542
543         void ClearAllConnections()
544         {
545                 for(ConnMap::iterator i = connections.begin(); i != connections.end(); i++)
546                 {
547                         i->second->cull();
548                         delete i->second;
549                 }
550                 connections.clear();
551         }
552
553         void OnUnloadModule(Module* mod) CXX11_OVERRIDE
554         {
555                 SQLerror err(SQL_BAD_DBID);
556                 for(ConnMap::iterator i = connections.begin(); i != connections.end(); i++)
557                 {
558                         SQLConn* conn = i->second;
559                         if (conn->qinprog.c && conn->qinprog.c->creator == mod)
560                         {
561                                 conn->qinprog.c->OnError(err);
562                                 delete conn->qinprog.c;
563                                 conn->qinprog.c = NULL;
564                         }
565                         std::deque<QueueItem>::iterator j = conn->queue.begin();
566                         while (j != conn->queue.end())
567                         {
568                                 SQLQuery* q = j->c;
569                                 if (q->creator == mod)
570                                 {
571                                         q->OnError(err);
572                                         delete q;
573                                         j = conn->queue.erase(j);
574                                 }
575                                 else
576                                         j++;
577                         }
578                 }
579         }
580
581         Version GetVersion() CXX11_OVERRIDE
582         {
583                 return Version("PostgreSQL Service Provider module for all other m_sql* modules, uses v2 of the SQL API", VF_VENDOR);
584         }
585 };
586
587 bool ReconnectTimer::Tick(time_t time)
588 {
589         mod->retimer = NULL;
590         mod->ReadConf();
591         delete this;
592         return false;
593 }
594
595 void SQLConn::DelayReconnect()
596 {
597         ModulePgSQL* mod = (ModulePgSQL*)(Module*)creator;
598         ConnMap::iterator it = mod->connections.find(conf->getString("id"));
599         if (it != mod->connections.end())
600         {
601                 mod->connections.erase(it);
602                 ServerInstance->GlobalCulls.AddItem((EventHandler*)this);
603                 if (!mod->retimer)
604                 {
605                         mod->retimer = new ReconnectTimer(mod);
606                         ServerInstance->Timers.AddTimer(mod->retimer);
607                 }
608         }
609 }
610
611 MODULE_INIT(ModulePgSQL)