]> git.netwichtig.de Git - user/henk/code/inspircd.git/blob - src/modules/extra/m_pgsql.cpp
Remove m_sqlv2.h from these modules, they both use v3 now.
[user/henk/code/inspircd.git] / src / modules / extra / m_pgsql.cpp
1 /*       +------------------------------------+
2  *       | Inspire Internet Relay Chat Daemon |
3  *       +------------------------------------+
4  *
5  *  InspIRCd: (C) 2002-2010 InspIRCd Development Team
6  * See: http://wiki.inspircd.org/Credits
7  *
8  * This program is free but copyrighted software; see
9  *            the file COPYING for details.
10  *
11  * ---------------------------------------------------
12  */
13
14 #include "inspircd.h"
15 #include <cstdlib>
16 #include <sstream>
17 #include <libpq-fe.h>
18 #include "sql.h"
19
20 /* $ModDesc: PostgreSQL Service Provider module for all other m_sql* modules, uses v2 of the SQL API */
21 /* $CompileFlags: -Iexec("pg_config --includedir") eval("my $s = `pg_config --version`;$s =~ /^.*?(\d+)\.(\d+)\.(\d+).*?$/;my $v = hex(sprintf("0x%02x%02x%02x", $1, $2, $3));print "-DPGSQL_HAS_ESCAPECONN" if(($v >= 0x080104) || ($v >= 0x07030F && $v < 0x070400) || ($v >= 0x07040D && $v < 0x080000) || ($v >= 0x080008 && $v < 0x080100));") */
22 /* $LinkerFlags: -Lexec("pg_config --libdir") -lpq */
23
24 /* SQLConn rewritten by peavey to
25  * use EventHandler instead of
26  * BufferedSocket. This is much neater
27  * and gives total control of destroy
28  * and delete of resources.
29  */
30
/* Forward declarations, so the connection map typedef can live near the top */
class ModulePgSQL;
class SQLConn;

/* Connections keyed by the id they were created with */
typedef std::map<std::string, SQLConn*> ConnMap;
36
/* State of a connection together with the socket event it is waiting for */
enum SQLstatus
{
	CREAD,		/* Connecting and wants read event */
	CWRITE,		/* Connecting and wants write event */
	WREAD,		/* Connected/Working and wants read event */
	WWRITE,		/* Connected/Working and wants write event */
	RREAD,		/* Resetting and wants read event */
	RWRITE		/* Resetting and wants write event */
};
45
46 class ReconnectTimer : public Timer
47 {
48  private:
49         ModulePgSQL* mod;
50  public:
51         ReconnectTimer(ModulePgSQL* m) : Timer(5, ServerInstance->Time(), false), mod(m)
52         {
53         }
54         virtual void Tick(time_t TIME);
55 };
56
57 struct QueueItem
58 {
59         SQLQuery* c;
60         std::string q;
61         QueueItem(SQLQuery* C, const std::string& Q) : c(C), q(Q) {}
62 };
63
/** PgSQLresult is a subclass of the mostly-pure-virtual class SQLResult.
 * All SQL providers must create their own subclass and define its methods using that
 * database library's data retrieval functions. The aim is to avoid a slow and inefficient process
 * of converting all data to a common format before it reaches the result structure. This way
 * data is passed to the module nearly as directly as if it was using the API directly itself.
 */
70
71 class PgSQLresult : public SQLResult
72 {
73         PGresult* res;
74         int currentrow;
75         int rows;
76  public:
77         PgSQLresult(PGresult* result) : res(result), currentrow(0)
78         {
79                 rows = PQntuples(res);
80                 if (!rows)
81                         rows = atoi(PQcmdTuples(res));
82         }
83
84         ~PgSQLresult()
85         {
86                 PQclear(res);
87         }
88
89         virtual int Rows()
90         {
91                 return rows;
92         }
93
94         virtual void GetCols(std::vector<std::string>& result)
95         {
96                 result.resize(PQnfields(res));
97                 for(unsigned int i=0; i < result.size(); i++)
98                 {
99                         result[i] = PQfname(res, i);
100                 }
101         }
102
103         virtual SQLEntry GetValue(int row, int column)
104         {
105                 char* v = PQgetvalue(res, row, column);
106                 if (!v || PQgetisnull(res, row, column))
107                         return SQLEntry();
108
109                 return SQLEntry(std::string(v, PQgetlength(res, row, column)));
110         }
111
112         virtual bool GetRow(SQLEntries& result)
113         {
114                 if (currentrow >= PQntuples(res))
115                         return false;
116                 int ncols = PQnfields(res);
117
118                 for(int i = 0; i < ncols; i++)
119                 {
120                         result.push_back(GetValue(currentrow, i));
121                 }
122                 currentrow++;
123
124                 return true;
125         }
126 };
127
128 /** SQLConn represents one SQL session.
129  */
130 class SQLConn : public SQLProvider, public EventHandler
131 {
132  public:
133         reference<ConfigTag> conf;      /* The <database> entry */
134         std::deque<QueueItem> queue;
135         PGconn*                 sql;            /* PgSQL database connection handle */
136         SQLstatus               status;         /* PgSQL database connection status */
137         QueueItem               qinprog;        /* If there is currently a query in progress */
138
139         SQLConn(Module* Creator, ConfigTag* tag)
140         : SQLProvider(Creator, "SQL/" + tag->getString("id")), conf(tag), sql(NULL), status(CWRITE), qinprog(NULL, "")
141         {
142                 if (!DoConnect())
143                 {
144                         ServerInstance->Logs->Log("m_pgsql",DEFAULT, "WARNING: Could not connect to database " + tag->getString("id")); 
145                         DelayReconnect();
146                 }
147         }
148
149         CullResult cull()
150         {
151                 this->SQLProvider::cull();
152                 ServerInstance->Modules->DelService(*this);
153                 return this->EventHandler::cull();
154         }
155
156         ~SQLConn()
157         {
158                 SQLerror err(SQL_BAD_DBID);
159                 if (qinprog.c)
160                 {
161                         qinprog.c->OnError(err);
162                         delete qinprog.c;
163                 }
164                 for(std::deque<QueueItem>::iterator i = queue.begin(); i != queue.end(); i++)
165                 {
166                         SQLQuery* q = i->c;
167                         q->OnError(err);
168                         delete q;
169                 }
170         }
171
172         virtual void HandleEvent(EventType et, int errornum)
173         {
174                 switch (et)
175                 {
176                         case EVENT_READ:
177                         case EVENT_WRITE:
178                                 DoEvent();
179                         break;
180
181                         case EVENT_ERROR:
182                                 DelayReconnect();
183                 }
184         }
185
186         std::string GetDSN()
187         {
188                 std::ostringstream conninfo("connect_timeout = '5'");
189                 std::string item;
190
191                 if (conf->readString("host", item))
192                         conninfo << " host = '" << item << "'";
193
194                 if (conf->readString("port", item))
195                         conninfo << " port = '" << item << "'";
196
197                 if (conf->readString("name", item))
198                         conninfo << " dbname = '" << item << "'";
199
200                 if (conf->readString("user", item))
201                         conninfo << " user = '" << item << "'";
202
203                 if (conf->readString("pass", item))
204                         conninfo << " password = '" << item << "'";
205
206                 if (conf->getBool("ssl"))
207                         conninfo << " sslmode = 'require'";
208                 else
209                         conninfo << " sslmode = 'disable'";
210
211                 return conninfo.str();
212         }
213
214         bool DoConnect()
215         {
216                 sql = PQconnectStart(GetDSN().c_str());
217                 if (!sql)
218                         return false;
219
220                 if(PQstatus(sql) == CONNECTION_BAD)
221                         return false;
222
223                 if(PQsetnonblocking(sql, 1) == -1)
224                         return false;
225
226                 /* OK, we've initalised the connection, now to get it hooked into the socket engine
227                 * and then start polling it.
228                 */
229                 this->fd = PQsocket(sql);
230
231                 if(this->fd <= -1)
232                         return false;
233
234                 if (!ServerInstance->SE->AddFd(this, FD_WANT_NO_WRITE | FD_WANT_NO_READ))
235                 {
236                         ServerInstance->Logs->Log("m_pgsql",DEBUG, "BUG: Couldn't add pgsql socket to socket engine");
237                         return false;
238                 }
239
240                 /* Socket all hooked into the engine, now to tell PgSQL to start connecting */
241                 return DoPoll();
242         }
243
244         bool DoPoll()
245         {
246                 switch(PQconnectPoll(sql))
247                 {
248                         case PGRES_POLLING_WRITING:
249                                 ServerInstance->SE->ChangeEventMask(this, FD_WANT_POLL_WRITE | FD_WANT_NO_READ);
250                                 status = CWRITE;
251                                 return true;
252                         case PGRES_POLLING_READING:
253                                 ServerInstance->SE->ChangeEventMask(this, FD_WANT_POLL_READ | FD_WANT_NO_WRITE);
254                                 status = CREAD;
255                                 return true;
256                         case PGRES_POLLING_FAILED:
257                                 return false;
258                         case PGRES_POLLING_OK:
259                                 ServerInstance->SE->ChangeEventMask(this, FD_WANT_POLL_READ | FD_WANT_NO_WRITE);
260                                 status = WWRITE;
261                                 DoConnectedPoll();
262                         default:
263                                 return true;
264                 }
265         }
266
267         void DoConnectedPoll()
268         {
269 restart:
270                 while (qinprog.q.empty() && !queue.empty())
271                 {
272                         /* There's no query currently in progress, and there's queries in the queue. */
273                         DoQuery(queue.front());
274                         queue.pop_front();
275                 }
276
277                 if (PQconsumeInput(sql))
278                 {
279                         if (PQisBusy(sql))
280                         {
281                                 /* Nothing happens here */
282                         }
283                         else if (qinprog.c)
284                         {
285                                 /* Fetch the result.. */
286                                 PGresult* result = PQgetResult(sql);
287
288                                 /* PgSQL would allow a query string to be sent which has multiple
289                                  * queries in it, this isn't portable across database backends and
290                                  * we don't want modules doing it. But just in case we make sure we
291                                  * drain any results there are and just use the last one.
292                                  * If the module devs are behaving there will only be one result.
293                                  */
294                                 while (PGresult* temp = PQgetResult(sql))
295                                 {
296                                         PQclear(result);
297                                         result = temp;
298                                 }
299
300                                 /* ..and the result */
301                                 PgSQLresult reply(result);
302                                 switch(PQresultStatus(result))
303                                 {
304                                         case PGRES_EMPTY_QUERY:
305                                         case PGRES_BAD_RESPONSE:
306                                         case PGRES_FATAL_ERROR:
307                                         {
308                                                 SQLerror err(SQL_QREPLY_FAIL, PQresultErrorMessage(result));
309                                                 qinprog.c->OnError(err);
310                                                 break;
311                                         }
312                                         default:
313                                                 /* Other values are not errors */
314                                                 qinprog.c->OnResult(reply);
315                                 }
316
317                                 delete qinprog.c;
318                                 qinprog = QueueItem(NULL, "");
319                                 goto restart;
320                         }
321                         else
322                         {
323                                 qinprog.q = "";
324                         }
325                 }
326                 else
327                 {
328                         /* I think we'll assume this means the server died...it might not,
329                          * but I think that any error serious enough we actually get here
330                          * deserves to reconnect [/excuse]
331                          * Returning true so the core doesn't try and close the connection.
332                          */
333                         DelayReconnect();
334                 }
335         }
336
337         bool DoResetPoll()
338         {
339                 switch(PQresetPoll(sql))
340                 {
341                         case PGRES_POLLING_WRITING:
342                                 ServerInstance->SE->ChangeEventMask(this, FD_WANT_POLL_WRITE | FD_WANT_NO_READ);
343                                 status = CWRITE;
344                                 return DoPoll();
345                         case PGRES_POLLING_READING:
346                                 ServerInstance->SE->ChangeEventMask(this, FD_WANT_POLL_READ | FD_WANT_NO_WRITE);
347                                 status = CREAD;
348                                 return true;
349                         case PGRES_POLLING_FAILED:
350                                 return false;
351                         case PGRES_POLLING_OK:
352                                 ServerInstance->SE->ChangeEventMask(this, FD_WANT_POLL_READ | FD_WANT_NO_WRITE);
353                                 status = WWRITE;
354                                 DoConnectedPoll();
355                         default:
356                                 return true;
357                 }
358         }
359
360         void DelayReconnect();
361
362         void DoEvent()
363         {
364                 if((status == CREAD) || (status == CWRITE))
365                 {
366                         DoPoll();
367                 }
368                 else if((status == RREAD) || (status == RWRITE))
369                 {
370                         DoResetPoll();
371                 }
372                 else
373                 {
374                         DoConnectedPoll();
375                 }
376         }
377
378         void submit(SQLQuery *req, const std::string& q)
379         {
380                 if (qinprog.q.empty())
381                 {
382                         DoQuery(QueueItem(req,q));
383                 }
384                 else
385                 {
386                         // wait your turn.
387                         queue.push_back(QueueItem(req,q));
388                 }
389         }
390
391         void submit(SQLQuery *req, const std::string& q, const ParamL& p)
392         {
393                 std::string res;
394                 unsigned int param = 0;
395                 for(std::string::size_type i = 0; i < q.length(); i++)
396                 {
397                         if (q[i] != '?')
398                                 res.push_back(q[i]);
399                         else
400                         {
401                                 if (param < p.size())
402                                 {
403                                         std::string parm = p[param++];
404                                         char buffer[MAXBUF];
405 #ifdef PGSQL_HAS_ESCAPECONN
406                                         int error;
407                                         PQescapeStringConn(sql, buffer, parm.c_str(), parm.length(), &error);
408                                         if (error)
409                                                 ServerInstance->Logs->Log("m_pgsql", DEBUG, "BUG: Apparently PQescapeStringConn() failed");
410 #else
411                                         PQescapeString         (buffer, parm.c_str(), parm.length());
412 #endif
413                                         res.append(buffer);
414                                 }
415                         }
416                 }
417                 submit(req, res);
418         }
419
420         void submit(SQLQuery *req, const std::string& q, const ParamM& p)
421         {
422                 std::string res;
423                 for(std::string::size_type i = 0; i < q.length(); i++)
424                 {
425                         if (q[i] != '$')
426                                 res.push_back(q[i]);
427                         else
428                         {
429                                 std::string field;
430                                 i++;
431                                 while (i < q.length() && isalnum(q[i]))
432                                         field.push_back(q[i++]);
433                                 i--;
434
435                                 ParamM::const_iterator it = p.find(field);
436                                 if (it != p.end())
437                                 {
438                                         std::string parm = it->second;
439                                         char buffer[MAXBUF];
440 #ifdef PGSQL_HAS_ESCAPECONN
441                                         int error;
442                                         PQescapeStringConn(sql, buffer, parm.c_str(), parm.length(), &error);
443                                         if (error)
444                                                 ServerInstance->Logs->Log("m_pgsql", DEBUG, "BUG: Apparently PQescapeStringConn() failed");
445 #else
446                                         PQescapeString         (buffer, parm.c_str(), parm.length());
447 #endif
448                                         res.append(buffer);
449                                 }
450                         }
451                 }
452                 submit(req, res);
453         }
454
455         void DoQuery(const QueueItem& req)
456         {
457                 if (status != WREAD && status != WWRITE)
458                 {
459                         // whoops, not connected...
460                         SQLerror err(SQL_BAD_CONN);
461                         req.c->OnError(err);
462                         delete req.c;
463                         return;
464                 }
465
466                 if(PQsendQuery(sql, req.q.c_str()))
467                 {
468                         qinprog = req;
469                 }
470                 else
471                 {
472                         SQLerror err(SQL_QSEND_FAIL, PQerrorMessage(sql));
473                         req.c->OnError(err);
474                         delete req.c;
475                 }
476         }
477
478         void Close()
479         {
480                 ServerInstance->SE->DelFd(this);
481
482                 if(sql)
483                 {
484                         PQfinish(sql);
485                         sql = NULL;
486                 }
487         }
488 };
489
490 class ModulePgSQL : public Module
491 {
492  public:
493         ConnMap connections;
494         ReconnectTimer* retimer;
495
496         ModulePgSQL()
497         {
498         }
499
500         void init()
501         {
502                 ReadConf();
503
504                 Implementation eventlist[] = { I_OnUnloadModule, I_OnRehash };
505                 ServerInstance->Modules->Attach(eventlist, this, 2);
506         }
507
508         virtual ~ModulePgSQL()
509         {
510                 if (retimer)
511                         ServerInstance->Timers->DelTimer(retimer);
512                 ClearAllConnections();
513         }
514
515         virtual void OnRehash(User* user)
516         {
517                 ReadConf();
518         }
519
520         void ReadConf()
521         {
522                 ConnMap conns;
523                 ConfigTagList tags = ServerInstance->Config->ConfTags("database");
524                 for(ConfigIter i = tags.first; i != tags.second; i++)
525                 {
526                         if (i->second->getString("module", "pgsql") != "pgsql")
527                                 continue;
528                         std::string id = i->second->getString("id");
529                         ConnMap::iterator curr = connections.find(id);
530                         if (curr == connections.end())
531                         {
532                                 SQLConn* conn = new SQLConn(this, i->second);
533                                 conns.insert(std::make_pair(id, conn));
534                                 ServerInstance->Modules->AddService(*conn);
535                         }
536                         else
537                         {
538                                 conns.insert(*curr);
539                                 connections.erase(curr);
540                         }
541                 }
542                 ClearAllConnections();
543                 conns.swap(connections);
544         }
545
546         void ClearAllConnections()
547         {
548                 for(ConnMap::iterator i = connections.begin(); i != connections.end(); i++)
549                 {
550                         i->second->cull();
551                         delete i->second;
552                 }
553                 connections.clear();
554         }
555
556         void OnUnloadModule(Module* mod)
557         {
558                 SQLerror err(SQL_BAD_DBID);
559                 for(ConnMap::iterator i = connections.begin(); i != connections.end(); i++)
560                 {
561                         SQLConn* conn = i->second;
562                         if (conn->qinprog.c && conn->qinprog.c->creator == mod)
563                         {
564                                 conn->qinprog.c->OnError(err);
565                                 delete conn->qinprog.c;
566                                 conn->qinprog.c = NULL;
567                         }
568                         std::deque<QueueItem>::iterator j = conn->queue.begin();
569                         while (j != conn->queue.end())
570                         {
571                                 SQLQuery* q = j->c;
572                                 if (q->creator == mod)
573                                 {
574                                         q->OnError(err);
575                                         delete q;
576                                         j = conn->queue.erase(j);
577                                 }
578                                 else
579                                         j++;
580                         }
581                 }
582         }
583
584         Version GetVersion()
585         {
586                 return Version("PostgreSQL Service Provider module for all other m_sql* modules, uses v2 of the SQL API", VF_VENDOR);
587         }
588 };
589
590 void ReconnectTimer::Tick(time_t time)
591 {
592         mod->retimer = NULL;
593         mod->ReadConf();
594 }
595
596 void SQLConn::DelayReconnect()
597 {
598         ModulePgSQL* mod = (ModulePgSQL*)(Module*)creator;
599         ConnMap::iterator it = mod->connections.find(conf->getString("id"));
600         if (it != mod->connections.end())
601         {
602                 mod->connections.erase(it);
603                 ServerInstance->GlobalCulls.AddItem((EventHandler*)this);
604                 if (!mod->retimer)
605                 {
606                         mod->retimer = new ReconnectTimer(mod);
607                         ServerInstance->Timers->AddTimer(mod->retimer);
608                 }
609         }
610 }
611
612 MODULE_INIT(ModulePgSQL)