]> git.netwichtig.de Git - user/henk/code/inspircd.git/blob - src/modules/extra/m_pgsql.cpp
Change modules to use the MODNAME constant when logging.
[user/henk/code/inspircd.git] / src / modules / extra / m_pgsql.cpp
1 /*
2  * InspIRCd -- Internet Relay Chat Daemon
3  *
4  *   Copyright (C) 2009-2010 Daniel De Graaf <danieldg@inspircd.org>
5  *   Copyright (C) 2006-2007, 2009 Dennis Friis <peavey@inspircd.org>
6  *   Copyright (C) 2006-2007, 2009 Craig Edwards <craigedwards@brainbox.cc>
7  *   Copyright (C) 2008 Robin Burchell <robin+git@viroteck.net>
8  *   Copyright (C) 2008 Thomas Stagner <aquanight@inspircd.org>
9  *   Copyright (C) 2006 Oliver Lupton <oliverlupton@gmail.com>
10  *
11  * This file is part of InspIRCd.  InspIRCd is free software: you can
12  * redistribute it and/or modify it under the terms of the GNU General Public
13  * License as published by the Free Software Foundation, version 2.
14  *
15  * This program is distributed in the hope that it will be useful, but WITHOUT
16  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
17  * FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
18  * details.
19  *
20  * You should have received a copy of the GNU General Public License
21  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
22  */
23
24
25 #include "inspircd.h"
26 #include <cstdlib>
27 #include <sstream>
28 #include <libpq-fe.h>
29 #include "modules/sql.h"
30
31 /* $CompileFlags: -Iexec("pg_config --includedir") eval("my $s = `pg_config --version`;$s =~ /^.*?(\d+)\.(\d+)\.(\d+).*?$/;my $v = hex(sprintf("0x%02x%02x%02x", $1, $2, $3));print "-DPGSQL_HAS_ESCAPECONN" if(($v >= 0x080104) || ($v >= 0x07030F && $v < 0x070400) || ($v >= 0x07040D && $v < 0x080000) || ($v >= 0x080008 && $v < 0x080100));") */
32 /* $LinkerFlags: -Lexec("pg_config --libdir") -lpq */
33
34 /* SQLConn rewritten by peavey to
35  * use EventHandler instead of
36  * BufferedSocket. This is much neater
37  * and gives total control of destroy
38  * and delete of resources.
39  */
40
/* Forward declare, so we can have the typedef neatly at the top */
class SQLConn;
class ModulePgSQL;

/* Maps a string key to a connection object; ModulePgSQL::ReadConf() keys it
 * by the "id" value of each <database> tag.
 */
typedef std::map<std::string, SQLConn*> ConnMap;
46
/** Connection state of an SQLConn, combined with the kind of socket event
 * it is currently waiting for.
 */
enum SQLstatus
{
	CREAD,		/* Connecting and wants read event */
	CWRITE,		/* Connecting and wants write event */
	WREAD,		/* Connected/Working and wants read event */
	WWRITE,		/* Connected/Working and wants write event */
	RREAD,		/* Resetting and wants read event */
	RWRITE		/* Resetting and wants write event */
};
55
56 class ReconnectTimer : public Timer
57 {
58  private:
59         ModulePgSQL* mod;
60  public:
61         ReconnectTimer(ModulePgSQL* m) : Timer(5, ServerInstance->Time(), false), mod(m)
62         {
63         }
64         bool Tick(time_t TIME);
65 };
66
67 struct QueueItem
68 {
69         SQLQuery* c;
70         std::string q;
71         QueueItem(SQLQuery* C, const std::string& Q) : c(C), q(Q) {}
72 };
73
74 /** PgSQLresult is a subclass of the mostly-pure-virtual class SQLresult.
75  * All SQL providers must create their own subclass and define it's methods using that
76  * database library's data retriveal functions. The aim is to avoid a slow and inefficient process
77  * of converting all data to a common format before it reaches the result structure. This way
78  * data is passes to the module nearly as directly as if it was using the API directly itself.
79  */
80
81 class PgSQLresult : public SQLResult
82 {
83         PGresult* res;
84         int currentrow;
85         int rows;
86  public:
87         PgSQLresult(PGresult* result) : res(result), currentrow(0)
88         {
89                 rows = PQntuples(res);
90                 if (!rows)
91                         rows = atoi(PQcmdTuples(res));
92         }
93
94         ~PgSQLresult()
95         {
96                 PQclear(res);
97         }
98
99         int Rows()
100         {
101                 return rows;
102         }
103
104         void GetCols(std::vector<std::string>& result)
105         {
106                 result.resize(PQnfields(res));
107                 for(unsigned int i=0; i < result.size(); i++)
108                 {
109                         result[i] = PQfname(res, i);
110                 }
111         }
112
113         SQLEntry GetValue(int row, int column)
114         {
115                 char* v = PQgetvalue(res, row, column);
116                 if (!v || PQgetisnull(res, row, column))
117                         return SQLEntry();
118
119                 return SQLEntry(std::string(v, PQgetlength(res, row, column)));
120         }
121
122         bool GetRow(SQLEntries& result)
123         {
124                 if (currentrow >= PQntuples(res))
125                         return false;
126                 int ncols = PQnfields(res);
127
128                 for(int i = 0; i < ncols; i++)
129                 {
130                         result.push_back(GetValue(currentrow, i));
131                 }
132                 currentrow++;
133
134                 return true;
135         }
136 };
137
138 /** SQLConn represents one SQL session.
139  */
140 class SQLConn : public SQLProvider, public EventHandler
141 {
142  public:
143         reference<ConfigTag> conf;      /* The <database> entry */
144         std::deque<QueueItem> queue;
145         PGconn*                 sql;            /* PgSQL database connection handle */
146         SQLstatus               status;         /* PgSQL database connection status */
147         QueueItem               qinprog;        /* If there is currently a query in progress */
148
149         SQLConn(Module* Creator, ConfigTag* tag)
150         : SQLProvider(Creator, "SQL/" + tag->getString("id")), conf(tag), sql(NULL), status(CWRITE), qinprog(NULL, "")
151         {
152                 if (!DoConnect())
153                 {
154                         ServerInstance->Logs->Log(MODNAME, LOG_DEFAULT, "WARNING: Could not connect to database " + tag->getString("id"));
155                         DelayReconnect();
156                 }
157         }
158
159         CullResult cull()
160         {
161                 this->SQLProvider::cull();
162                 ServerInstance->Modules->DelService(*this);
163                 return this->EventHandler::cull();
164         }
165
166         ~SQLConn()
167         {
168                 SQLerror err(SQL_BAD_DBID);
169                 if (qinprog.c)
170                 {
171                         qinprog.c->OnError(err);
172                         delete qinprog.c;
173                 }
174                 for(std::deque<QueueItem>::iterator i = queue.begin(); i != queue.end(); i++)
175                 {
176                         SQLQuery* q = i->c;
177                         q->OnError(err);
178                         delete q;
179                 }
180         }
181
182         void HandleEvent(EventType et, int errornum)
183         {
184                 switch (et)
185                 {
186                         case EVENT_READ:
187                         case EVENT_WRITE:
188                                 DoEvent();
189                         break;
190
191                         case EVENT_ERROR:
192                                 DelayReconnect();
193                 }
194         }
195
196         std::string GetDSN()
197         {
198                 std::ostringstream conninfo("connect_timeout = '5'");
199                 std::string item;
200
201                 if (conf->readString("host", item))
202                         conninfo << " host = '" << item << "'";
203
204                 if (conf->readString("port", item))
205                         conninfo << " port = '" << item << "'";
206
207                 if (conf->readString("name", item))
208                         conninfo << " dbname = '" << item << "'";
209
210                 if (conf->readString("user", item))
211                         conninfo << " user = '" << item << "'";
212
213                 if (conf->readString("pass", item))
214                         conninfo << " password = '" << item << "'";
215
216                 if (conf->getBool("ssl"))
217                         conninfo << " sslmode = 'require'";
218                 else
219                         conninfo << " sslmode = 'disable'";
220
221                 return conninfo.str();
222         }
223
224         bool DoConnect()
225         {
226                 sql = PQconnectStart(GetDSN().c_str());
227                 if (!sql)
228                         return false;
229
230                 if(PQstatus(sql) == CONNECTION_BAD)
231                         return false;
232
233                 if(PQsetnonblocking(sql, 1) == -1)
234                         return false;
235
236                 /* OK, we've initalised the connection, now to get it hooked into the socket engine
237                 * and then start polling it.
238                 */
239                 this->fd = PQsocket(sql);
240
241                 if(this->fd <= -1)
242                         return false;
243
244                 if (!ServerInstance->SE->AddFd(this, FD_WANT_NO_WRITE | FD_WANT_NO_READ))
245                 {
246                         ServerInstance->Logs->Log(MODNAME, LOG_DEBUG, "BUG: Couldn't add pgsql socket to socket engine");
247                         return false;
248                 }
249
250                 /* Socket all hooked into the engine, now to tell PgSQL to start connecting */
251                 return DoPoll();
252         }
253
254         bool DoPoll()
255         {
256                 switch(PQconnectPoll(sql))
257                 {
258                         case PGRES_POLLING_WRITING:
259                                 ServerInstance->SE->ChangeEventMask(this, FD_WANT_POLL_WRITE | FD_WANT_NO_READ);
260                                 status = CWRITE;
261                                 return true;
262                         case PGRES_POLLING_READING:
263                                 ServerInstance->SE->ChangeEventMask(this, FD_WANT_POLL_READ | FD_WANT_NO_WRITE);
264                                 status = CREAD;
265                                 return true;
266                         case PGRES_POLLING_FAILED:
267                                 return false;
268                         case PGRES_POLLING_OK:
269                                 ServerInstance->SE->ChangeEventMask(this, FD_WANT_POLL_READ | FD_WANT_NO_WRITE);
270                                 status = WWRITE;
271                                 DoConnectedPoll();
272                         default:
273                                 return true;
274                 }
275         }
276
277         void DoConnectedPoll()
278         {
279 restart:
280                 while (qinprog.q.empty() && !queue.empty())
281                 {
282                         /* There's no query currently in progress, and there's queries in the queue. */
283                         DoQuery(queue.front());
284                         queue.pop_front();
285                 }
286
287                 if (PQconsumeInput(sql))
288                 {
289                         if (PQisBusy(sql))
290                         {
291                                 /* Nothing happens here */
292                         }
293                         else if (qinprog.c)
294                         {
295                                 /* Fetch the result.. */
296                                 PGresult* result = PQgetResult(sql);
297
298                                 /* PgSQL would allow a query string to be sent which has multiple
299                                  * queries in it, this isn't portable across database backends and
300                                  * we don't want modules doing it. But just in case we make sure we
301                                  * drain any results there are and just use the last one.
302                                  * If the module devs are behaving there will only be one result.
303                                  */
304                                 while (PGresult* temp = PQgetResult(sql))
305                                 {
306                                         PQclear(result);
307                                         result = temp;
308                                 }
309
310                                 /* ..and the result */
311                                 PgSQLresult reply(result);
312                                 switch(PQresultStatus(result))
313                                 {
314                                         case PGRES_EMPTY_QUERY:
315                                         case PGRES_BAD_RESPONSE:
316                                         case PGRES_FATAL_ERROR:
317                                         {
318                                                 SQLerror err(SQL_QREPLY_FAIL, PQresultErrorMessage(result));
319                                                 qinprog.c->OnError(err);
320                                                 break;
321                                         }
322                                         default:
323                                                 /* Other values are not errors */
324                                                 qinprog.c->OnResult(reply);
325                                 }
326
327                                 delete qinprog.c;
328                                 qinprog = QueueItem(NULL, "");
329                                 goto restart;
330                         }
331                         else
332                         {
333                                 qinprog.q.clear();
334                         }
335                 }
336                 else
337                 {
338                         /* I think we'll assume this means the server died...it might not,
339                          * but I think that any error serious enough we actually get here
340                          * deserves to reconnect [/excuse]
341                          * Returning true so the core doesn't try and close the connection.
342                          */
343                         DelayReconnect();
344                 }
345         }
346
347         bool DoResetPoll()
348         {
349                 switch(PQresetPoll(sql))
350                 {
351                         case PGRES_POLLING_WRITING:
352                                 ServerInstance->SE->ChangeEventMask(this, FD_WANT_POLL_WRITE | FD_WANT_NO_READ);
353                                 status = CWRITE;
354                                 return DoPoll();
355                         case PGRES_POLLING_READING:
356                                 ServerInstance->SE->ChangeEventMask(this, FD_WANT_POLL_READ | FD_WANT_NO_WRITE);
357                                 status = CREAD;
358                                 return true;
359                         case PGRES_POLLING_FAILED:
360                                 return false;
361                         case PGRES_POLLING_OK:
362                                 ServerInstance->SE->ChangeEventMask(this, FD_WANT_POLL_READ | FD_WANT_NO_WRITE);
363                                 status = WWRITE;
364                                 DoConnectedPoll();
365                         default:
366                                 return true;
367                 }
368         }
369
370         void DelayReconnect();
371
372         void DoEvent()
373         {
374                 if((status == CREAD) || (status == CWRITE))
375                 {
376                         DoPoll();
377                 }
378                 else if((status == RREAD) || (status == RWRITE))
379                 {
380                         DoResetPoll();
381                 }
382                 else
383                 {
384                         DoConnectedPoll();
385                 }
386         }
387
388         void submit(SQLQuery *req, const std::string& q)
389         {
390                 if (qinprog.q.empty())
391                 {
392                         DoQuery(QueueItem(req,q));
393                 }
394                 else
395                 {
396                         // wait your turn.
397                         queue.push_back(QueueItem(req,q));
398                 }
399         }
400
401         void submit(SQLQuery *req, const std::string& q, const ParamL& p)
402         {
403                 std::string res;
404                 unsigned int param = 0;
405                 for(std::string::size_type i = 0; i < q.length(); i++)
406                 {
407                         if (q[i] != '?')
408                                 res.push_back(q[i]);
409                         else
410                         {
411                                 if (param < p.size())
412                                 {
413                                         std::string parm = p[param++];
414                                         std::vector<char> buffer(parm.length() * 2 + 1);
415 #ifdef PGSQL_HAS_ESCAPECONN
416                                         int error;
417                                         size_t escapedsize = PQescapeStringConn(sql, &buffer[0], parm.data(), parm.length(), &error);
418                                         if (error)
419                                                 ServerInstance->Logs->Log(MODNAME, LOG_DEBUG, "BUG: Apparently PQescapeStringConn() failed");
420 #else
421                                         size_t escapedsize = PQescapeString(&buffer[0], parm.data(), parm.length());
422 #endif
423                                         res.append(&buffer[0], escapedsize);
424                                 }
425                         }
426                 }
427                 submit(req, res);
428         }
429
430         void submit(SQLQuery *req, const std::string& q, const ParamM& p)
431         {
432                 std::string res;
433                 for(std::string::size_type i = 0; i < q.length(); i++)
434                 {
435                         if (q[i] != '$')
436                                 res.push_back(q[i]);
437                         else
438                         {
439                                 std::string field;
440                                 i++;
441                                 while (i < q.length() && isalnum(q[i]))
442                                         field.push_back(q[i++]);
443                                 i--;
444
445                                 ParamM::const_iterator it = p.find(field);
446                                 if (it != p.end())
447                                 {
448                                         std::string parm = it->second;
449                                         std::vector<char> buffer(parm.length() * 2 + 1);
450 #ifdef PGSQL_HAS_ESCAPECONN
451                                         int error;
452                                         size_t escapedsize = PQescapeStringConn(sql, &buffer[0], parm.data(), parm.length(), &error);
453                                         if (error)
454                                                 ServerInstance->Logs->Log(MODNAME, LOG_DEBUG, "BUG: Apparently PQescapeStringConn() failed");
455 #else
456                                         size_t escapedsize = PQescapeString(&buffer[0], parm.data(), parm.length());
457 #endif
458                                         res.append(&buffer[0], escapedsize);
459                                 }
460                         }
461                 }
462                 submit(req, res);
463         }
464
465         void DoQuery(const QueueItem& req)
466         {
467                 if (status != WREAD && status != WWRITE)
468                 {
469                         // whoops, not connected...
470                         SQLerror err(SQL_BAD_CONN);
471                         req.c->OnError(err);
472                         delete req.c;
473                         return;
474                 }
475
476                 if(PQsendQuery(sql, req.q.c_str()))
477                 {
478                         qinprog = req;
479                 }
480                 else
481                 {
482                         SQLerror err(SQL_QSEND_FAIL, PQerrorMessage(sql));
483                         req.c->OnError(err);
484                         delete req.c;
485                 }
486         }
487
488         void Close()
489         {
490                 ServerInstance->SE->DelFd(this);
491
492                 if(sql)
493                 {
494                         PQfinish(sql);
495                         sql = NULL;
496                 }
497         }
498 };
499
500 class ModulePgSQL : public Module
501 {
502  public:
503         ConnMap connections;
504         ReconnectTimer* retimer;
505
506         ModulePgSQL()
507                 : retimer(NULL)
508         {
509         }
510
511         void init() CXX11_OVERRIDE
512         {
513                 ReadConf();
514
515                 Implementation eventlist[] = { I_OnUnloadModule, I_OnRehash };
516                 ServerInstance->Modules->Attach(eventlist, this, sizeof(eventlist)/sizeof(Implementation));
517         }
518
519         ~ModulePgSQL()
520         {
521                 delete retimer;
522                 ClearAllConnections();
523         }
524
525         void OnRehash(User* user) CXX11_OVERRIDE
526         {
527                 ReadConf();
528         }
529
530         void ReadConf()
531         {
532                 ConnMap conns;
533                 ConfigTagList tags = ServerInstance->Config->ConfTags("database");
534                 for(ConfigIter i = tags.first; i != tags.second; i++)
535                 {
536                         if (i->second->getString("module", "pgsql") != "pgsql")
537                                 continue;
538                         std::string id = i->second->getString("id");
539                         ConnMap::iterator curr = connections.find(id);
540                         if (curr == connections.end())
541                         {
542                                 SQLConn* conn = new SQLConn(this, i->second);
543                                 conns.insert(std::make_pair(id, conn));
544                                 ServerInstance->Modules->AddService(*conn);
545                         }
546                         else
547                         {
548                                 conns.insert(*curr);
549                                 connections.erase(curr);
550                         }
551                 }
552                 ClearAllConnections();
553                 conns.swap(connections);
554         }
555
556         void ClearAllConnections()
557         {
558                 for(ConnMap::iterator i = connections.begin(); i != connections.end(); i++)
559                 {
560                         i->second->cull();
561                         delete i->second;
562                 }
563                 connections.clear();
564         }
565
566         void OnUnloadModule(Module* mod) CXX11_OVERRIDE
567         {
568                 SQLerror err(SQL_BAD_DBID);
569                 for(ConnMap::iterator i = connections.begin(); i != connections.end(); i++)
570                 {
571                         SQLConn* conn = i->second;
572                         if (conn->qinprog.c && conn->qinprog.c->creator == mod)
573                         {
574                                 conn->qinprog.c->OnError(err);
575                                 delete conn->qinprog.c;
576                                 conn->qinprog.c = NULL;
577                         }
578                         std::deque<QueueItem>::iterator j = conn->queue.begin();
579                         while (j != conn->queue.end())
580                         {
581                                 SQLQuery* q = j->c;
582                                 if (q->creator == mod)
583                                 {
584                                         q->OnError(err);
585                                         delete q;
586                                         j = conn->queue.erase(j);
587                                 }
588                                 else
589                                         j++;
590                         }
591                 }
592         }
593
594         Version GetVersion() CXX11_OVERRIDE
595         {
596                 return Version("PostgreSQL Service Provider module for all other m_sql* modules, uses v2 of the SQL API", VF_VENDOR);
597         }
598 };
599
600 bool ReconnectTimer::Tick(time_t time)
601 {
602         mod->retimer = NULL;
603         mod->ReadConf();
604         return false;
605 }
606
607 void SQLConn::DelayReconnect()
608 {
609         ModulePgSQL* mod = (ModulePgSQL*)(Module*)creator;
610         ConnMap::iterator it = mod->connections.find(conf->getString("id"));
611         if (it != mod->connections.end())
612         {
613                 mod->connections.erase(it);
614                 ServerInstance->GlobalCulls.AddItem((EventHandler*)this);
615                 if (!mod->retimer)
616                 {
617                         mod->retimer = new ReconnectTimer(mod);
618                         ServerInstance->Timers->AddTimer(mod->retimer);
619                 }
620         }
621 }
622
623 MODULE_INIT(ModulePgSQL)