]> git.netwichtig.de Git - user/henk/code/inspircd.git/blob - src/modules/extra/m_pgsql.cpp
Opt-out of pgsql read and write events if polling fails.
[user/henk/code/inspircd.git] / src / modules / extra / m_pgsql.cpp
1 /*
2  * InspIRCd -- Internet Relay Chat Daemon
3  *
4  *   Copyright (C) 2016 Adam <Adam@anope.org>
5  *   Copyright (C) 2015 Daniel Vassdal <shutter@canternet.org>
6  *   Copyright (C) 2013, 2016-2019 Sadie Powell <sadie@witchery.services>
7  *   Copyright (C) 2012-2015 Attila Molnar <attilamolnar@hush.com>
8  *   Copyright (C) 2012 Robby <robby@chatbelgie.be>
9  *   Copyright (C) 2009-2010 Daniel De Graaf <danieldg@inspircd.org>
10  *   Copyright (C) 2009 Uli Schlachter <psychon@inspircd.org>
11  *   Copyright (C) 2008 Thomas Stagner <aquanight@inspircd.org>
12  *   Copyright (C) 2007, 2009-2010 Craig Edwards <brain@inspircd.org>
13  *   Copyright (C) 2007 Robin Burchell <robin+git@viroteck.net>
14  *   Copyright (C) 2007 Dennis Friis <peavey@inspircd.org>
15  *   Copyright (C) 2006 Oliver Lupton <om@inspircd.org>
16  *
17  * This file is part of InspIRCd.  InspIRCd is free software: you can
18  * redistribute it and/or modify it under the terms of the GNU General Public
19  * License as published by the Free Software Foundation, version 2.
20  *
21  * This program is distributed in the hope that it will be useful, but WITHOUT
22  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
23  * FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
24  * details.
25  *
26  * You should have received a copy of the GNU General Public License
27  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
28  */
29
30 /// $CompilerFlags: -Iexecute("pg_config --includedir" "POSTGRESQL_INCLUDE_DIR")
31 /// $LinkerFlags: -Lexecute("pg_config --libdir" "POSTGRESQL_LIBRARY_DIR") -lpq
32
33 /// $PackageInfo: require_system("arch") postgresql-libs
34 /// $PackageInfo: require_system("centos") postgresql-devel
35 /// $PackageInfo: require_system("darwin") postgresql
36 /// $PackageInfo: require_system("debian") libpq-dev
37 /// $PackageInfo: require_system("ubuntu") libpq-dev
38
39
40 #include "inspircd.h"
41 #include <cstdlib>
42 #include <libpq-fe.h>
43 #include "modules/sql.h"
44
45 /* SQLConn rewritten by peavey to
46  * use EventHandler instead of
47  * BufferedSocket. This is much neater
48  * and gives total control of destroy
49  * and delete of resources.
50  */
51
52 /* Forward declare, so we can have the typedef neatly at the top */
53 class SQLConn;
54 class ModulePgSQL;
55
56 typedef insp::flat_map<std::string, SQLConn*> ConnMap;
57
/** State of a single PostgreSQL connection. */
enum SQLstatus
{
        DEAD,   // The connection has died.
        CREAD,  // Connecting and wants read event.
        CWRITE, // Connecting and wants write event.
        WREAD,  // Connected/working and wants read event.
        WWRITE  // Connected/working and wants write event.
};
75
76 class ReconnectTimer : public Timer
77 {
78  private:
79         ModulePgSQL* mod;
80  public:
81         ReconnectTimer(ModulePgSQL* m) : Timer(5, false), mod(m)
82         {
83         }
84         bool Tick(time_t TIME) CXX11_OVERRIDE;
85 };
86
87 struct QueueItem
88 {
89         SQL::Query* c;
90         std::string q;
91         QueueItem(SQL::Query* C, const std::string& Q) : c(C), q(Q) {}
92 };
93
94 /** PgSQLresult is a subclass of the mostly-pure-virtual class SQLresult.
95  * All SQL providers must create their own subclass and define its methods using that
96  * database library's data retrieval functions. The aim is to avoid a slow and inefficient process
97  * of converting all data to a common format before it reaches the result structure. This way
98  * data is passed to the module nearly as directly as if it was using the API directly itself.
99  */
100
101 class PgSQLresult : public SQL::Result
102 {
103         PGresult* res;
104         int currentrow;
105         int rows;
106         std::vector<std::string> colnames;
107
108         void getColNames()
109         {
110                 colnames.resize(PQnfields(res));
111                 for(unsigned int i=0; i < colnames.size(); i++)
112                 {
113                         colnames[i] = PQfname(res, i);
114                 }
115         }
116  public:
117         PgSQLresult(PGresult* result) : res(result), currentrow(0)
118         {
119                 rows = PQntuples(res);
120                 if (!rows)
121                         rows = ConvToNum<int>(PQcmdTuples(res));
122         }
123
124         ~PgSQLresult()
125         {
126                 PQclear(res);
127         }
128
129         int Rows() CXX11_OVERRIDE
130         {
131                 return rows;
132         }
133
134         void GetCols(std::vector<std::string>& result) CXX11_OVERRIDE
135         {
136                 if (colnames.empty())
137                         getColNames();
138                 result = colnames;
139         }
140
141         bool HasColumn(const std::string& column, size_t& index) CXX11_OVERRIDE
142         {
143                 if (colnames.empty())
144                         getColNames();
145
146                 for (size_t i = 0; i < colnames.size(); ++i)
147                 {
148                         if (colnames[i] == column)
149                         {
150                                 index = i;
151                                 return true;
152                         }
153                 }
154                 return false;
155         }
156
157         SQL::Field GetValue(int row, int column)
158         {
159                 char* v = PQgetvalue(res, row, column);
160                 if (!v || PQgetisnull(res, row, column))
161                         return SQL::Field();
162
163                 return SQL::Field(std::string(v, PQgetlength(res, row, column)));
164         }
165
166         bool GetRow(SQL::Row& result) CXX11_OVERRIDE
167         {
168                 if (currentrow >= PQntuples(res))
169                         return false;
170                 int ncols = PQnfields(res);
171
172                 for(int i = 0; i < ncols; i++)
173                 {
174                         result.push_back(GetValue(currentrow, i));
175                 }
176                 currentrow++;
177
178                 return true;
179         }
180 };
181
182 /** SQLConn represents one SQL session.
183  */
184 class SQLConn : public SQL::Provider, public EventHandler
185 {
186  public:
187         reference<ConfigTag> conf;      /* The <database> entry */
188         std::deque<QueueItem> queue;
189         PGconn*                 sql;            /* PgSQL database connection handle */
190         SQLstatus               status;         /* PgSQL database connection status */
191         QueueItem               qinprog;        /* If there is currently a query in progress */
192
193         SQLConn(Module* Creator, ConfigTag* tag)
194                 : SQL::Provider(Creator, tag->getString("id"))
195                 , conf(tag)
196                 , sql(NULL)
197                 , status(CWRITE)
198                 , qinprog(NULL, "")
199         {
200                 if (!DoConnect())
201                 {
202                         ServerInstance->Logs->Log(MODNAME, LOG_DEFAULT, "WARNING: Could not connect to database " + tag->getString("id"));
203                         DelayReconnect();
204                 }
205         }
206
207         CullResult cull() CXX11_OVERRIDE
208         {
209                 this->SQL::Provider::cull();
210                 ServerInstance->Modules->DelService(*this);
211                 return this->EventHandler::cull();
212         }
213
214         ~SQLConn()
215         {
216                 SQL::Error err(SQL::BAD_DBID);
217                 if (qinprog.c)
218                 {
219                         qinprog.c->OnError(err);
220                         delete qinprog.c;
221                 }
222                 for(std::deque<QueueItem>::iterator i = queue.begin(); i != queue.end(); i++)
223                 {
224                         SQL::Query* q = i->c;
225                         q->OnError(err);
226                         delete q;
227                 }
228         }
229
230         void OnEventHandlerRead() CXX11_OVERRIDE
231         {
232                 DoEvent();
233         }
234
235         void OnEventHandlerWrite() CXX11_OVERRIDE
236         {
237                 DoEvent();
238         }
239
240         void OnEventHandlerError(int errornum) CXX11_OVERRIDE
241         {
242                 DelayReconnect();
243         }
244
245         std::string GetDSN()
246         {
247                 std::ostringstream conninfo("connect_timeout = '5'");
248                 std::string item;
249
250                 if (conf->readString("host", item))
251                         conninfo << " host = '" << item << "'";
252
253                 if (conf->readString("port", item))
254                         conninfo << " port = '" << item << "'";
255
256                 if (conf->readString("name", item))
257                         conninfo << " dbname = '" << item << "'";
258
259                 if (conf->readString("user", item))
260                         conninfo << " user = '" << item << "'";
261
262                 if (conf->readString("pass", item))
263                         conninfo << " password = '" << item << "'";
264
265                 if (conf->getBool("ssl"))
266                         conninfo << " sslmode = 'require'";
267                 else
268                         conninfo << " sslmode = 'disable'";
269
270                 return conninfo.str();
271         }
272
273         bool DoConnect()
274         {
275                 sql = PQconnectStart(GetDSN().c_str());
276                 if (!sql)
277                         return false;
278
279                 if(PQstatus(sql) == CONNECTION_BAD)
280                         return false;
281
282                 if(PQsetnonblocking(sql, 1) == -1)
283                         return false;
284
285                 /* OK, we've initialised the connection, now to get it hooked into the socket engine
286                 * and then start polling it.
287                 */
288                 this->fd = PQsocket(sql);
289
290                 if(this->fd <= -1)
291                         return false;
292
293                 if (!SocketEngine::AddFd(this, FD_WANT_NO_WRITE | FD_WANT_NO_READ))
294                 {
295                         ServerInstance->Logs->Log(MODNAME, LOG_DEBUG, "BUG: Couldn't add pgsql socket to socket engine");
296                         return false;
297                 }
298
299                 /* Socket all hooked into the engine, now to tell PgSQL to start connecting */
300                 return DoPoll();
301         }
302
303         bool DoPoll()
304         {
305                 switch(PQconnectPoll(sql))
306                 {
307                         case PGRES_POLLING_WRITING:
308                                 SocketEngine::ChangeEventMask(this, FD_WANT_POLL_WRITE | FD_WANT_NO_READ);
309                                 status = CWRITE;
310                                 return true;
311                         case PGRES_POLLING_READING:
312                                 SocketEngine::ChangeEventMask(this, FD_WANT_POLL_READ | FD_WANT_NO_WRITE);
313                                 status = CREAD;
314                                 return true;
315                         case PGRES_POLLING_FAILED:
316                                 SocketEngine::ChangeEventMask(this, FD_WANT_NO_READ | FD_WANT_NO_WRITE);
317                                 status = DEAD;
318                                 return false;
319                         case PGRES_POLLING_OK:
320                                 SocketEngine::ChangeEventMask(this, FD_WANT_POLL_READ | FD_WANT_NO_WRITE);
321                                 status = WWRITE;
322                                 DoConnectedPoll();
323                                 return true;
324                         default:
325                                 return true;
326                 }
327         }
328
329         void DoConnectedPoll()
330         {
331 restart:
332                 while (qinprog.q.empty() && !queue.empty())
333                 {
334                         /* There's no query currently in progress, and there's queries in the queue. */
335                         DoQuery(queue.front());
336                         queue.pop_front();
337                 }
338
339                 if (PQconsumeInput(sql))
340                 {
341                         if (PQisBusy(sql))
342                         {
343                                 /* Nothing happens here */
344                         }
345                         else if (qinprog.c)
346                         {
347                                 /* Fetch the result.. */
348                                 PGresult* result = PQgetResult(sql);
349
350                                 /* PgSQL would allow a query string to be sent which has multiple
351                                  * queries in it, this isn't portable across database backends and
352                                  * we don't want modules doing it. But just in case we make sure we
353                                  * drain any results there are and just use the last one.
354                                  * If the module devs are behaving there will only be one result.
355                                  */
356                                 while (PGresult* temp = PQgetResult(sql))
357                                 {
358                                         PQclear(result);
359                                         result = temp;
360                                 }
361
362                                 /* ..and the result */
363                                 PgSQLresult reply(result);
364                                 switch(PQresultStatus(result))
365                                 {
366                                         case PGRES_EMPTY_QUERY:
367                                         case PGRES_BAD_RESPONSE:
368                                         case PGRES_FATAL_ERROR:
369                                         {
370                                                 SQL::Error err(SQL::QREPLY_FAIL, PQresultErrorMessage(result));
371                                                 qinprog.c->OnError(err);
372                                                 break;
373                                         }
374                                         default:
375                                                 /* Other values are not errors */
376                                                 qinprog.c->OnResult(reply);
377                                 }
378
379                                 delete qinprog.c;
380                                 qinprog = QueueItem(NULL, "");
381                                 goto restart;
382                         }
383                         else
384                         {
385                                 qinprog.q.clear();
386                         }
387                 }
388                 else
389                 {
390                         /* I think we'll assume this means the server died...it might not,
391                          * but I think that any error serious enough we actually get here
392                          * deserves to reconnect [/excuse]
393                          * Returning true so the core doesn't try and close the connection.
394                          */
395                         DelayReconnect();
396                 }
397         }
398
399         void DelayReconnect();
400
401         void DoEvent()
402         {
403                 if((status == CREAD) || (status == CWRITE))
404                 {
405                         DoPoll();
406                 }
407                 else if (status == WREAD || status == WWRITE)
408                 {
409                         DoConnectedPoll();
410                 }
411         }
412
413         void Submit(SQL::Query *req, const std::string& q) CXX11_OVERRIDE
414         {
415                 ServerInstance->Logs->Log(MODNAME, LOG_DEBUG, "Executing PostgreSQL query: " + q);
416                 if (qinprog.q.empty())
417                 {
418                         DoQuery(QueueItem(req,q));
419                 }
420                 else
421                 {
422                         // wait your turn.
423                         queue.push_back(QueueItem(req,q));
424                 }
425         }
426
427         void Submit(SQL::Query *req, const std::string& q, const SQL::ParamList& p) CXX11_OVERRIDE
428         {
429                 std::string res;
430                 unsigned int param = 0;
431                 for(std::string::size_type i = 0; i < q.length(); i++)
432                 {
433                         if (q[i] != '?')
434                                 res.push_back(q[i]);
435                         else
436                         {
437                                 if (param < p.size())
438                                 {
439                                         std::string parm = p[param++];
440                                         std::vector<char> buffer(parm.length() * 2 + 1);
441                                         int error;
442                                         size_t escapedsize = PQescapeStringConn(sql, &buffer[0], parm.data(), parm.length(), &error);
443                                         if (error)
444                                                 ServerInstance->Logs->Log(MODNAME, LOG_DEBUG, "BUG: Apparently PQescapeStringConn() failed");
445                                         res.append(&buffer[0], escapedsize);
446                                 }
447                         }
448                 }
449                 Submit(req, res);
450         }
451
452         void Submit(SQL::Query *req, const std::string& q, const SQL::ParamMap& p) CXX11_OVERRIDE
453         {
454                 std::string res;
455                 for(std::string::size_type i = 0; i < q.length(); i++)
456                 {
457                         if (q[i] != '$')
458                                 res.push_back(q[i]);
459                         else
460                         {
461                                 std::string field;
462                                 i++;
463                                 while (i < q.length() && isalnum(q[i]))
464                                         field.push_back(q[i++]);
465                                 i--;
466
467                                 SQL::ParamMap::const_iterator it = p.find(field);
468                                 if (it != p.end())
469                                 {
470                                         std::string parm = it->second;
471                                         std::vector<char> buffer(parm.length() * 2 + 1);
472                                         int error;
473                                         size_t escapedsize = PQescapeStringConn(sql, &buffer[0], parm.data(), parm.length(), &error);
474                                         if (error)
475                                                 ServerInstance->Logs->Log(MODNAME, LOG_DEBUG, "BUG: Apparently PQescapeStringConn() failed");
476                                         res.append(&buffer[0], escapedsize);
477                                 }
478                         }
479                 }
480                 Submit(req, res);
481         }
482
483         void DoQuery(const QueueItem& req)
484         {
485                 if (status != WREAD && status != WWRITE)
486                 {
487                         // whoops, not connected...
488                         SQL::Error err(SQL::BAD_CONN);
489                         req.c->OnError(err);
490                         delete req.c;
491                         return;
492                 }
493
494                 if(PQsendQuery(sql, req.q.c_str()))
495                 {
496                         qinprog = req;
497                 }
498                 else
499                 {
500                         SQL::Error err(SQL::QSEND_FAIL, PQerrorMessage(sql));
501                         req.c->OnError(err);
502                         delete req.c;
503                 }
504         }
505
506         void Close()
507         {
508                 status = DEAD;
509                 SocketEngine::DelFd(this);
510
511                 if(sql)
512                 {
513                         PQfinish(sql);
514                         sql = NULL;
515                 }
516         }
517 };
518
519 class ModulePgSQL : public Module
520 {
521  public:
522         ConnMap connections;
523         ReconnectTimer* retimer;
524
525         ModulePgSQL()
526                 : retimer(NULL)
527         {
528         }
529
530         ~ModulePgSQL()
531         {
532                 delete retimer;
533                 ClearAllConnections();
534         }
535
536         void ReadConfig(ConfigStatus& status) CXX11_OVERRIDE
537         {
538                 ReadConf();
539         }
540
541         void ReadConf()
542         {
543                 ConnMap conns;
544                 ConfigTagList tags = ServerInstance->Config->ConfTags("database");
545                 for(ConfigIter i = tags.first; i != tags.second; i++)
546                 {
547                         if (!stdalgo::string::equalsci(i->second->getString("module"), "pgsql"))
548                                 continue;
549                         std::string id = i->second->getString("id");
550                         ConnMap::iterator curr = connections.find(id);
551                         if (curr == connections.end())
552                         {
553                                 SQLConn* conn = new SQLConn(this, i->second);
554                                 conns.insert(std::make_pair(id, conn));
555                                 ServerInstance->Modules->AddService(*conn);
556                         }
557                         else
558                         {
559                                 conns.insert(*curr);
560                                 connections.erase(curr);
561                         }
562                 }
563                 ClearAllConnections();
564                 conns.swap(connections);
565         }
566
567         void ClearAllConnections()
568         {
569                 for(ConnMap::iterator i = connections.begin(); i != connections.end(); i++)
570                 {
571                         i->second->cull();
572                         delete i->second;
573                 }
574                 connections.clear();
575         }
576
577         void OnUnloadModule(Module* mod) CXX11_OVERRIDE
578         {
579                 SQL::Error err(SQL::BAD_DBID);
580                 for(ConnMap::iterator i = connections.begin(); i != connections.end(); i++)
581                 {
582                         SQLConn* conn = i->second;
583                         if (conn->qinprog.c && conn->qinprog.c->creator == mod)
584                         {
585                                 conn->qinprog.c->OnError(err);
586                                 delete conn->qinprog.c;
587                                 conn->qinprog.c = NULL;
588                         }
589                         std::deque<QueueItem>::iterator j = conn->queue.begin();
590                         while (j != conn->queue.end())
591                         {
592                                 SQL::Query* q = j->c;
593                                 if (q->creator == mod)
594                                 {
595                                         q->OnError(err);
596                                         delete q;
597                                         j = conn->queue.erase(j);
598                                 }
599                                 else
600                                         j++;
601                         }
602                 }
603         }
604
605         Version GetVersion() CXX11_OVERRIDE
606         {
607                 return Version("Provides the ability for SQL modules to query a PostgreSQL database.", VF_VENDOR);
608         }
609 };
610
611 bool ReconnectTimer::Tick(time_t time)
612 {
613         mod->retimer = NULL;
614         mod->ReadConf();
615         delete this;
616         return false;
617 }
618
619 void SQLConn::DelayReconnect()
620 {
621         status = DEAD;
622         ModulePgSQL* mod = (ModulePgSQL*)(Module*)creator;
623         ConnMap::iterator it = mod->connections.find(conf->getString("id"));
624         if (it != mod->connections.end())
625         {
626                 mod->connections.erase(it);
627                 ServerInstance->GlobalCulls.AddItem((EventHandler*)this);
628                 if (!mod->retimer)
629                 {
630                         mod->retimer = new ReconnectTimer(mod);
631                         ServerInstance->Timers.AddTimer(mod->retimer);
632                 }
633         }
634 }
635
636 MODULE_INIT(ModulePgSQL)