]> git.netwichtig.de Git - user/henk/code/inspircd.git/blob - src/modules/extra/m_pgsql.cpp
2645617a5b7ac5088dd66de0db1514a84375dcc9
[user/henk/code/inspircd.git] / src / modules / extra / m_pgsql.cpp
1 /*
2  * InspIRCd -- Internet Relay Chat Daemon
3  *
4  *   Copyright (C) 2016 Adam <Adam@anope.org>
5  *   Copyright (C) 2015 Daniel Vassdal <shutter@canternet.org>
6  *   Copyright (C) 2013, 2016-2019 Sadie Powell <sadie@witchery.services>
7  *   Copyright (C) 2012-2015 Attila Molnar <attilamolnar@hush.com>
8  *   Copyright (C) 2012 Robby <robby@chatbelgie.be>
9  *   Copyright (C) 2009-2010 Daniel De Graaf <danieldg@inspircd.org>
10  *   Copyright (C) 2009 Uli Schlachter <psychon@inspircd.org>
11  *   Copyright (C) 2008 Thomas Stagner <aquanight@inspircd.org>
12  *   Copyright (C) 2007, 2009-2010 Craig Edwards <brain@inspircd.org>
13  *   Copyright (C) 2007 Robin Burchell <robin+git@viroteck.net>
14  *   Copyright (C) 2007 Dennis Friis <peavey@inspircd.org>
15  *   Copyright (C) 2006 Oliver Lupton <om@inspircd.org>
16  *
17  * This file is part of InspIRCd.  InspIRCd is free software: you can
18  * redistribute it and/or modify it under the terms of the GNU General Public
19  * License as published by the Free Software Foundation, version 2.
20  *
21  * This program is distributed in the hope that it will be useful, but WITHOUT
22  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
23  * FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
24  * details.
25  *
26  * You should have received a copy of the GNU General Public License
27  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
28  */
29
30 /// $CompilerFlags: -Iexecute("pg_config --includedir" "POSTGRESQL_INCLUDE_DIR")
31 /// $LinkerFlags: -Lexecute("pg_config --libdir" "POSTGRESQL_LIBRARY_DIR") -lpq
32
33 /// $PackageInfo: require_system("arch") postgresql-libs
34 /// $PackageInfo: require_system("centos") postgresql-devel
35 /// $PackageInfo: require_system("darwin") postgresql
36 /// $PackageInfo: require_system("debian") libpq-dev
37 /// $PackageInfo: require_system("ubuntu") libpq-dev
38
39
40 #include "inspircd.h"
41 #include <cstdlib>
42 #include <libpq-fe.h>
43 #include "modules/sql.h"
44
45 /* SQLConn rewritten by peavey to
46  * use EventHandler instead of
47  * BufferedSocket. This is much neater
48  * and gives total control of destroy
49  * and delete of resources.
50  */
51
52 /* Forward declare, so we can have the typedef neatly at the top */
53 class SQLConn;
54 class ModulePgSQL;
55
56 typedef insp::flat_map<std::string, SQLConn*> ConnMap;
57
/** State of an SQLConn, used to decide which poll routine handles a socket event. */
enum SQLstatus
{
	DEAD,   // The connection has died.
	CREAD,  // Connecting and wants read event.
	CWRITE, // Connecting and wants write event.
	WREAD,  // Connected/working and wants read event.
	WWRITE  // Connected/working and wants write event.
};
75
76 class ReconnectTimer : public Timer
77 {
78  private:
79         ModulePgSQL* mod;
80  public:
81         ReconnectTimer(ModulePgSQL* m) : Timer(5, false), mod(m)
82         {
83         }
84         bool Tick(time_t TIME) CXX11_OVERRIDE;
85 };
86
87 struct QueueItem
88 {
89         SQL::Query* c;
90         std::string q;
91         QueueItem(SQL::Query* C, const std::string& Q) : c(C), q(Q) {}
92 };
93
/** PgSQLresult is a subclass of the mostly-pure-virtual class SQL::Result.
 * All SQL providers must create their own subclass and define its methods using that
 * database library's data retrieval functions. The aim is to avoid a slow and inefficient process
 * of converting all data to a common format before it reaches the result structure. This way
 * data is passed to the module nearly as directly as if it was using the API directly itself.
 */
100
101 class PgSQLresult : public SQL::Result
102 {
103         PGresult* res;
104         int currentrow;
105         int rows;
106         std::vector<std::string> colnames;
107
108         void getColNames()
109         {
110                 colnames.resize(PQnfields(res));
111                 for(unsigned int i=0; i < colnames.size(); i++)
112                 {
113                         colnames[i] = PQfname(res, i);
114                 }
115         }
116  public:
117         PgSQLresult(PGresult* result) : res(result), currentrow(0)
118         {
119                 rows = PQntuples(res);
120                 if (!rows)
121                         rows = ConvToNum<int>(PQcmdTuples(res));
122         }
123
124         ~PgSQLresult()
125         {
126                 PQclear(res);
127         }
128
129         int Rows() CXX11_OVERRIDE
130         {
131                 return rows;
132         }
133
134         void GetCols(std::vector<std::string>& result) CXX11_OVERRIDE
135         {
136                 if (colnames.empty())
137                         getColNames();
138                 result = colnames;
139         }
140
141         bool HasColumn(const std::string& column, size_t& index) CXX11_OVERRIDE
142         {
143                 if (colnames.empty())
144                         getColNames();
145
146                 for (size_t i = 0; i < colnames.size(); ++i)
147                 {
148                         if (colnames[i] == column)
149                         {
150                                 index = i;
151                                 return true;
152                         }
153                 }
154                 return false;
155         }
156
157         SQL::Field GetValue(int row, int column)
158         {
159                 char* v = PQgetvalue(res, row, column);
160                 if (!v || PQgetisnull(res, row, column))
161                         return SQL::Field();
162
163                 return SQL::Field(std::string(v, PQgetlength(res, row, column)));
164         }
165
166         bool GetRow(SQL::Row& result) CXX11_OVERRIDE
167         {
168                 if (currentrow >= PQntuples(res))
169                         return false;
170                 int ncols = PQnfields(res);
171
172                 for(int i = 0; i < ncols; i++)
173                 {
174                         result.push_back(GetValue(currentrow, i));
175                 }
176                 currentrow++;
177
178                 return true;
179         }
180 };
181
182 /** SQLConn represents one SQL session.
183  */
184 class SQLConn : public SQL::Provider, public EventHandler
185 {
186  public:
187         reference<ConfigTag> conf;      /* The <database> entry */
188         std::deque<QueueItem> queue;
189         PGconn*                 sql;            /* PgSQL database connection handle */
190         SQLstatus               status;         /* PgSQL database connection status */
191         QueueItem               qinprog;        /* If there is currently a query in progress */
192
193         SQLConn(Module* Creator, ConfigTag* tag)
194                 : SQL::Provider(Creator, tag->getString("id"))
195                 , conf(tag)
196                 , sql(NULL)
197                 , status(CWRITE)
198                 , qinprog(NULL, "")
199         {
200                 if (!DoConnect())
201                 {
202                         ServerInstance->Logs->Log(MODNAME, LOG_DEFAULT, "WARNING: Could not connect to database " + tag->getString("id"));
203                         DelayReconnect();
204                 }
205         }
206
207         CullResult cull() CXX11_OVERRIDE
208         {
209                 this->SQL::Provider::cull();
210                 ServerInstance->Modules->DelService(*this);
211                 return this->EventHandler::cull();
212         }
213
214         ~SQLConn()
215         {
216                 SQL::Error err(SQL::BAD_DBID);
217                 if (qinprog.c)
218                 {
219                         qinprog.c->OnError(err);
220                         delete qinprog.c;
221                 }
222                 for(std::deque<QueueItem>::iterator i = queue.begin(); i != queue.end(); i++)
223                 {
224                         SQL::Query* q = i->c;
225                         q->OnError(err);
226                         delete q;
227                 }
228                 Close();
229         }
230
231         void OnEventHandlerRead() CXX11_OVERRIDE
232         {
233                 DoEvent();
234         }
235
236         void OnEventHandlerWrite() CXX11_OVERRIDE
237         {
238                 DoEvent();
239         }
240
241         void OnEventHandlerError(int errornum) CXX11_OVERRIDE
242         {
243                 DelayReconnect();
244         }
245
246         std::string GetDSN()
247         {
248                 std::ostringstream conninfo("connect_timeout = '5'");
249                 std::string item;
250
251                 if (conf->readString("host", item))
252                         conninfo << " host = '" << item << "'";
253
254                 if (conf->readString("port", item))
255                         conninfo << " port = '" << item << "'";
256
257                 if (conf->readString("name", item))
258                         conninfo << " dbname = '" << item << "'";
259
260                 if (conf->readString("user", item))
261                         conninfo << " user = '" << item << "'";
262
263                 if (conf->readString("pass", item))
264                         conninfo << " password = '" << item << "'";
265
266                 if (conf->getBool("ssl"))
267                         conninfo << " sslmode = 'require'";
268                 else
269                         conninfo << " sslmode = 'disable'";
270
271                 return conninfo.str();
272         }
273
274         bool DoConnect()
275         {
276                 sql = PQconnectStart(GetDSN().c_str());
277                 if (!sql)
278                         return false;
279
280                 if(PQstatus(sql) == CONNECTION_BAD)
281                         return false;
282
283                 if(PQsetnonblocking(sql, 1) == -1)
284                         return false;
285
286                 /* OK, we've initialised the connection, now to get it hooked into the socket engine
287                 * and then start polling it.
288                 */
289                 this->fd = PQsocket(sql);
290
291                 if(this->fd <= -1)
292                         return false;
293
294                 if (!SocketEngine::AddFd(this, FD_WANT_NO_WRITE | FD_WANT_NO_READ))
295                 {
296                         ServerInstance->Logs->Log(MODNAME, LOG_DEBUG, "BUG: Couldn't add pgsql socket to socket engine");
297                         return false;
298                 }
299
300                 /* Socket all hooked into the engine, now to tell PgSQL to start connecting */
301                 return DoPoll();
302         }
303
304         bool DoPoll()
305         {
306                 switch(PQconnectPoll(sql))
307                 {
308                         case PGRES_POLLING_WRITING:
309                                 SocketEngine::ChangeEventMask(this, FD_WANT_POLL_WRITE | FD_WANT_NO_READ);
310                                 status = CWRITE;
311                                 return true;
312                         case PGRES_POLLING_READING:
313                                 SocketEngine::ChangeEventMask(this, FD_WANT_POLL_READ | FD_WANT_NO_WRITE);
314                                 status = CREAD;
315                                 return true;
316                         case PGRES_POLLING_FAILED:
317                                 SocketEngine::ChangeEventMask(this, FD_WANT_NO_READ | FD_WANT_NO_WRITE);
318                                 status = DEAD;
319                                 return false;
320                         case PGRES_POLLING_OK:
321                                 SocketEngine::ChangeEventMask(this, FD_WANT_POLL_READ | FD_WANT_NO_WRITE);
322                                 status = WWRITE;
323                                 DoConnectedPoll();
324                                 return true;
325                         default:
326                                 return true;
327                 }
328         }
329
330         void DoConnectedPoll()
331         {
332 restart:
333                 while (qinprog.q.empty() && !queue.empty())
334                 {
335                         /* There's no query currently in progress, and there's queries in the queue. */
336                         DoQuery(queue.front());
337                         queue.pop_front();
338                 }
339
340                 if (PQconsumeInput(sql))
341                 {
342                         if (PQisBusy(sql))
343                         {
344                                 /* Nothing happens here */
345                         }
346                         else if (qinprog.c)
347                         {
348                                 /* Fetch the result.. */
349                                 PGresult* result = PQgetResult(sql);
350
351                                 /* PgSQL would allow a query string to be sent which has multiple
352                                  * queries in it, this isn't portable across database backends and
353                                  * we don't want modules doing it. But just in case we make sure we
354                                  * drain any results there are and just use the last one.
355                                  * If the module devs are behaving there will only be one result.
356                                  */
357                                 while (PGresult* temp = PQgetResult(sql))
358                                 {
359                                         PQclear(result);
360                                         result = temp;
361                                 }
362
363                                 /* ..and the result */
364                                 PgSQLresult reply(result);
365                                 switch(PQresultStatus(result))
366                                 {
367                                         case PGRES_EMPTY_QUERY:
368                                         case PGRES_BAD_RESPONSE:
369                                         case PGRES_FATAL_ERROR:
370                                         {
371                                                 SQL::Error err(SQL::QREPLY_FAIL, PQresultErrorMessage(result));
372                                                 qinprog.c->OnError(err);
373                                                 break;
374                                         }
375                                         default:
376                                                 /* Other values are not errors */
377                                                 qinprog.c->OnResult(reply);
378                                 }
379
380                                 delete qinprog.c;
381                                 qinprog = QueueItem(NULL, "");
382                                 goto restart;
383                         }
384                         else
385                         {
386                                 qinprog.q.clear();
387                         }
388                 }
389                 else
390                 {
391                         /* I think we'll assume this means the server died...it might not,
392                          * but I think that any error serious enough we actually get here
393                          * deserves to reconnect [/excuse]
394                          * Returning true so the core doesn't try and close the connection.
395                          */
396                         DelayReconnect();
397                 }
398         }
399
400         void DelayReconnect();
401
402         void DoEvent()
403         {
404                 if((status == CREAD) || (status == CWRITE))
405                 {
406                         DoPoll();
407                 }
408                 else if (status == WREAD || status == WWRITE)
409                 {
410                         DoConnectedPoll();
411                 }
412         }
413
414         void Submit(SQL::Query *req, const std::string& q) CXX11_OVERRIDE
415         {
416                 ServerInstance->Logs->Log(MODNAME, LOG_DEBUG, "Executing PostgreSQL query: " + q);
417                 if (qinprog.q.empty())
418                 {
419                         DoQuery(QueueItem(req,q));
420                 }
421                 else
422                 {
423                         // wait your turn.
424                         queue.push_back(QueueItem(req,q));
425                 }
426         }
427
428         void Submit(SQL::Query *req, const std::string& q, const SQL::ParamList& p) CXX11_OVERRIDE
429         {
430                 std::string res;
431                 unsigned int param = 0;
432                 for(std::string::size_type i = 0; i < q.length(); i++)
433                 {
434                         if (q[i] != '?')
435                                 res.push_back(q[i]);
436                         else
437                         {
438                                 if (param < p.size())
439                                 {
440                                         std::string parm = p[param++];
441                                         std::vector<char> buffer(parm.length() * 2 + 1);
442                                         int error;
443                                         size_t escapedsize = PQescapeStringConn(sql, &buffer[0], parm.data(), parm.length(), &error);
444                                         if (error)
445                                                 ServerInstance->Logs->Log(MODNAME, LOG_DEBUG, "BUG: Apparently PQescapeStringConn() failed");
446                                         res.append(&buffer[0], escapedsize);
447                                 }
448                         }
449                 }
450                 Submit(req, res);
451         }
452
453         void Submit(SQL::Query *req, const std::string& q, const SQL::ParamMap& p) CXX11_OVERRIDE
454         {
455                 std::string res;
456                 for(std::string::size_type i = 0; i < q.length(); i++)
457                 {
458                         if (q[i] != '$')
459                                 res.push_back(q[i]);
460                         else
461                         {
462                                 std::string field;
463                                 i++;
464                                 while (i < q.length() && isalnum(q[i]))
465                                         field.push_back(q[i++]);
466                                 i--;
467
468                                 SQL::ParamMap::const_iterator it = p.find(field);
469                                 if (it != p.end())
470                                 {
471                                         std::string parm = it->second;
472                                         std::vector<char> buffer(parm.length() * 2 + 1);
473                                         int error;
474                                         size_t escapedsize = PQescapeStringConn(sql, &buffer[0], parm.data(), parm.length(), &error);
475                                         if (error)
476                                                 ServerInstance->Logs->Log(MODNAME, LOG_DEBUG, "BUG: Apparently PQescapeStringConn() failed");
477                                         res.append(&buffer[0], escapedsize);
478                                 }
479                         }
480                 }
481                 Submit(req, res);
482         }
483
484         void DoQuery(const QueueItem& req)
485         {
486                 if (status != WREAD && status != WWRITE)
487                 {
488                         // whoops, not connected...
489                         SQL::Error err(SQL::BAD_CONN);
490                         req.c->OnError(err);
491                         delete req.c;
492                         return;
493                 }
494
495                 if(PQsendQuery(sql, req.q.c_str()))
496                 {
497                         qinprog = req;
498                 }
499                 else
500                 {
501                         SQL::Error err(SQL::QSEND_FAIL, PQerrorMessage(sql));
502                         req.c->OnError(err);
503                         delete req.c;
504                 }
505         }
506
507         void Close()
508         {
509                 status = DEAD;
510
511                 if (HasFd() && SocketEngine::HasFd(GetFd()))
512                         SocketEngine::DelFd(this);
513
514                 if(sql)
515                 {
516                         PQfinish(sql);
517                         sql = NULL;
518                 }
519         }
520 };
521
522 class ModulePgSQL : public Module
523 {
524  public:
525         ConnMap connections;
526         ReconnectTimer* retimer;
527
528         ModulePgSQL()
529                 : retimer(NULL)
530         {
531         }
532
533         ~ModulePgSQL()
534         {
535                 delete retimer;
536                 ClearAllConnections();
537         }
538
539         void ReadConfig(ConfigStatus& status) CXX11_OVERRIDE
540         {
541                 ReadConf();
542         }
543
544         void ReadConf()
545         {
546                 ConnMap conns;
547                 ConfigTagList tags = ServerInstance->Config->ConfTags("database");
548                 for(ConfigIter i = tags.first; i != tags.second; i++)
549                 {
550                         if (!stdalgo::string::equalsci(i->second->getString("module"), "pgsql"))
551                                 continue;
552                         std::string id = i->second->getString("id");
553                         ConnMap::iterator curr = connections.find(id);
554                         if (curr == connections.end())
555                         {
556                                 SQLConn* conn = new SQLConn(this, i->second);
557                                 if (conn->status != DEAD)
558                                 {
559                                         conns.insert(std::make_pair(id, conn));
560                                         ServerInstance->Modules->AddService(*conn);
561                                 }
562                                 // If the connection is dead it has already been queued for culling
563                                 // at the end of the main loop so we don't need to delete it here.
564                         }
565                         else
566                         {
567                                 conns.insert(*curr);
568                                 connections.erase(curr);
569                         }
570                 }
571                 ClearAllConnections();
572                 conns.swap(connections);
573         }
574
575         void ClearAllConnections()
576         {
577                 for(ConnMap::iterator i = connections.begin(); i != connections.end(); i++)
578                 {
579                         i->second->cull();
580                         delete i->second;
581                 }
582                 connections.clear();
583         }
584
585         void OnUnloadModule(Module* mod) CXX11_OVERRIDE
586         {
587                 SQL::Error err(SQL::BAD_DBID);
588                 for(ConnMap::iterator i = connections.begin(); i != connections.end(); i++)
589                 {
590                         SQLConn* conn = i->second;
591                         if (conn->qinprog.c && conn->qinprog.c->creator == mod)
592                         {
593                                 conn->qinprog.c->OnError(err);
594                                 delete conn->qinprog.c;
595                                 conn->qinprog.c = NULL;
596                         }
597                         std::deque<QueueItem>::iterator j = conn->queue.begin();
598                         while (j != conn->queue.end())
599                         {
600                                 SQL::Query* q = j->c;
601                                 if (q->creator == mod)
602                                 {
603                                         q->OnError(err);
604                                         delete q;
605                                         j = conn->queue.erase(j);
606                                 }
607                                 else
608                                         j++;
609                         }
610                 }
611         }
612
613         Version GetVersion() CXX11_OVERRIDE
614         {
615                 return Version("Provides the ability for SQL modules to query a PostgreSQL database.", VF_VENDOR);
616         }
617 };
618
619 bool ReconnectTimer::Tick(time_t time)
620 {
621         mod->retimer = NULL;
622         mod->ReadConf();
623         delete this;
624         return false;
625 }
626
627 void SQLConn::DelayReconnect()
628 {
629         status = DEAD;
630         ModulePgSQL* mod = (ModulePgSQL*)(Module*)creator;
631
632         ConnMap::iterator it = mod->connections.find(conf->getString("id"));
633         if (it != mod->connections.end())
634                 mod->connections.erase(it);
635         ServerInstance->GlobalCulls.AddItem((EventHandler*)this);
636         if (!mod->retimer)
637         {
638                 mod->retimer = new ReconnectTimer(mod);
639                 ServerInstance->Timers.AddTimer(mod->retimer);
640         }
641 }
642
643 MODULE_INIT(ModulePgSQL)