]> git.netwichtig.de Git - user/henk/code/inspircd.git/blob - src/modules/extra/m_pgsql.cpp
Fix some remaining uses of ato[il].
[user/henk/code/inspircd.git] / src / modules / extra / m_pgsql.cpp
1 /*
2  * InspIRCd -- Internet Relay Chat Daemon
3  *
4  *   Copyright (C) 2009-2010 Daniel De Graaf <danieldg@inspircd.org>
5  *   Copyright (C) 2006-2007, 2009 Dennis Friis <peavey@inspircd.org>
6  *   Copyright (C) 2006-2007, 2009 Craig Edwards <craigedwards@brainbox.cc>
7  *   Copyright (C) 2008 Robin Burchell <robin+git@viroteck.net>
8  *   Copyright (C) 2008 Thomas Stagner <aquanight@inspircd.org>
9  *   Copyright (C) 2006 Oliver Lupton <oliverlupton@gmail.com>
10  *
11  * This file is part of InspIRCd.  InspIRCd is free software: you can
12  * redistribute it and/or modify it under the terms of the GNU General Public
13  * License as published by the Free Software Foundation, version 2.
14  *
15  * This program is distributed in the hope that it will be useful, but WITHOUT
16  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
17  * FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
18  * details.
19  *
20  * You should have received a copy of the GNU General Public License
21  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
22  */
23
24 /// $CompilerFlags: -Iexecute("pg_config --includedir" "POSTGRESQL_INCLUDE_DIR")
25 /// $LinkerFlags: -Lexecute("pg_config --libdir" "POSTGRESQL_LIBRARY_DIR") -lpq
26
27 /// $PackageInfo: require_system("arch") postgresql-libs
28 /// $PackageInfo: require_system("centos") postgresql-devel
29 /// $PackageInfo: require_system("darwin") postgresql
30 /// $PackageInfo: require_system("debian") libpq-dev
31 /// $PackageInfo: require_system("ubuntu") libpq-dev
32
33
34 #include "inspircd.h"
35 #include <cstdlib>
36 #include <libpq-fe.h>
37 #include "modules/sql.h"
38
39 /* SQLConn rewritten by peavey to
40  * use EventHandler instead of
41  * BufferedSocket. This is much neater
42  * and gives total control of destroy
43  * and delete of resources.
44  */
45
46 /* Forward declare, so we can have the typedef neatly at the top */
47 class SQLConn;
48 class ModulePgSQL;
49
50 typedef insp::flat_map<std::string, SQLConn*> ConnMap;
51
52 /* CREAD,       Connecting and wants read event
53  * CWRITE,      Connecting and wants write event
54  * WREAD,       Connected/Working and wants read event
55  * WWRITE,      Connected/Working and wants write event
56  * RREAD,       Resetting and wants read event
57  * RWRITE,      Resetting and wants write event
58  */
59 enum SQLstatus { CREAD, CWRITE, WREAD, WWRITE, RREAD, RWRITE };
60
61 class ReconnectTimer : public Timer
62 {
63  private:
64         ModulePgSQL* mod;
65  public:
66         ReconnectTimer(ModulePgSQL* m) : Timer(5, false), mod(m)
67         {
68         }
69         bool Tick(time_t TIME) CXX11_OVERRIDE;
70 };
71
72 struct QueueItem
73 {
74         SQL::Query* c;
75         std::string q;
76         QueueItem(SQL::Query* C, const std::string& Q) : c(C), q(Q) {}
77 };
78
79 /** PgSQLresult is a subclass of the mostly-pure-virtual class SQLresult.
80  * All SQL providers must create their own subclass and define it's methods using that
81  * database library's data retriveal functions. The aim is to avoid a slow and inefficient process
82  * of converting all data to a common format before it reaches the result structure. This way
83  * data is passes to the module nearly as directly as if it was using the API directly itself.
84  */
85
86 class PgSQLresult : public SQL::Result
87 {
88         PGresult* res;
89         int currentrow;
90         int rows;
91         std::vector<std::string> colnames;
92
93         void getColNames()
94         {
95                 colnames.resize(PQnfields(res));
96                 for(unsigned int i=0; i < colnames.size(); i++)
97                 {
98                         colnames[i] = PQfname(res, i);
99                 }
100         }
101  public:
102         PgSQLresult(PGresult* result) : res(result), currentrow(0)
103         {
104                 rows = PQntuples(res);
105                 if (!rows)
106                         rows = ConvToNum<int>(PQcmdTuples(res));
107         }
108
109         ~PgSQLresult()
110         {
111                 PQclear(res);
112         }
113
114         int Rows() CXX11_OVERRIDE
115         {
116                 return rows;
117         }
118
119         void GetCols(std::vector<std::string>& result) CXX11_OVERRIDE
120         {
121                 if (colnames.empty())
122                         getColNames();
123                 result = colnames;
124         }
125
126         bool HasColumn(const std::string& column, size_t& index) CXX11_OVERRIDE
127         {
128                 if (colnames.empty())
129                         getColNames();
130
131                 for (size_t i = 0; i < colnames.size(); ++i)
132                 {
133                         if (colnames[i] == column)
134                         {
135                                 index = i;
136                                 return true;
137                         }
138                 }
139                 return false;
140         }
141
142         SQL::Field GetValue(int row, int column)
143         {
144                 char* v = PQgetvalue(res, row, column);
145                 if (!v || PQgetisnull(res, row, column))
146                         return SQL::Field();
147
148                 return SQL::Field(std::string(v, PQgetlength(res, row, column)));
149         }
150
151         bool GetRow(SQL::Row& result) CXX11_OVERRIDE
152         {
153                 if (currentrow >= PQntuples(res))
154                         return false;
155                 int ncols = PQnfields(res);
156
157                 for(int i = 0; i < ncols; i++)
158                 {
159                         result.push_back(GetValue(currentrow, i));
160                 }
161                 currentrow++;
162
163                 return true;
164         }
165 };
166
167 /** SQLConn represents one SQL session.
168  */
169 class SQLConn : public SQL::Provider, public EventHandler
170 {
171  public:
172         reference<ConfigTag> conf;      /* The <database> entry */
173         std::deque<QueueItem> queue;
174         PGconn*                 sql;            /* PgSQL database connection handle */
175         SQLstatus               status;         /* PgSQL database connection status */
176         QueueItem               qinprog;        /* If there is currently a query in progress */
177
178         SQLConn(Module* Creator, ConfigTag* tag)
179         : SQL::Provider(Creator, "SQL/" + tag->getString("id")), conf(tag), sql(NULL), status(CWRITE), qinprog(NULL, "")
180         {
181                 if (!DoConnect())
182                 {
183                         ServerInstance->Logs->Log(MODNAME, LOG_DEFAULT, "WARNING: Could not connect to database " + tag->getString("id"));
184                         DelayReconnect();
185                 }
186         }
187
188         CullResult cull() CXX11_OVERRIDE
189         {
190                 this->SQL::Provider::cull();
191                 ServerInstance->Modules->DelService(*this);
192                 return this->EventHandler::cull();
193         }
194
195         ~SQLConn()
196         {
197                 SQL::Error err(SQL::BAD_DBID);
198                 if (qinprog.c)
199                 {
200                         qinprog.c->OnError(err);
201                         delete qinprog.c;
202                 }
203                 for(std::deque<QueueItem>::iterator i = queue.begin(); i != queue.end(); i++)
204                 {
205                         SQL::Query* q = i->c;
206                         q->OnError(err);
207                         delete q;
208                 }
209         }
210
211         void OnEventHandlerRead() CXX11_OVERRIDE
212         {
213                 DoEvent();
214         }
215
216         void OnEventHandlerWrite() CXX11_OVERRIDE
217         {
218                 DoEvent();
219         }
220
221         void OnEventHandlerError(int errornum) CXX11_OVERRIDE
222         {
223                 DelayReconnect();
224         }
225
226         std::string GetDSN()
227         {
228                 std::ostringstream conninfo("connect_timeout = '5'");
229                 std::string item;
230
231                 if (conf->readString("host", item))
232                         conninfo << " host = '" << item << "'";
233
234                 if (conf->readString("port", item))
235                         conninfo << " port = '" << item << "'";
236
237                 if (conf->readString("name", item))
238                         conninfo << " dbname = '" << item << "'";
239
240                 if (conf->readString("user", item))
241                         conninfo << " user = '" << item << "'";
242
243                 if (conf->readString("pass", item))
244                         conninfo << " password = '" << item << "'";
245
246                 if (conf->getBool("ssl"))
247                         conninfo << " sslmode = 'require'";
248                 else
249                         conninfo << " sslmode = 'disable'";
250
251                 return conninfo.str();
252         }
253
254         bool DoConnect()
255         {
256                 sql = PQconnectStart(GetDSN().c_str());
257                 if (!sql)
258                         return false;
259
260                 if(PQstatus(sql) == CONNECTION_BAD)
261                         return false;
262
263                 if(PQsetnonblocking(sql, 1) == -1)
264                         return false;
265
266                 /* OK, we've initalised the connection, now to get it hooked into the socket engine
267                 * and then start polling it.
268                 */
269                 this->fd = PQsocket(sql);
270
271                 if(this->fd <= -1)
272                         return false;
273
274                 if (!SocketEngine::AddFd(this, FD_WANT_NO_WRITE | FD_WANT_NO_READ))
275                 {
276                         ServerInstance->Logs->Log(MODNAME, LOG_DEBUG, "BUG: Couldn't add pgsql socket to socket engine");
277                         return false;
278                 }
279
280                 /* Socket all hooked into the engine, now to tell PgSQL to start connecting */
281                 return DoPoll();
282         }
283
284         bool DoPoll()
285         {
286                 switch(PQconnectPoll(sql))
287                 {
288                         case PGRES_POLLING_WRITING:
289                                 SocketEngine::ChangeEventMask(this, FD_WANT_POLL_WRITE | FD_WANT_NO_READ);
290                                 status = CWRITE;
291                                 return true;
292                         case PGRES_POLLING_READING:
293                                 SocketEngine::ChangeEventMask(this, FD_WANT_POLL_READ | FD_WANT_NO_WRITE);
294                                 status = CREAD;
295                                 return true;
296                         case PGRES_POLLING_FAILED:
297                                 return false;
298                         case PGRES_POLLING_OK:
299                                 SocketEngine::ChangeEventMask(this, FD_WANT_POLL_READ | FD_WANT_NO_WRITE);
300                                 status = WWRITE;
301                                 DoConnectedPoll();
302                                 return true;
303                         default:
304                                 return true;
305                 }
306         }
307
308         void DoConnectedPoll()
309         {
310 restart:
311                 while (qinprog.q.empty() && !queue.empty())
312                 {
313                         /* There's no query currently in progress, and there's queries in the queue. */
314                         DoQuery(queue.front());
315                         queue.pop_front();
316                 }
317
318                 if (PQconsumeInput(sql))
319                 {
320                         if (PQisBusy(sql))
321                         {
322                                 /* Nothing happens here */
323                         }
324                         else if (qinprog.c)
325                         {
326                                 /* Fetch the result.. */
327                                 PGresult* result = PQgetResult(sql);
328
329                                 /* PgSQL would allow a query string to be sent which has multiple
330                                  * queries in it, this isn't portable across database backends and
331                                  * we don't want modules doing it. But just in case we make sure we
332                                  * drain any results there are and just use the last one.
333                                  * If the module devs are behaving there will only be one result.
334                                  */
335                                 while (PGresult* temp = PQgetResult(sql))
336                                 {
337                                         PQclear(result);
338                                         result = temp;
339                                 }
340
341                                 /* ..and the result */
342                                 PgSQLresult reply(result);
343                                 switch(PQresultStatus(result))
344                                 {
345                                         case PGRES_EMPTY_QUERY:
346                                         case PGRES_BAD_RESPONSE:
347                                         case PGRES_FATAL_ERROR:
348                                         {
349                                                 SQL::Error err(SQL::QREPLY_FAIL, PQresultErrorMessage(result));
350                                                 qinprog.c->OnError(err);
351                                                 break;
352                                         }
353                                         default:
354                                                 /* Other values are not errors */
355                                                 qinprog.c->OnResult(reply);
356                                 }
357
358                                 delete qinprog.c;
359                                 qinprog = QueueItem(NULL, "");
360                                 goto restart;
361                         }
362                         else
363                         {
364                                 qinprog.q.clear();
365                         }
366                 }
367                 else
368                 {
369                         /* I think we'll assume this means the server died...it might not,
370                          * but I think that any error serious enough we actually get here
371                          * deserves to reconnect [/excuse]
372                          * Returning true so the core doesn't try and close the connection.
373                          */
374                         DelayReconnect();
375                 }
376         }
377
378         bool DoResetPoll()
379         {
380                 switch(PQresetPoll(sql))
381                 {
382                         case PGRES_POLLING_WRITING:
383                                 SocketEngine::ChangeEventMask(this, FD_WANT_POLL_WRITE | FD_WANT_NO_READ);
384                                 status = CWRITE;
385                                 return DoPoll();
386                         case PGRES_POLLING_READING:
387                                 SocketEngine::ChangeEventMask(this, FD_WANT_POLL_READ | FD_WANT_NO_WRITE);
388                                 status = CREAD;
389                                 return true;
390                         case PGRES_POLLING_FAILED:
391                                 return false;
392                         case PGRES_POLLING_OK:
393                                 SocketEngine::ChangeEventMask(this, FD_WANT_POLL_READ | FD_WANT_NO_WRITE);
394                                 status = WWRITE;
395                                 DoConnectedPoll();
396                                 return true;
397                         default:
398                                 return true;
399                 }
400         }
401
402         void DelayReconnect();
403
404         void DoEvent()
405         {
406                 if((status == CREAD) || (status == CWRITE))
407                 {
408                         DoPoll();
409                 }
410                 else if((status == RREAD) || (status == RWRITE))
411                 {
412                         DoResetPoll();
413                 }
414                 else
415                 {
416                         DoConnectedPoll();
417                 }
418         }
419
420         void Submit(SQL::Query *req, const std::string& q) CXX11_OVERRIDE
421         {
422                 ServerInstance->Logs->Log(MODNAME, LOG_DEBUG, "Executing PostgreSQL query: " + q);
423                 if (qinprog.q.empty())
424                 {
425                         DoQuery(QueueItem(req,q));
426                 }
427                 else
428                 {
429                         // wait your turn.
430                         queue.push_back(QueueItem(req,q));
431                 }
432         }
433
434         void Submit(SQL::Query *req, const std::string& q, const SQL::ParamList& p) CXX11_OVERRIDE
435         {
436                 std::string res;
437                 unsigned int param = 0;
438                 for(std::string::size_type i = 0; i < q.length(); i++)
439                 {
440                         if (q[i] != '?')
441                                 res.push_back(q[i]);
442                         else
443                         {
444                                 if (param < p.size())
445                                 {
446                                         std::string parm = p[param++];
447                                         std::vector<char> buffer(parm.length() * 2 + 1);
448                                         int error;
449                                         size_t escapedsize = PQescapeStringConn(sql, &buffer[0], parm.data(), parm.length(), &error);
450                                         if (error)
451                                                 ServerInstance->Logs->Log(MODNAME, LOG_DEBUG, "BUG: Apparently PQescapeStringConn() failed");
452                                         res.append(&buffer[0], escapedsize);
453                                 }
454                         }
455                 }
456                 Submit(req, res);
457         }
458
459         void Submit(SQL::Query *req, const std::string& q, const SQL::ParamMap& p) CXX11_OVERRIDE
460         {
461                 std::string res;
462                 for(std::string::size_type i = 0; i < q.length(); i++)
463                 {
464                         if (q[i] != '$')
465                                 res.push_back(q[i]);
466                         else
467                         {
468                                 std::string field;
469                                 i++;
470                                 while (i < q.length() && isalnum(q[i]))
471                                         field.push_back(q[i++]);
472                                 i--;
473
474                                 SQL::ParamMap::const_iterator it = p.find(field);
475                                 if (it != p.end())
476                                 {
477                                         std::string parm = it->second;
478                                         std::vector<char> buffer(parm.length() * 2 + 1);
479                                         int error;
480                                         size_t escapedsize = PQescapeStringConn(sql, &buffer[0], parm.data(), parm.length(), &error);
481                                         if (error)
482                                                 ServerInstance->Logs->Log(MODNAME, LOG_DEBUG, "BUG: Apparently PQescapeStringConn() failed");
483                                         res.append(&buffer[0], escapedsize);
484                                 }
485                         }
486                 }
487                 Submit(req, res);
488         }
489
490         void DoQuery(const QueueItem& req)
491         {
492                 if (status != WREAD && status != WWRITE)
493                 {
494                         // whoops, not connected...
495                         SQL::Error err(SQL::BAD_CONN);
496                         req.c->OnError(err);
497                         delete req.c;
498                         return;
499                 }
500
501                 if(PQsendQuery(sql, req.q.c_str()))
502                 {
503                         qinprog = req;
504                 }
505                 else
506                 {
507                         SQL::Error err(SQL::QSEND_FAIL, PQerrorMessage(sql));
508                         req.c->OnError(err);
509                         delete req.c;
510                 }
511         }
512
513         void Close()
514         {
515                 SocketEngine::DelFd(this);
516
517                 if(sql)
518                 {
519                         PQfinish(sql);
520                         sql = NULL;
521                 }
522         }
523 };
524
525 class ModulePgSQL : public Module
526 {
527  public:
528         ConnMap connections;
529         ReconnectTimer* retimer;
530
531         ModulePgSQL()
532                 : retimer(NULL)
533         {
534         }
535
536         ~ModulePgSQL()
537         {
538                 delete retimer;
539                 ClearAllConnections();
540         }
541
542         void ReadConfig(ConfigStatus& status) CXX11_OVERRIDE
543         {
544                 ReadConf();
545         }
546
547         void ReadConf()
548         {
549                 ConnMap conns;
550                 ConfigTagList tags = ServerInstance->Config->ConfTags("database");
551                 for(ConfigIter i = tags.first; i != tags.second; i++)
552                 {
553                         if (!stdalgo::string::equalsci(i->second->getString("module"), "pgsql"))
554                                 continue;
555                         std::string id = i->second->getString("id");
556                         ConnMap::iterator curr = connections.find(id);
557                         if (curr == connections.end())
558                         {
559                                 SQLConn* conn = new SQLConn(this, i->second);
560                                 conns.insert(std::make_pair(id, conn));
561                                 ServerInstance->Modules->AddService(*conn);
562                         }
563                         else
564                         {
565                                 conns.insert(*curr);
566                                 connections.erase(curr);
567                         }
568                 }
569                 ClearAllConnections();
570                 conns.swap(connections);
571         }
572
573         void ClearAllConnections()
574         {
575                 for(ConnMap::iterator i = connections.begin(); i != connections.end(); i++)
576                 {
577                         i->second->cull();
578                         delete i->second;
579                 }
580                 connections.clear();
581         }
582
583         void OnUnloadModule(Module* mod) CXX11_OVERRIDE
584         {
585                 SQL::Error err(SQL::BAD_DBID);
586                 for(ConnMap::iterator i = connections.begin(); i != connections.end(); i++)
587                 {
588                         SQLConn* conn = i->second;
589                         if (conn->qinprog.c && conn->qinprog.c->creator == mod)
590                         {
591                                 conn->qinprog.c->OnError(err);
592                                 delete conn->qinprog.c;
593                                 conn->qinprog.c = NULL;
594                         }
595                         std::deque<QueueItem>::iterator j = conn->queue.begin();
596                         while (j != conn->queue.end())
597                         {
598                                 SQL::Query* q = j->c;
599                                 if (q->creator == mod)
600                                 {
601                                         q->OnError(err);
602                                         delete q;
603                                         j = conn->queue.erase(j);
604                                 }
605                                 else
606                                         j++;
607                         }
608                 }
609         }
610
611         Version GetVersion() CXX11_OVERRIDE
612         {
613                 return Version("PostgreSQL Service Provider module for all other m_sql* modules, uses v2 of the SQL API", VF_VENDOR);
614         }
615 };
616
617 bool ReconnectTimer::Tick(time_t time)
618 {
619         mod->retimer = NULL;
620         mod->ReadConf();
621         delete this;
622         return false;
623 }
624
625 void SQLConn::DelayReconnect()
626 {
627         ModulePgSQL* mod = (ModulePgSQL*)(Module*)creator;
628         ConnMap::iterator it = mod->connections.find(conf->getString("id"));
629         if (it != mod->connections.end())
630         {
631                 mod->connections.erase(it);
632                 ServerInstance->GlobalCulls.AddItem((EventHandler*)this);
633                 if (!mod->retimer)
634                 {
635                         mod->retimer = new ReconnectTimer(mod);
636                         ServerInstance->Timers.AddTimer(mod->retimer);
637                 }
638         }
639 }
640
641 MODULE_INIT(ModulePgSQL)