2 * InspIRCd -- Internet Relay Chat Daemon
4 * Copyright (C) 2009-2010 Daniel De Graaf <danieldg@inspircd.org>
5 * Copyright (C) 2006-2007, 2009 Dennis Friis <peavey@inspircd.org>
6 * Copyright (C) 2006-2007, 2009 Craig Edwards <craigedwards@brainbox.cc>
7 * Copyright (C) 2008 Robin Burchell <robin+git@viroteck.net>
8 * Copyright (C) 2008 Thomas Stagner <aquanight@inspircd.org>
9 * Copyright (C) 2006 Oliver Lupton <oliverlupton@gmail.com>
11 * This file is part of InspIRCd. InspIRCd is free software: you can
12 * redistribute it and/or modify it under the terms of the GNU General Public
13 * License as published by the Free Software Foundation, version 2.
15 * This program is distributed in the hope that it will be useful, but WITHOUT
16 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
17 * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
20 * You should have received a copy of the GNU General Public License
21 * along with this program. If not, see <http://www.gnu.org/licenses/>.
29 #include "modules/sql.h"
31 /* $ModDesc: PostgreSQL Service Provider module for all other m_sql* modules, uses v2 of the SQL API */
32 /* $CompileFlags: -Iexec("pg_config --includedir") eval("my $s = `pg_config --version`;$s =~ /^.*?(\d+)\.(\d+)\.(\d+).*?$/;my $v = hex(sprintf("0x%02x%02x%02x", $1, $2, $3));print "-DPGSQL_HAS_ESCAPECONN" if(($v >= 0x080104) || ($v >= 0x07030F && $v < 0x070400) || ($v >= 0x07040D && $v < 0x080000) || ($v >= 0x080008 && $v < 0x080100));") */
33 /* $LinkerFlags: -Lexec("pg_config --libdir") -lpq */
35 /* SQLConn rewritten by peavey to
36 * use EventHandler instead of
37 * BufferedSocket. This is much neater
38 * and gives total control of destroy
39 * and delete of resources.
42 /* Forward declare, so we can have the typedef neatly at the top */
46 typedef std::map<std::string, SQLConn*> ConnMap;
48 /* CREAD, Connecting and wants read event
49 * CWRITE, Connecting and wants write event
50 * WREAD, Connected/Working and wants read event
51 * WWRITE, Connected/Working and wants write event
52 * RREAD, Resetting and wants read event
53 * RWRITE, Resetting and wants write event
55 enum SQLstatus { CREAD, CWRITE, WREAD, WWRITE, RREAD, RWRITE };
57 class ReconnectTimer : public Timer
62 ReconnectTimer(ModulePgSQL* m) : Timer(5, ServerInstance->Time(), false), mod(m)
65 bool Tick(time_t TIME);
72 QueueItem(SQLQuery* C, const std::string& Q) : c(C), q(Q) {}
75 /** PgSQLresult is a subclass of the mostly-pure-virtual class SQLresult.
76 * All SQL providers must create their own subclass and define it's methods using that
77 * database library's data retriveal functions. The aim is to avoid a slow and inefficient process
78 * of converting all data to a common format before it reaches the result structure. This way
79 * data is passes to the module nearly as directly as if it was using the API directly itself.
82 class PgSQLresult : public SQLResult
88 PgSQLresult(PGresult* result) : res(result), currentrow(0)
90 rows = PQntuples(res);
92 rows = atoi(PQcmdTuples(res));
105 void GetCols(std::vector<std::string>& result)
107 result.resize(PQnfields(res));
108 for(unsigned int i=0; i < result.size(); i++)
110 result[i] = PQfname(res, i);
114 SQLEntry GetValue(int row, int column)
116 char* v = PQgetvalue(res, row, column);
117 if (!v || PQgetisnull(res, row, column))
120 return SQLEntry(std::string(v, PQgetlength(res, row, column)));
123 bool GetRow(SQLEntries& result)
125 if (currentrow >= PQntuples(res))
127 int ncols = PQnfields(res);
129 for(int i = 0; i < ncols; i++)
131 result.push_back(GetValue(currentrow, i));
139 /** SQLConn represents one SQL session.
141 class SQLConn : public SQLProvider, public EventHandler
144 reference<ConfigTag> conf; /* The <database> entry */
145 std::deque<QueueItem> queue;
146 PGconn* sql; /* PgSQL database connection handle */
147 SQLstatus status; /* PgSQL database connection status */
148 QueueItem qinprog; /* If there is currently a query in progress */
150 SQLConn(Module* Creator, ConfigTag* tag)
151 : SQLProvider(Creator, "SQL/" + tag->getString("id")), conf(tag), sql(NULL), status(CWRITE), qinprog(NULL, "")
155 ServerInstance->Logs->Log("m_pgsql", LOG_DEFAULT, "WARNING: Could not connect to database " + tag->getString("id"));
162 this->SQLProvider::cull();
163 ServerInstance->Modules->DelService(*this);
164 return this->EventHandler::cull();
169 SQLerror err(SQL_BAD_DBID);
172 qinprog.c->OnError(err);
175 for(std::deque<QueueItem>::iterator i = queue.begin(); i != queue.end(); i++)
183 void HandleEvent(EventType et, int errornum)
199 std::ostringstream conninfo("connect_timeout = '5'");
202 if (conf->readString("host", item))
203 conninfo << " host = '" << item << "'";
205 if (conf->readString("port", item))
206 conninfo << " port = '" << item << "'";
208 if (conf->readString("name", item))
209 conninfo << " dbname = '" << item << "'";
211 if (conf->readString("user", item))
212 conninfo << " user = '" << item << "'";
214 if (conf->readString("pass", item))
215 conninfo << " password = '" << item << "'";
217 if (conf->getBool("ssl"))
218 conninfo << " sslmode = 'require'";
220 conninfo << " sslmode = 'disable'";
222 return conninfo.str();
227 sql = PQconnectStart(GetDSN().c_str());
231 if(PQstatus(sql) == CONNECTION_BAD)
234 if(PQsetnonblocking(sql, 1) == -1)
237 /* OK, we've initalised the connection, now to get it hooked into the socket engine
238 * and then start polling it.
240 this->fd = PQsocket(sql);
245 if (!ServerInstance->SE->AddFd(this, FD_WANT_NO_WRITE | FD_WANT_NO_READ))
247 ServerInstance->Logs->Log("m_pgsql", LOG_DEBUG, "BUG: Couldn't add pgsql socket to socket engine");
251 /* Socket all hooked into the engine, now to tell PgSQL to start connecting */
257 switch(PQconnectPoll(sql))
259 case PGRES_POLLING_WRITING:
260 ServerInstance->SE->ChangeEventMask(this, FD_WANT_POLL_WRITE | FD_WANT_NO_READ);
263 case PGRES_POLLING_READING:
264 ServerInstance->SE->ChangeEventMask(this, FD_WANT_POLL_READ | FD_WANT_NO_WRITE);
267 case PGRES_POLLING_FAILED:
269 case PGRES_POLLING_OK:
270 ServerInstance->SE->ChangeEventMask(this, FD_WANT_POLL_READ | FD_WANT_NO_WRITE);
278 void DoConnectedPoll()
281 while (qinprog.q.empty() && !queue.empty())
283 /* There's no query currently in progress, and there's queries in the queue. */
284 DoQuery(queue.front());
288 if (PQconsumeInput(sql))
292 /* Nothing happens here */
296 /* Fetch the result.. */
297 PGresult* result = PQgetResult(sql);
299 /* PgSQL would allow a query string to be sent which has multiple
300 * queries in it, this isn't portable across database backends and
301 * we don't want modules doing it. But just in case we make sure we
302 * drain any results there are and just use the last one.
303 * If the module devs are behaving there will only be one result.
305 while (PGresult* temp = PQgetResult(sql))
311 /* ..and the result */
312 PgSQLresult reply(result);
313 switch(PQresultStatus(result))
315 case PGRES_EMPTY_QUERY:
316 case PGRES_BAD_RESPONSE:
317 case PGRES_FATAL_ERROR:
319 SQLerror err(SQL_QREPLY_FAIL, PQresultErrorMessage(result));
320 qinprog.c->OnError(err);
324 /* Other values are not errors */
325 qinprog.c->OnResult(reply);
329 qinprog = QueueItem(NULL, "");
339 /* I think we'll assume this means the server died...it might not,
340 * but I think that any error serious enough we actually get here
341 * deserves to reconnect [/excuse]
342 * Returning true so the core doesn't try and close the connection.
350 switch(PQresetPoll(sql))
352 case PGRES_POLLING_WRITING:
353 ServerInstance->SE->ChangeEventMask(this, FD_WANT_POLL_WRITE | FD_WANT_NO_READ);
356 case PGRES_POLLING_READING:
357 ServerInstance->SE->ChangeEventMask(this, FD_WANT_POLL_READ | FD_WANT_NO_WRITE);
360 case PGRES_POLLING_FAILED:
362 case PGRES_POLLING_OK:
363 ServerInstance->SE->ChangeEventMask(this, FD_WANT_POLL_READ | FD_WANT_NO_WRITE);
371 void DelayReconnect();
375 if((status == CREAD) || (status == CWRITE))
379 else if((status == RREAD) || (status == RWRITE))
389 void submit(SQLQuery *req, const std::string& q)
391 if (qinprog.q.empty())
393 DoQuery(QueueItem(req,q));
398 queue.push_back(QueueItem(req,q));
402 void submit(SQLQuery *req, const std::string& q, const ParamL& p)
405 unsigned int param = 0;
406 for(std::string::size_type i = 0; i < q.length(); i++)
412 if (param < p.size())
414 std::string parm = p[param++];
415 std::vector<char> buffer(parm.length() * 2 + 1);
416 #ifdef PGSQL_HAS_ESCAPECONN
418 size_t escapedsize = PQescapeStringConn(sql, &buffer[0], parm.data(), parm.length(), &error);
420 ServerInstance->Logs->Log("m_pgsql", LOG_DEBUG, "BUG: Apparently PQescapeStringConn() failed");
422 size_t escapedsize = PQescapeString(&buffer[0], parm.data(), parm.length());
424 res.append(&buffer[0], escapedsize);
431 void submit(SQLQuery *req, const std::string& q, const ParamM& p)
434 for(std::string::size_type i = 0; i < q.length(); i++)
442 while (i < q.length() && isalnum(q[i]))
443 field.push_back(q[i++]);
446 ParamM::const_iterator it = p.find(field);
449 std::string parm = it->second;
450 std::vector<char> buffer(parm.length() * 2 + 1);
451 #ifdef PGSQL_HAS_ESCAPECONN
453 size_t escapedsize = PQescapeStringConn(sql, &buffer[0], parm.data(), parm.length(), &error);
455 ServerInstance->Logs->Log("m_pgsql", LOG_DEBUG, "BUG: Apparently PQescapeStringConn() failed");
457 size_t escapedsize = PQescapeString(&buffer[0], parm.data(), parm.length());
459 res.append(&buffer[0], escapedsize);
466 void DoQuery(const QueueItem& req)
468 if (status != WREAD && status != WWRITE)
470 // whoops, not connected...
471 SQLerror err(SQL_BAD_CONN);
477 if(PQsendQuery(sql, req.q.c_str()))
483 SQLerror err(SQL_QSEND_FAIL, PQerrorMessage(sql));
491 ServerInstance->SE->DelFd(this);
501 class ModulePgSQL : public Module
505 ReconnectTimer* retimer;
512 void init() CXX11_OVERRIDE
516 Implementation eventlist[] = { I_OnUnloadModule, I_OnRehash };
517 ServerInstance->Modules->Attach(eventlist, this, sizeof(eventlist)/sizeof(Implementation));
523 ClearAllConnections();
526 void OnRehash(User* user) CXX11_OVERRIDE
534 ConfigTagList tags = ServerInstance->Config->ConfTags("database");
535 for(ConfigIter i = tags.first; i != tags.second; i++)
537 if (i->second->getString("module", "pgsql") != "pgsql")
539 std::string id = i->second->getString("id");
540 ConnMap::iterator curr = connections.find(id);
541 if (curr == connections.end())
543 SQLConn* conn = new SQLConn(this, i->second);
544 conns.insert(std::make_pair(id, conn));
545 ServerInstance->Modules->AddService(*conn);
550 connections.erase(curr);
553 ClearAllConnections();
554 conns.swap(connections);
557 void ClearAllConnections()
559 for(ConnMap::iterator i = connections.begin(); i != connections.end(); i++)
567 void OnUnloadModule(Module* mod) CXX11_OVERRIDE
569 SQLerror err(SQL_BAD_DBID);
570 for(ConnMap::iterator i = connections.begin(); i != connections.end(); i++)
572 SQLConn* conn = i->second;
573 if (conn->qinprog.c && conn->qinprog.c->creator == mod)
575 conn->qinprog.c->OnError(err);
576 delete conn->qinprog.c;
577 conn->qinprog.c = NULL;
579 std::deque<QueueItem>::iterator j = conn->queue.begin();
580 while (j != conn->queue.end())
583 if (q->creator == mod)
587 j = conn->queue.erase(j);
595 Version GetVersion() CXX11_OVERRIDE
597 return Version("PostgreSQL Service Provider module for all other m_sql* modules, uses v2 of the SQL API", VF_VENDOR);
601 bool ReconnectTimer::Tick(time_t time)
608 void SQLConn::DelayReconnect()
610 ModulePgSQL* mod = (ModulePgSQL*)(Module*)creator;
611 ConnMap::iterator it = mod->connections.find(conf->getString("id"));
612 if (it != mod->connections.end())
614 mod->connections.erase(it);
615 ServerInstance->GlobalCulls.AddItem((EventHandler*)this);
618 mod->retimer = new ReconnectTimer(mod);
619 ServerInstance->Timers->AddTimer(mod->retimer);
624 MODULE_INIT(ModulePgSQL)