2 * InspIRCd -- Internet Relay Chat Daemon
4 * Copyright (C) 2009-2010 Daniel De Graaf <danieldg@inspircd.org>
5 * Copyright (C) 2006-2007, 2009 Dennis Friis <peavey@inspircd.org>
6 * Copyright (C) 2006-2007, 2009 Craig Edwards <craigedwards@brainbox.cc>
7 * Copyright (C) 2008 Robin Burchell <robin+git@viroteck.net>
8 * Copyright (C) 2008 Thomas Stagner <aquanight@inspircd.org>
9 * Copyright (C) 2006 Oliver Lupton <oliverlupton@gmail.com>
11 * This file is part of InspIRCd. InspIRCd is free software: you can
12 * redistribute it and/or modify it under the terms of the GNU General Public
13 * License as published by the Free Software Foundation, version 2.
15 * This program is distributed in the hope that it will be useful, but WITHOUT
16 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
17 * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
20 * You should have received a copy of the GNU General Public License
21 * along with this program. If not, see <http://www.gnu.org/licenses/>.
24 /// $CompilerFlags: -Iexecute("pg_config --includedir" "POSTGRESQL_INCLUDE_DIR")
25 /// $LinkerFlags: -Lexecute("pg_config --libdir" "POSTGRESQL_LIBRARY_DIR") -lpq
27 /// $PackageInfo: require_system("centos") postgresql-devel
28 /// $PackageInfo: require_system("darwin") postgresql
29 /// $PackageInfo: require_system("debian") libpq-dev
30 /// $PackageInfo: require_system("ubuntu") libpq-dev
36 #include "modules/sql.h"
38 /* SQLConn rewritten by peavey to
39 * use EventHandler instead of
40 * BufferedSocket. This is much neater
41 * and gives total control of destroy
42 * and delete of resources.
45 /* Forward declare, so we can have the typedef neatly at the top */
49 typedef insp::flat_map<std::string, SQLConn*> ConnMap;
51 /* CREAD, Connecting and wants read event
52 * CWRITE, Connecting and wants write event
53 * WREAD, Connected/Working and wants read event
54 * WWRITE, Connected/Working and wants write event
55 * RREAD, Resetting and wants read event
56 * RWRITE, Resetting and wants write event
58 enum SQLstatus { CREAD, CWRITE, WREAD, WWRITE, RREAD, RWRITE };
60 class ReconnectTimer : public Timer
65 ReconnectTimer(ModulePgSQL* m) : Timer(5, false), mod(m)
68 bool Tick(time_t TIME) CXX11_OVERRIDE;
75 QueueItem(SQL::Query* C, const std::string& Q) : c(C), q(Q) {}
78 /** PgSQLresult is a subclass of the mostly-pure-virtual class SQLresult.
79 * All SQL providers must create their own subclass and define its methods using that
80 * database library's data retrieval functions. The aim is to avoid a slow and inefficient process
81 * of converting all data to a common format before it reaches the result structure. This way
82 * data is passed to the module nearly as directly as if it was using the API directly itself.
85 class PgSQLresult : public SQL::Result
90 std::vector<std::string> colnames;
94 colnames.resize(PQnfields(res));
95 for(unsigned int i=0; i < colnames.size(); i++)
97 colnames[i] = PQfname(res, i);
101 PgSQLresult(PGresult* result) : res(result), currentrow(0)
103 rows = PQntuples(res);
105 rows = atoi(PQcmdTuples(res));
113 int Rows() CXX11_OVERRIDE
118 void GetCols(std::vector<std::string>& result) CXX11_OVERRIDE
120 if (colnames.empty())
125 bool HasColumn(const std::string& column, size_t& index)
127 if (colnames.empty())
130 for (size_t i = 0; i < colnames.size(); ++i)
132 if (colnames[i] == column)
141 SQL::Field GetValue(int row, int column)
143 char* v = PQgetvalue(res, row, column);
144 if (!v || PQgetisnull(res, row, column))
147 return SQL::Field(std::string(v, PQgetlength(res, row, column)));
150 bool GetRow(SQL::Row& result) CXX11_OVERRIDE
152 if (currentrow >= PQntuples(res))
154 int ncols = PQnfields(res);
156 for(int i = 0; i < ncols; i++)
158 result.push_back(GetValue(currentrow, i));
166 /** SQLConn represents one SQL session.
168 class SQLConn : public SQL::Provider, public EventHandler
171 reference<ConfigTag> conf; /* The <database> entry */
172 std::deque<QueueItem> queue;
173 PGconn* sql; /* PgSQL database connection handle */
174 SQLstatus status; /* PgSQL database connection status */
175 QueueItem qinprog; /* If there is currently a query in progress */
177 SQLConn(Module* Creator, ConfigTag* tag)
178 : SQL::Provider(Creator, "SQL/" + tag->getString("id")), conf(tag), sql(NULL), status(CWRITE), qinprog(NULL, "")
182 ServerInstance->Logs->Log(MODNAME, LOG_DEFAULT, "WARNING: Could not connect to database " + tag->getString("id"));
187 CullResult cull() CXX11_OVERRIDE
189 this->SQL::Provider::cull();
190 ServerInstance->Modules->DelService(*this);
191 return this->EventHandler::cull();
196 SQL::Error err(SQL::BAD_DBID);
199 qinprog.c->OnError(err);
202 for(std::deque<QueueItem>::iterator i = queue.begin(); i != queue.end(); i++)
204 SQL::Query* q = i->c;
210 void OnEventHandlerRead() CXX11_OVERRIDE
215 void OnEventHandlerWrite() CXX11_OVERRIDE
220 void OnEventHandlerError(int errornum) CXX11_OVERRIDE
// Builds the libpq connection string from the <database> config tag held in
// `conf`: optional host, port, name (sent as dbname), user and pass (sent as
// password), followed by an sslmode setting chosen from the boolean "ssl" key.
// The finished string is returned by value.
// NOTE(review): values are wrapped in single quotes without escaping, so a
// config value containing ' or \ would presumably break the string - confirm.
//
// The stream must be opened with std::ios_base::ate. An ostringstream built
// from an initial string otherwise starts writing at offset 0, so the first
// << below would overwrite "connect_timeout = '5'" instead of appending to it.
227 std::ostringstream conninfo("connect_timeout = '5'", std::ios_base::ate);
230 if (conf->readString("host", item))
231 conninfo << " host = '" << item << "'";
233 if (conf->readString("port", item))
234 conninfo << " port = '" << item << "'";
236 if (conf->readString("name", item))
237 conninfo << " dbname = '" << item << "'";
239 if (conf->readString("user", item))
240 conninfo << " user = '" << item << "'";
242 if (conf->readString("pass", item))
243 conninfo << " password = '" << item << "'";
245 if (conf->getBool("ssl"))
246 conninfo << " sslmode = 'require'";
248 conninfo << " sslmode = 'disable'";
250 return conninfo.str();
255 sql = PQconnectStart(GetDSN().c_str());
259 if(PQstatus(sql) == CONNECTION_BAD)
262 if(PQsetnonblocking(sql, 1) == -1)
265 /* OK, we've initialised the connection, now to get it hooked into the socket engine
266 * and then start polling it.
268 this->fd = PQsocket(sql);
273 if (!SocketEngine::AddFd(this, FD_WANT_NO_WRITE | FD_WANT_NO_READ))
275 ServerInstance->Logs->Log(MODNAME, LOG_DEBUG, "BUG: Couldn't add pgsql socket to socket engine");
279 /* Socket all hooked into the engine, now to tell PgSQL to start connecting */
285 switch(PQconnectPoll(sql))
287 case PGRES_POLLING_WRITING:
288 SocketEngine::ChangeEventMask(this, FD_WANT_POLL_WRITE | FD_WANT_NO_READ);
291 case PGRES_POLLING_READING:
292 SocketEngine::ChangeEventMask(this, FD_WANT_POLL_READ | FD_WANT_NO_WRITE);
295 case PGRES_POLLING_FAILED:
297 case PGRES_POLLING_OK:
298 SocketEngine::ChangeEventMask(this, FD_WANT_POLL_READ | FD_WANT_NO_WRITE);
306 void DoConnectedPoll()
309 while (qinprog.q.empty() && !queue.empty())
311 /* There's no query currently in progress, and there are queries in the queue. */
312 DoQuery(queue.front());
316 if (PQconsumeInput(sql))
320 /* Nothing happens here */
324 /* Fetch the result.. */
325 PGresult* result = PQgetResult(sql);
327 /* PgSQL allows a query string containing multiple queries to be
328 * sent; this isn't portable across database backends and
329 * we don't want modules doing it. But just in case, we make sure we
330 * drain any results there are and just use the last one.
331 * If the module devs are behaving there will only be one result.
333 while (PGresult* temp = PQgetResult(sql))
339 /* ..and the result */
340 PgSQLresult reply(result);
341 switch(PQresultStatus(result))
343 case PGRES_EMPTY_QUERY:
344 case PGRES_BAD_RESPONSE:
345 case PGRES_FATAL_ERROR:
347 SQL::Error err(SQL::QREPLY_FAIL, PQresultErrorMessage(result));
348 qinprog.c->OnError(err);
352 /* Other values are not errors */
353 qinprog.c->OnResult(reply);
357 qinprog = QueueItem(NULL, "");
367 /* I think we'll assume this means the server died...it might not,
368 * but I think that any error serious enough we actually get here
369 * deserves to reconnect [/excuse]
370 * Returning true so the core doesn't try and close the connection.
378 switch(PQresetPoll(sql))
380 case PGRES_POLLING_WRITING:
381 SocketEngine::ChangeEventMask(this, FD_WANT_POLL_WRITE | FD_WANT_NO_READ);
384 case PGRES_POLLING_READING:
385 SocketEngine::ChangeEventMask(this, FD_WANT_POLL_READ | FD_WANT_NO_WRITE);
388 case PGRES_POLLING_FAILED:
390 case PGRES_POLLING_OK:
391 SocketEngine::ChangeEventMask(this, FD_WANT_POLL_READ | FD_WANT_NO_WRITE);
399 void DelayReconnect();
403 if((status == CREAD) || (status == CWRITE))
407 else if((status == RREAD) || (status == RWRITE))
417 void Submit(SQL::Query *req, const std::string& q) CXX11_OVERRIDE
419 if (qinprog.q.empty())
421 DoQuery(QueueItem(req,q));
426 queue.push_back(QueueItem(req,q));
430 void Submit(SQL::Query *req, const std::string& q, const SQL::ParamList& p) CXX11_OVERRIDE
433 unsigned int param = 0;
434 for(std::string::size_type i = 0; i < q.length(); i++)
440 if (param < p.size())
442 std::string parm = p[param++];
443 std::vector<char> buffer(parm.length() * 2 + 1);
445 size_t escapedsize = PQescapeStringConn(sql, &buffer[0], parm.data(), parm.length(), &error);
447 ServerInstance->Logs->Log(MODNAME, LOG_DEBUG, "BUG: Apparently PQescapeStringConn() failed");
448 res.append(&buffer[0], escapedsize);
// Submit overload taking named parameters (SQL::ParamMap).
// Walks the query text `q`, collects a run of alphanumeric characters into
// `field`, looks that name up in `p`, escapes the mapped value with
// PQescapeStringConn() and appends the escaped bytes to `res`.
// NOTE(review): the placeholder marker that starts a field name, and the final
// hand-off of `res`, are not visible in this excerpt - confirm against the full file.
455 void Submit(SQL::Query *req, const std::string& q, const SQL::ParamMap& p) CXX11_OVERRIDE
458 for(std::string::size_type i = 0; i < q.length(); i++)
// isalnum() has undefined behaviour for negative arguments other than EOF, and
// plain char may be signed, so go through unsigned char. Bytes >= 0x80 (e.g.
// UTF-8 text in the query) are then classified safely.
466 while (i < q.length() && isalnum(static_cast<unsigned char>(q[i])))
467 field.push_back(q[i++]);
470 SQL::ParamMap::const_iterator it = p.find(field);
473 std::string parm = it->second;
// 2 * length + 1 is the minimum destination size libpq documents for
// PQescapeStringConn().
474 std::vector<char> buffer(parm.length() * 2 + 1);
476 size_t escapedsize = PQescapeStringConn(sql, &buffer[0], parm.data(), parm.length(), &error);
478 ServerInstance->Logs->Log(MODNAME, LOG_DEBUG, "BUG: Apparently PQescapeStringConn() failed");
479 res.append(&buffer[0], escapedsize);
486 void DoQuery(const QueueItem& req)
488 if (status != WREAD && status != WWRITE)
490 // whoops, not connected...
491 SQL::Error err(SQL::BAD_CONN);
497 if(PQsendQuery(sql, req.q.c_str()))
503 SQL::Error err(SQL::QSEND_FAIL, PQerrorMessage(sql));
511 SocketEngine::DelFd(this);
521 class ModulePgSQL : public Module
525 ReconnectTimer* retimer;
535 ClearAllConnections();
538 void ReadConfig(ConfigStatus& status) CXX11_OVERRIDE
546 ConfigTagList tags = ServerInstance->Config->ConfTags("database");
547 for(ConfigIter i = tags.first; i != tags.second; i++)
549 if (!stdalgo::string::equalsci(i->second->getString("provider"), "pgsql"))
551 std::string id = i->second->getString("id");
552 ConnMap::iterator curr = connections.find(id);
553 if (curr == connections.end())
555 SQLConn* conn = new SQLConn(this, i->second);
556 conns.insert(std::make_pair(id, conn));
557 ServerInstance->Modules->AddService(*conn);
562 connections.erase(curr);
565 ClearAllConnections();
566 conns.swap(connections);
569 void ClearAllConnections()
571 for(ConnMap::iterator i = connections.begin(); i != connections.end(); i++)
579 void OnUnloadModule(Module* mod) CXX11_OVERRIDE
581 SQL::Error err(SQL::BAD_DBID);
582 for(ConnMap::iterator i = connections.begin(); i != connections.end(); i++)
584 SQLConn* conn = i->second;
585 if (conn->qinprog.c && conn->qinprog.c->creator == mod)
587 conn->qinprog.c->OnError(err);
588 delete conn->qinprog.c;
589 conn->qinprog.c = NULL;
591 std::deque<QueueItem>::iterator j = conn->queue.begin();
592 while (j != conn->queue.end())
594 SQL::Query* q = j->c;
595 if (q->creator == mod)
599 j = conn->queue.erase(j);
607 Version GetVersion() CXX11_OVERRIDE
609 return Version("PostgreSQL Service Provider module for all other m_sql* modules, uses v2 of the SQL API", VF_VENDOR);
613 bool ReconnectTimer::Tick(time_t time)
621 void SQLConn::DelayReconnect()
623 ModulePgSQL* mod = (ModulePgSQL*)(Module*)creator;
624 ConnMap::iterator it = mod->connections.find(conf->getString("id"));
625 if (it != mod->connections.end())
627 mod->connections.erase(it);
628 ServerInstance->GlobalCulls.AddItem((EventHandler*)this);
631 mod->retimer = new ReconnectTimer(mod);
632 ServerInstance->Timers.AddTimer(mod->retimer);
637 MODULE_INIT(ModulePgSQL)