]> git.netwichtig.de Git - user/henk/code/inspircd.git/blob - src/modules/extra/m_pgsql.cpp
36156013df7ad703f189bb23511577c97dc8c11e
[user/henk/code/inspircd.git] / src / modules / extra / m_pgsql.cpp
1 /*
2  * InspIRCd -- Internet Relay Chat Daemon
3  *
4  *   Copyright (C) 2016 Adam <Adam@anope.org>
5  *   Copyright (C) 2015 Daniel Vassdal <shutter@canternet.org>
6  *   Copyright (C) 2013, 2016-2020 Sadie Powell <sadie@witchery.services>
7  *   Copyright (C) 2012-2015 Attila Molnar <attilamolnar@hush.com>
8  *   Copyright (C) 2012 Robby <robby@chatbelgie.be>
9  *   Copyright (C) 2009-2010 Daniel De Graaf <danieldg@inspircd.org>
10  *   Copyright (C) 2009 Uli Schlachter <psychon@inspircd.org>
11  *   Copyright (C) 2008 Thomas Stagner <aquanight@inspircd.org>
12  *   Copyright (C) 2007, 2009-2010 Craig Edwards <brain@inspircd.org>
13  *   Copyright (C) 2007, 2009 Dennis Friis <peavey@inspircd.org>
14  *   Copyright (C) 2007 Robin Burchell <robin+git@viroteck.net>
15  *   Copyright (C) 2006 Oliver Lupton <om@inspircd.org>
16  *
17  * This file is part of InspIRCd.  InspIRCd is free software: you can
18  * redistribute it and/or modify it under the terms of the GNU General Public
19  * License as published by the Free Software Foundation, version 2.
20  *
21  * This program is distributed in the hope that it will be useful, but WITHOUT
22  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
23  * FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
24  * details.
25  *
26  * You should have received a copy of the GNU General Public License
27  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
28  */
29
30 /// $CompilerFlags: -Iexecute("pg_config --includedir" "POSTGRESQL_INCLUDE_DIR")
31 /// $LinkerFlags: -Lexecute("pg_config --libdir" "POSTGRESQL_LIBRARY_DIR") -lpq
32
33 /// $PackageInfo: require_system("arch") postgresql-libs
34 /// $PackageInfo: require_system("centos") postgresql-devel
35 /// $PackageInfo: require_system("darwin") postgresql
36 /// $PackageInfo: require_system("debian") libpq-dev
37 /// $PackageInfo: require_system("ubuntu") libpq-dev
38
39
40 #include "inspircd.h"
41 #include <cstdlib>
42 #include <libpq-fe.h>
43 #include "modules/sql.h"
44
45 /* SQLConn rewritten by peavey to
46  * use EventHandler instead of
47  * BufferedSocket. This is much neater
48  * and gives total control of destroy
49  * and delete of resources.
50  */
51
52 /* Forward declare, so we can have the typedef neatly at the top */
53 class SQLConn;
54 class ModulePgSQL;
55
56 typedef insp::flat_map<std::string, SQLConn*> ConnMap;
57
58 enum SQLstatus
59 {
60         // The connection has died.
61         DEAD,
62
63         // Connecting and wants read event.
64         CREAD,
65
66         // Connecting and wants write event.
67         CWRITE,
68
69         // Connected/working and wants read event.
70         WREAD,
71
72         // Connected/working and wants write event.
73         WWRITE
74 };
75
76 class ReconnectTimer : public Timer
77 {
78  private:
79         ModulePgSQL* mod;
80  public:
81         ReconnectTimer(ModulePgSQL* m) : Timer(5, false), mod(m)
82         {
83         }
84         bool Tick(time_t TIME) CXX11_OVERRIDE;
85 };
86
87 struct QueueItem
88 {
89         SQL::Query* c;
90         std::string q;
91         QueueItem(SQL::Query* C, const std::string& Q) : c(C), q(Q) {}
92 };
93
94 /** PgSQLresult is a subclass of the mostly-pure-virtual class SQLresult.
95  * All SQL providers must create their own subclass and define it's methods using that
96  * database library's data retriveal functions. The aim is to avoid a slow and inefficient process
97  * of converting all data to a common format before it reaches the result structure. This way
98  * data is passes to the module nearly as directly as if it was using the API directly itself.
99  */
100
101 class PgSQLresult : public SQL::Result
102 {
103         PGresult* res;
104         int currentrow;
105         int rows;
106         std::vector<std::string> colnames;
107
108         void getColNames()
109         {
110                 colnames.resize(PQnfields(res));
111                 for(unsigned int i=0; i < colnames.size(); i++)
112                 {
113                         colnames[i] = PQfname(res, i);
114                 }
115         }
116  public:
117         PgSQLresult(PGresult* result) : res(result), currentrow(0)
118         {
119                 rows = PQntuples(res);
120                 if (!rows)
121                         rows = ConvToNum<int>(PQcmdTuples(res));
122         }
123
124         ~PgSQLresult()
125         {
126                 PQclear(res);
127         }
128
129         int Rows() CXX11_OVERRIDE
130         {
131                 return rows;
132         }
133
134         void GetCols(std::vector<std::string>& result) CXX11_OVERRIDE
135         {
136                 if (colnames.empty())
137                         getColNames();
138                 result = colnames;
139         }
140
141         bool HasColumn(const std::string& column, size_t& index) CXX11_OVERRIDE
142         {
143                 if (colnames.empty())
144                         getColNames();
145
146                 for (size_t i = 0; i < colnames.size(); ++i)
147                 {
148                         if (colnames[i] == column)
149                         {
150                                 index = i;
151                                 return true;
152                         }
153                 }
154                 return false;
155         }
156
157         SQL::Field GetValue(int row, int column)
158         {
159                 char* v = PQgetvalue(res, row, column);
160                 if (!v || PQgetisnull(res, row, column))
161                         return SQL::Field();
162
163                 return SQL::Field(std::string(v, PQgetlength(res, row, column)));
164         }
165
166         bool GetRow(SQL::Row& result) CXX11_OVERRIDE
167         {
168                 if (currentrow >= PQntuples(res))
169                         return false;
170                 int ncols = PQnfields(res);
171
172                 for(int i = 0; i < ncols; i++)
173                 {
174                         result.push_back(GetValue(currentrow, i));
175                 }
176                 currentrow++;
177
178                 return true;
179         }
180 };
181
182 /** SQLConn represents one SQL session.
183  */
184 class SQLConn : public SQL::Provider, public EventHandler
185 {
186  public:
187         reference<ConfigTag> conf;      /* The <database> entry */
188         std::deque<QueueItem> queue;
189         PGconn*                 sql;            /* PgSQL database connection handle */
190         SQLstatus               status;         /* PgSQL database connection status */
191         QueueItem               qinprog;        /* If there is currently a query in progress */
192
193         SQLConn(Module* Creator, ConfigTag* tag)
194                 : SQL::Provider(Creator, tag->getString("id"))
195                 , conf(tag)
196                 , sql(NULL)
197                 , status(CWRITE)
198                 , qinprog(NULL, "")
199         {
200                 if (!DoConnect())
201                         DelayReconnect();
202         }
203
204         CullResult cull() CXX11_OVERRIDE
205         {
206                 this->SQL::Provider::cull();
207                 ServerInstance->Modules->DelService(*this);
208                 return this->EventHandler::cull();
209         }
210
211         ~SQLConn()
212         {
213                 SQL::Error err(SQL::BAD_DBID);
214                 if (qinprog.c)
215                 {
216                         qinprog.c->OnError(err);
217                         delete qinprog.c;
218                 }
219                 for(std::deque<QueueItem>::iterator i = queue.begin(); i != queue.end(); i++)
220                 {
221                         SQL::Query* q = i->c;
222                         q->OnError(err);
223                         delete q;
224                 }
225                 Close();
226         }
227
228         void OnEventHandlerRead() CXX11_OVERRIDE
229         {
230                 DoEvent();
231         }
232
233         void OnEventHandlerWrite() CXX11_OVERRIDE
234         {
235                 DoEvent();
236         }
237
238         void OnEventHandlerError(int errornum) CXX11_OVERRIDE
239         {
240                 DelayReconnect();
241         }
242
243         std::string GetDSN()
244         {
245                 std::ostringstream conninfo("connect_timeout = '5'");
246                 std::string item;
247
248                 if (conf->readString("host", item))
249                         conninfo << " host = '" << item << "'";
250
251                 if (conf->readString("port", item))
252                         conninfo << " port = '" << item << "'";
253
254                 if (conf->readString("name", item))
255                         conninfo << " dbname = '" << item << "'";
256
257                 if (conf->readString("user", item))
258                         conninfo << " user = '" << item << "'";
259
260                 if (conf->readString("pass", item))
261                         conninfo << " password = '" << item << "'";
262
263                 if (conf->getBool("ssl"))
264                         conninfo << " sslmode = 'require'";
265                 else
266                         conninfo << " sslmode = 'disable'";
267
268                 return conninfo.str();
269         }
270
271         bool HandleConnectError(const char* reason)
272         {
273                 ServerInstance->Logs->Log(MODNAME, LOG_DEFAULT, "Could not connect to the \"%s\" database: %s",
274                         GetId().c_str(), reason);
275                 return false;
276         }
277
278         bool DoConnect()
279         {
280                 sql = PQconnectStart(GetDSN().c_str());
281                 if (!sql)
282                         return HandleConnectError("PQconnectStart returned NULL");
283
284                 if(PQstatus(sql) == CONNECTION_BAD)
285                         return HandleConnectError("connection status is bad");
286
287                 if(PQsetnonblocking(sql, 1) == -1)
288                         return HandleConnectError("unable to mark fd as non-blocking");
289
290                 /* OK, we've initialised the connection, now to get it hooked into the socket engine
291                 * and then start polling it.
292                 */
293                 SetFd(PQsocket(sql));
294                 if(!HasFd())
295                         return HandleConnectError("PQsocket returned an invalid fd");
296
297                 if (!SocketEngine::AddFd(this, FD_WANT_NO_WRITE | FD_WANT_NO_READ))
298                         return HandleConnectError("could not add the pgsql socket to the socket engine");
299
300                 /* Socket all hooked into the engine, now to tell PgSQL to start connecting */
301                 if (!DoPoll())
302                         return HandleConnectError("could not poll the connection state");
303
304                 return true;
305         }
306
307         bool DoPoll()
308         {
309                 switch(PQconnectPoll(sql))
310                 {
311                         case PGRES_POLLING_WRITING:
312                                 SocketEngine::ChangeEventMask(this, FD_WANT_POLL_WRITE | FD_WANT_NO_READ);
313                                 status = CWRITE;
314                                 return true;
315                         case PGRES_POLLING_READING:
316                                 SocketEngine::ChangeEventMask(this, FD_WANT_POLL_READ | FD_WANT_NO_WRITE);
317                                 status = CREAD;
318                                 return true;
319                         case PGRES_POLLING_FAILED:
320                                 SocketEngine::ChangeEventMask(this, FD_WANT_NO_READ | FD_WANT_NO_WRITE);
321                                 status = DEAD;
322                                 return false;
323                         case PGRES_POLLING_OK:
324                                 SocketEngine::ChangeEventMask(this, FD_WANT_POLL_READ | FD_WANT_NO_WRITE);
325                                 status = WWRITE;
326                                 DoConnectedPoll();
327                                 return true;
328                         default:
329                                 return true;
330                 }
331         }
332
333         void DoConnectedPoll()
334         {
335 restart:
336                 while (qinprog.q.empty() && !queue.empty())
337                 {
338                         /* There's no query currently in progress, and there's queries in the queue. */
339                         DoQuery(queue.front());
340                         queue.pop_front();
341                 }
342
343                 if (PQconsumeInput(sql))
344                 {
345                         if (PQisBusy(sql))
346                         {
347                                 /* Nothing happens here */
348                         }
349                         else if (qinprog.c)
350                         {
351                                 /* Fetch the result.. */
352                                 PGresult* result = PQgetResult(sql);
353
354                                 /* PgSQL would allow a query string to be sent which has multiple
355                                  * queries in it, this isn't portable across database backends and
356                                  * we don't want modules doing it. But just in case we make sure we
357                                  * drain any results there are and just use the last one.
358                                  * If the module devs are behaving there will only be one result.
359                                  */
360                                 while (PGresult* temp = PQgetResult(sql))
361                                 {
362                                         PQclear(result);
363                                         result = temp;
364                                 }
365
366                                 /* ..and the result */
367                                 PgSQLresult reply(result);
368                                 switch(PQresultStatus(result))
369                                 {
370                                         case PGRES_EMPTY_QUERY:
371                                         case PGRES_BAD_RESPONSE:
372                                         case PGRES_FATAL_ERROR:
373                                         {
374                                                 SQL::Error err(SQL::QREPLY_FAIL, PQresultErrorMessage(result));
375                                                 qinprog.c->OnError(err);
376                                                 break;
377                                         }
378                                         default:
379                                                 /* Other values are not errors */
380                                                 qinprog.c->OnResult(reply);
381                                 }
382
383                                 delete qinprog.c;
384                                 qinprog = QueueItem(NULL, "");
385                                 goto restart;
386                         }
387                         else
388                         {
389                                 qinprog.q.clear();
390                         }
391                 }
392                 else
393                 {
394                         /* I think we'll assume this means the server died...it might not,
395                          * but I think that any error serious enough we actually get here
396                          * deserves to reconnect [/excuse]
397                          * Returning true so the core doesn't try and close the connection.
398                          */
399                         DelayReconnect();
400                 }
401         }
402
403         void DelayReconnect();
404
405         void DoEvent()
406         {
407                 if((status == CREAD) || (status == CWRITE))
408                 {
409                         DoPoll();
410                 }
411                 else if (status == WREAD || status == WWRITE)
412                 {
413                         DoConnectedPoll();
414                 }
415         }
416
417         void Submit(SQL::Query *req, const std::string& q) CXX11_OVERRIDE
418         {
419                 ServerInstance->Logs->Log(MODNAME, LOG_DEBUG, "Executing PostgreSQL query: " + q);
420                 if (qinprog.q.empty())
421                 {
422                         DoQuery(QueueItem(req,q));
423                 }
424                 else
425                 {
426                         // wait your turn.
427                         queue.push_back(QueueItem(req,q));
428                 }
429         }
430
431         void Submit(SQL::Query *req, const std::string& q, const SQL::ParamList& p) CXX11_OVERRIDE
432         {
433                 std::string res;
434                 unsigned int param = 0;
435                 for(std::string::size_type i = 0; i < q.length(); i++)
436                 {
437                         if (q[i] != '?')
438                                 res.push_back(q[i]);
439                         else
440                         {
441                                 if (param < p.size())
442                                 {
443                                         std::string parm = p[param++];
444                                         std::vector<char> buffer(parm.length() * 2 + 1);
445                                         int error;
446                                         size_t escapedsize = PQescapeStringConn(sql, &buffer[0], parm.data(), parm.length(), &error);
447                                         if (error)
448                                                 ServerInstance->Logs->Log(MODNAME, LOG_DEBUG, "BUG: Apparently PQescapeStringConn() failed");
449                                         res.append(&buffer[0], escapedsize);
450                                 }
451                         }
452                 }
453                 Submit(req, res);
454         }
455
456         void Submit(SQL::Query *req, const std::string& q, const SQL::ParamMap& p) CXX11_OVERRIDE
457         {
458                 std::string res;
459                 for(std::string::size_type i = 0; i < q.length(); i++)
460                 {
461                         if (q[i] != '$')
462                                 res.push_back(q[i]);
463                         else
464                         {
465                                 std::string field;
466                                 i++;
467                                 while (i < q.length() && isalnum(q[i]))
468                                         field.push_back(q[i++]);
469                                 i--;
470
471                                 SQL::ParamMap::const_iterator it = p.find(field);
472                                 if (it != p.end())
473                                 {
474                                         std::string parm = it->second;
475                                         std::vector<char> buffer(parm.length() * 2 + 1);
476                                         int error;
477                                         size_t escapedsize = PQescapeStringConn(sql, &buffer[0], parm.data(), parm.length(), &error);
478                                         if (error)
479                                                 ServerInstance->Logs->Log(MODNAME, LOG_DEBUG, "BUG: Apparently PQescapeStringConn() failed");
480                                         res.append(&buffer[0], escapedsize);
481                                 }
482                         }
483                 }
484                 Submit(req, res);
485         }
486
487         void DoQuery(const QueueItem& req)
488         {
489                 if (status != WREAD && status != WWRITE)
490                 {
491                         // whoops, not connected...
492                         SQL::Error err(SQL::BAD_CONN);
493                         req.c->OnError(err);
494                         delete req.c;
495                         return;
496                 }
497
498                 if(PQsendQuery(sql, req.q.c_str()))
499                 {
500                         qinprog = req;
501                 }
502                 else
503                 {
504                         SQL::Error err(SQL::QSEND_FAIL, PQerrorMessage(sql));
505                         req.c->OnError(err);
506                         delete req.c;
507                 }
508         }
509
510         void Close()
511         {
512                 status = DEAD;
513
514                 if (HasFd() && SocketEngine::HasFd(GetFd()))
515                         SocketEngine::DelFd(this);
516
517                 if(sql)
518                 {
519                         PQfinish(sql);
520                         sql = NULL;
521                 }
522         }
523 };
524
525 class ModulePgSQL : public Module
526 {
527  public:
528         ConnMap connections;
529         ReconnectTimer* retimer;
530
531         ModulePgSQL()
532                 : retimer(NULL)
533         {
534         }
535
536         ~ModulePgSQL()
537         {
538                 delete retimer;
539                 ClearAllConnections();
540         }
541
542         void ReadConfig(ConfigStatus& status) CXX11_OVERRIDE
543         {
544                 ReadConf();
545         }
546
547         void ReadConf()
548         {
549                 ConnMap conns;
550                 ConfigTagList tags = ServerInstance->Config->ConfTags("database");
551                 for(ConfigIter i = tags.first; i != tags.second; i++)
552                 {
553                         if (!stdalgo::string::equalsci(i->second->getString("module"), "pgsql"))
554                                 continue;
555                         std::string id = i->second->getString("id");
556                         ConnMap::iterator curr = connections.find(id);
557                         if (curr == connections.end())
558                         {
559                                 SQLConn* conn = new SQLConn(this, i->second);
560                                 if (conn->status != DEAD)
561                                 {
562                                         conns.insert(std::make_pair(id, conn));
563                                         ServerInstance->Modules->AddService(*conn);
564                                 }
565                                 // If the connection is dead it has already been queued for culling
566                                 // at the end of the main loop so we don't need to delete it here.
567                         }
568                         else
569                         {
570                                 conns.insert(*curr);
571                                 connections.erase(curr);
572                         }
573                 }
574                 ClearAllConnections();
575                 conns.swap(connections);
576         }
577
578         void ClearAllConnections()
579         {
580                 for(ConnMap::iterator i = connections.begin(); i != connections.end(); i++)
581                 {
582                         i->second->cull();
583                         delete i->second;
584                 }
585                 connections.clear();
586         }
587
588         void OnUnloadModule(Module* mod) CXX11_OVERRIDE
589         {
590                 SQL::Error err(SQL::BAD_DBID);
591                 for(ConnMap::iterator i = connections.begin(); i != connections.end(); i++)
592                 {
593                         SQLConn* conn = i->second;
594                         if (conn->qinprog.c && conn->qinprog.c->creator == mod)
595                         {
596                                 conn->qinprog.c->OnError(err);
597                                 delete conn->qinprog.c;
598                                 conn->qinprog.c = NULL;
599                         }
600                         std::deque<QueueItem>::iterator j = conn->queue.begin();
601                         while (j != conn->queue.end())
602                         {
603                                 SQL::Query* q = j->c;
604                                 if (q->creator == mod)
605                                 {
606                                         q->OnError(err);
607                                         delete q;
608                                         j = conn->queue.erase(j);
609                                 }
610                                 else
611                                         j++;
612                         }
613                 }
614         }
615
616         Version GetVersion() CXX11_OVERRIDE
617         {
618                 return Version("Provides the ability for SQL modules to query a PostgreSQL database.", VF_VENDOR);
619         }
620 };
621
622 bool ReconnectTimer::Tick(time_t time)
623 {
624         mod->retimer = NULL;
625         mod->ReadConf();
626         delete this;
627         return false;
628 }
629
630 void SQLConn::DelayReconnect()
631 {
632         status = DEAD;
633         ModulePgSQL* mod = (ModulePgSQL*)(Module*)creator;
634
635         ConnMap::iterator it = mod->connections.find(conf->getString("id"));
636         if (it != mod->connections.end())
637                 mod->connections.erase(it);
638         ServerInstance->GlobalCulls.AddItem((EventHandler*)this);
639         if (!mod->retimer)
640         {
641                 mod->retimer = new ReconnectTimer(mod);
642                 ServerInstance->Timers.AddTimer(mod->retimer);
643         }
644 }
645
646 MODULE_INIT(ModulePgSQL)