/*       +------------------------------------+
 *       | Inspire Internet Relay Chat Daemon |
 *       +------------------------------------+
 *
 *  InspIRCd: (C) 2002-2010 InspIRCd Development Team
 * See: http://wiki.inspircd.org/Credits
 *
 * This program is free but copyrighted software; see
 *            the file COPYING for details.
 *
 * ---------------------------------------------------
 */

#include "inspircd.h"
#include <cctype>
#include <cstdlib>
#include <deque>
#include <map>
#include <sstream>
#include <string>
#include <vector>
#include <libpq-fe.h>
#include "sql.h"

/* $ModDesc: PostgreSQL Service Provider module for all other m_sql* modules, uses v2 of the SQL API */
/* $CompileFlags: -Iexec("pg_config --includedir") eval("my $s = `pg_config --version`;$s =~ /^.*?(\d+)\.(\d+)\.(\d+).*?$/;my $v = hex(sprintf("0x%02x%02x%02x", $1, $2, $3));print "-DPGSQL_HAS_ESCAPECONN" if(($v >= 0x080104) || ($v >= 0x07030F && $v < 0x070400) || ($v >= 0x07040D && $v < 0x080000) || ($v >= 0x080008 && $v < 0x080100));") */
/* $LinkerFlags: -Lexec("pg_config --libdir") -lpq */
/* $ModDep: m_sqlv2.h */

/* SQLConn rewritten by peavey to
 * use EventHandler instead of
 * BufferedSocket. This is much neater
 * and gives total control of destroy
 * and delete of resources.
 */

/* Forward declare, so we can have the typedef neatly at the top */
class SQLConn;
class ModulePgSQL;

/* Maps a <database> tag's "id" value to the connection created for it. */
typedef std::map<std::string, SQLConn*> ConnMap;

/* CREAD,	Connecting and wants read event
 * CWRITE,	Connecting and wants write event
 * WREAD,	Connected/Working and wants read event
 * WWRITE,	Connected/Working and wants write event
 * RREAD,	Resetting and wants read event
 * RWRITE,	Resetting and wants write event
 */
enum SQLstatus { CREAD, CWRITE, WREAD, WWRITE, RREAD, RWRITE };

/** Timer armed by SQLConn::DelayReconnect(). When it fires it clears the
 * owning module's retimer pointer and makes the module re-read its
 * configuration, which recreates any connection that was dropped.
 */
class ReconnectTimer : public Timer
{
 private:
	ModulePgSQL* mod;
 public:
	/* NOTE(review): presumably a 5 second, non-repeating timer - confirm against Timer's constructor. */
	ReconnectTimer(ModulePgSQL* m) : Timer(5, ServerInstance->Time(), false), mod(m)
	{
	}
	virtual void Tick(time_t TIME);
};

/** PgSQLresult is a subclass of the mostly-pure-virtual class SQLResult.
 * All SQL providers must create their own subclass and define its methods using that
 * database library's data retrieval functions. The aim is to avoid a slow and inefficient process
 * of converting all data to a common format before it reaches the result structure. This way
 * data is passed to the module nearly as directly as if it was using the API directly itself.
 *
 * The object takes ownership of the PGresult it is given and clears it on destruction.
 */
class PgSQLresult : public SQLResult
{
	PGresult* res;
	int currentrow;
	int rows;
 public:
	PgSQLresult(PGresult* result) : res(result), currentrow(0)
	{
		rows = PQntuples(res);
		/* No tuples returned: fall back to the affected-row count reported for the command. */
		if (!rows)
			rows = atoi(PQcmdTuples(res));
	}

	~PgSQLresult()
	{
		PQclear(res);
	}

	/** Number of rows returned, or the command's affected-row count when no rows were returned. */
	virtual int Rows()
	{
		return rows;
	}

	/** Replace the contents of result with the column names of this result set. */
	virtual void GetCols(std::vector<std::string>& result)
	{
		result.resize(PQnfields(res));
		for (unsigned int i = 0; i < result.size(); i++)
		{
			result[i] = PQfname(res, i);
		}
	}

	/** Fetch one field; a default-constructed SQLEntry is returned for SQL NULL. */
	virtual SQLEntry GetValue(int row, int column)
	{
		char* v = PQgetvalue(res, row, column);
		if (!v || PQgetisnull(res, row, column))
			return SQLEntry();

		return SQLEntry(std::string(v, PQgetlength(res, row, column)));
	}

	/** Append the fields of the next unread row to result.
	 * @return false once every row has been consumed, true otherwise.
	 */
	virtual bool GetRow(SQLEntries& result)
	{
		if (currentrow >= PQntuples(res))
			return false;
		int ncols = PQnfields(res);

		for (int i = 0; i < ncols; i++)
		{
			result.push_back(GetValue(currentrow, i));
		}
		currentrow++;

		return true;
	}
};

/** SQLConn represents one SQL session.
 */
class SQLConn : public SQLProvider, public EventHandler
{
 private:
	reference<ConfigTag> conf;	/* The <database> entry */
	std::deque<SQLQuery*> queue;	/* Queries waiting for the one in progress to finish */
	PGconn* 		sql;		/* PgSQL database connection handle */
	SQLstatus		status;		/* PgSQL database connection status */
	SQLQuery*		qinprog;	/* If there is currently a query in progress */
	time_t			idle;		/* Time we last heard from the database */

	/** Escape parm for use inside a single-quoted SQL literal and append it to out.
	 * The scratch buffer is sized to the 2 * length + 1 bytes libpq requires, so
	 * arbitrarily long parameters cannot overrun it.
	 */
	void AppendEscaped(std::string& out, const std::string& parm)
	{
		std::vector<char> buffer(parm.length() * 2 + 1);
#ifdef PGSQL_HAS_ESCAPECONN
		int error = 0;
		size_t escapedsize = PQescapeStringConn(sql, &buffer[0], parm.data(), parm.length(), &error);
		if (error)
			ServerInstance->Logs->Log("m_pgsql", DEBUG, "BUG: Apparently PQescapeStringConn() failed");
#else
		size_t escapedsize = PQescapeString(&buffer[0], parm.data(), parm.length());
#endif
		out.append(&buffer[0], escapedsize);
	}

 public:
	/* NOTE(review): when DoConnect() fails here, DelayReconnect() looks this connection up in
	 * the module's map, but ModulePgSQL::ReadConf() only inserts it after construction, so no
	 * reconnect appears to be scheduled for a failed initial connect - confirm.
	 */
	SQLConn(Module* Creator, ConfigTag* tag)
	: SQLProvider(Creator, "SQL/" + tag->getString("id")), conf(tag), sql(NULL), status(CWRITE), qinprog(NULL)
	{
		idle = ServerInstance->Time();
		if (!DoConnect())
		{
			ServerInstance->Logs->Log("m_pgsql",DEFAULT, "WARNING: Could not connect to database " + tag->getString("id"));
			DelayReconnect();
		}
	}

	/** Cull both bases and withdraw this provider from the service list. */
	CullResult cull()
	{
		this->SQLProvider::cull();
		ServerInstance->Modules->DelService(*this);
		return this->EventHandler::cull();
	}

	/** Fail the in-progress query and every queued query with SQL_BAD_DBID, then free them. */
	~SQLConn()
	{
		SQLerror err(SQL_BAD_DBID);
		if (qinprog)
		{
			qinprog->OnError(err);
			delete qinprog;
		}
		for (std::deque<SQLQuery*>::iterator i = queue.begin(); i != queue.end(); i++)
		{
			SQLQuery* q = *i;
			q->OnError(err);
			delete q;
		}
	}

	/** Socket engine callback: read/write readiness drives the state machine, errors schedule a reconnect. */
	virtual void HandleEvent(EventType et, int errornum)
	{
		switch (et)
		{
			case EVENT_READ:
			case EVENT_WRITE:
				DoEvent();
			break;

			case EVENT_ERROR:
				DelayReconnect();
			break;
		}
	}

	/** Build the libpq conninfo string from the <database> tag.
	 * Values are inserted verbatim between single quotes; they are not escaped.
	 */
	std::string GetDSN()
	{
		/* Stream the prefix in rather than passing it to the constructor: a stream
		 * constructed from a string starts writing at offset 0 and would overwrite it.
		 */
		std::ostringstream conninfo;
		conninfo << "connect_timeout = '5'";
		std::string item;

		if (conf->readString("host", item))
			conninfo << " host = '" << item << "'";

		if (conf->readString("port", item))
			conninfo << " port = '" << item << "'";

		if (conf->readString("name", item))
			conninfo << " dbname = '" << item << "'";

		if (conf->readString("user", item))
			conninfo << " user = '" << item << "'";

		if (conf->readString("pass", item))
			conninfo << " password = '" << item << "'";

		if (conf->getBool("ssl"))
			conninfo << " sslmode = 'require'";
		else
			conninfo << " sslmode = 'disable'";

		return conninfo.str();
	}

	/** Start a non-blocking connection attempt and register its socket with the socket engine.
	 * @return false if any step fails, otherwise the result of the first DoPoll().
	 */
	bool DoConnect()
	{
		sql = PQconnectStart(GetDSN().c_str());
		if (!sql)
			return false;

		if (PQstatus(sql) == CONNECTION_BAD)
			return false;

		if (PQsetnonblocking(sql, 1) == -1)
			return false;

		/* OK, we've initialised the connection, now to get it hooked into the socket engine
		 * and then start polling it.
		 */
		this->fd = PQsocket(sql);

		if (this->fd <= -1)
			return false;

		if (!ServerInstance->SE->AddFd(this, FD_WANT_NO_WRITE | FD_WANT_NO_READ))
		{
			ServerInstance->Logs->Log("m_pgsql",DEBUG, "BUG: Couldn't add pgsql socket to socket engine");
			return false;
		}

		/* Socket all hooked into the engine, now to tell PgSQL to start connecting */
		return DoPoll();
	}

	/** Advance the connection handshake by one step and ask the socket engine for
	 * whichever event libpq wants next.
	 * @return false only when libpq reports the connection attempt has failed.
	 */
	bool DoPoll()
	{
		switch (PQconnectPoll(sql))
		{
			case PGRES_POLLING_WRITING:
				ServerInstance->SE->ChangeEventMask(this, FD_WANT_POLL_WRITE | FD_WANT_NO_READ);
				status = CWRITE;
				return true;
			case PGRES_POLLING_READING:
				ServerInstance->SE->ChangeEventMask(this, FD_WANT_POLL_READ | FD_WANT_NO_WRITE);
				status = CREAD;
				return true;
			case PGRES_POLLING_FAILED:
				return false;
			case PGRES_POLLING_OK:
				/* Handshake complete: enter the working state and start on anything queued. */
				ServerInstance->SE->ChangeEventMask(this, FD_WANT_POLL_READ | FD_WANT_NO_WRITE);
				status = WWRITE;
				DoConnectedPoll();
				return true;
			default:
				return true;
		}
	}

	/** Service an established connection: dispatch queued queries, consume input, and
	 * deliver the result of a finished query to its SQLQuery before freeing it.
	 */
	void DoConnectedPoll()
	{
restart:
		while (!qinprog && !queue.empty())
		{
			/* There's no query currently in progress, and there's queries in the queue. */
			DoQuery(queue.front());
			queue.pop_front();
		}

		if (PQconsumeInput(sql))
		{
			/* We just read stuff from the server, that counts as it being alive
			 * so update the idle-since time :p
			 */
			idle = ServerInstance->Time();

			if (PQisBusy(sql))
			{
				/* Nothing happens here */
			}
			else if (qinprog)
			{
				/* Fetch the result.. */
				PGresult* result = PQgetResult(sql);

				/* PgSQL would allow a query string to be sent which has multiple
				 * queries in it, this isn't portable across database backends and
				 * we don't want modules doing it. But just in case we make sure we
				 * drain any results there are and just use the last one.
				 * If the module devs are behaving there will only be one result.
				 */
				while (PGresult* temp = PQgetResult(sql))
				{
					PQclear(result);
					result = temp;
				}

				/* ..and the result; reply owns it from here and clears it when it goes out of scope */
				PgSQLresult reply(result);
				switch (PQresultStatus(result))
				{
					case PGRES_EMPTY_QUERY:
					case PGRES_BAD_RESPONSE:
					case PGRES_FATAL_ERROR:
					{
						SQLerror err(SQL_QREPLY_FAIL, PQresultErrorMessage(result));
						qinprog->OnError(err);
						break;
					}
					default:
						/* Other values are not errors */
						qinprog->OnResult(reply);
				}

				delete qinprog;
				qinprog = NULL;
				/* The connection is free again, go round and send the next queued query. */
				goto restart;
			}
		}
		else
		{
			/* I think we'll assume this means the server died...it might not,
			 * but I think that any error serious enough we actually get here
			 * deserves to reconnect [/excuse]
			 */
			DelayReconnect();
		}
	}

	/** Advance a connection reset by one step.
	 * NOTE(review): nothing in this file sets status to RREAD/RWRITE, so DoEvent() never
	 * appears to reach this function - confirm before relying on it.
	 */
	bool DoResetPoll()
	{
		switch (PQresetPoll(sql))
		{
			case PGRES_POLLING_WRITING:
				ServerInstance->SE->ChangeEventMask(this, FD_WANT_POLL_WRITE | FD_WANT_NO_READ);
				status = CWRITE;
				return DoPoll();
			case PGRES_POLLING_READING:
				ServerInstance->SE->ChangeEventMask(this, FD_WANT_POLL_READ | FD_WANT_NO_WRITE);
				status = CREAD;
				return true;
			case PGRES_POLLING_FAILED:
				return false;
			case PGRES_POLLING_OK:
				ServerInstance->SE->ChangeEventMask(this, FD_WANT_POLL_READ | FD_WANT_NO_WRITE);
				status = WWRITE;
				DoConnectedPoll();
				return true;
			default:
				return true;
		}
	}

	/** Drop this connection from the module, queue it for culling and arm the reconnect timer. */
	void DelayReconnect();

	/** Route a read/write event to the handler matching the current connection state. */
	void DoEvent()
	{
		if ((status == CREAD) || (status == CWRITE))
		{
			DoPoll();
		}
		else if ((status == RREAD) || (status == RWRITE))
		{
			DoResetPoll();
		}
		else
		{
			DoConnectedPoll();
		}
	}

	/** Substitute each '?' in q with the next parameter from p, escaped.
	 * A '?' with no parameter left to consume is removed from the output.
	 */
	virtual std::string FormatQuery(const std::string& q, const ParamL& p)
	{
		std::string res;
		unsigned int param = 0;
		for (std::string::size_type i = 0; i < q.length(); i++)
		{
			if (q[i] != '?')
				res.push_back(q[i]);
			else
			{
				// TODO numbered parameter support ('?1')
				if (param < p.size())
				{
					const std::string parm = p[param++];
					AppendEscaped(res, parm);
				}
			}
		}
		return res;
	}

	/** Substitute each "$name" in q (name being a run of alphabetic characters) with the
	 * escaped value of p[name]. A name that is not present in p is removed from the output.
	 */
	std::string FormatQuery(const std::string& q, const ParamM& p)
	{
		std::string res;
		for (std::string::size_type i = 0; i < q.length(); i++)
		{
			if (q[i] != '$')
				res.push_back(q[i]);
			else
			{
				std::string field;
				i++;
				/* isalpha() is undefined for negative values, so go through unsigned char. */
				while (i < q.length() && isalpha(static_cast<unsigned char>(q[i])))
					field.push_back(q[i++]);
				/* Step back so the loop increment lands on the first character after the name. */
				i--;

				ParamM::const_iterator it = p.find(field);
				if (it != p.end())
				{
					const std::string parm = it->second;
					AppendEscaped(res, parm);
				}
			}
		}
		return res;
	}

	/** Run req now if the connection is free, otherwise queue it behind the query in progress. */
	virtual void submit(SQLQuery *req)
	{
		if (qinprog)
		{
			// wait your turn.
			queue.push_back(req);
		}
		else
		{
			DoQuery(req);
		}
	}

	/** Send req to the server. On success req becomes the query in progress; on any
	 * failure its OnError() is called and it is deleted.
	 */
	void DoQuery(SQLQuery* req)
	{
		if (status != WREAD && status != WWRITE)
		{
			// whoops, not connected...
			SQLerror err(SQL_BAD_CONN);
			req->OnError(err);
			delete req;
			return;
		}

		if (PQsendQuery(sql, req->query.c_str()))
		{
			qinprog = req;
		}
		else
		{
			SQLerror err(SQL_QSEND_FAIL, PQerrorMessage(sql));
			req->OnError(err);
			delete req;
		}
	}

	/** Remove the socket from the socket engine and finish the libpq connection.
	 * NOTE(review): nothing within this file calls Close(), so the PGconn does not appear
	 * to be finished when a connection is destroyed - confirm and wire up if intended.
	 */
	void Close()
	{
		ServerInstance->SE->DelFd(this);

		if (sql)
		{
			PQfinish(sql);
			sql = NULL;
		}
	}
};

/** The module: owns one SQLConn per <database> tag whose "module" value is "pgsql"
 * (tags with no "module" value are treated as "pgsql" as well).
 */
class ModulePgSQL : public Module
{
 public:
	ConnMap connections;
	ReconnectTimer* retimer;	/* Pending reconnect timer, NULL when none is armed */

	/* retimer must start out NULL: the destructor and SQLConn::DelayReconnect() both test it. */
	ModulePgSQL() : retimer(NULL)
	{
	}

	void init()
	{
		ReadConf();

		Implementation eventlist[] = { I_OnUnloadModule, I_OnRehash };
		ServerInstance->Modules->Attach(eventlist, this, 2);
	}

	virtual ~ModulePgSQL()
	{
		if (retimer)
			ServerInstance->Timers->DelTimer(retimer);
		ClearAllConnections();
	}

	virtual void OnRehash(User* user)
	{
		ReadConf();
	}

	/** Bring the connection map in line with the configuration: connections whose id is
	 * still configured are kept, new ids get a new SQLConn registered as a service, and
	 * connections that are no longer configured are culled and deleted.
	 */
	void ReadConf()
	{
		ConnMap conns;
		ConfigTagList tags = ServerInstance->Config->ConfTags("database");
		for (ConfigIter i = tags.first; i != tags.second; i++)
		{
			if (i->second->getString("module", "pgsql") != "pgsql")
				continue;
			std::string id = i->second->getString("id");
			ConnMap::iterator curr = connections.find(id);
			if (curr == connections.end())
			{
				SQLConn* conn = new SQLConn(this, i->second);
				conns.insert(std::make_pair(id, conn));
				ServerInstance->Modules->AddService(*conn);
			}
			else
			{
				/* Still configured: carry it over so ClearAllConnections() below spares it. */
				conns.insert(*curr);
				connections.erase(curr);
			}
		}
		/* Whatever is left in connections was not carried over, so get rid of it. */
		ClearAllConnections();
		conns.swap(connections);
	}

	/** Cull and delete every connection currently in the map, then empty the map. */
	void ClearAllConnections()
	{
		for (ConnMap::iterator i = connections.begin(); i != connections.end(); i++)
		{
			i->second->cull();
			delete i->second;
		}
		connections.clear();
	}

	void OnUnloadModule(Module* mod)
	{
		// TODO cancel queries that will have a bad vtable
	}

	Version GetVersion()
	{
		return Version("PostgreSQL Service Provider module for all other m_sql* modules, uses v2 of the SQL API", VF_VENDOR);
	}
};

/* Forget the timer pointer first so a later DelayReconnect() arms a fresh timer, then
 * re-read the configuration to recreate any connection that was dropped.
 */
void ReconnectTimer::Tick(time_t time)
{
	mod->retimer = NULL;
	mod->ReadConf();
}

/* Only acts when this connection is still registered in the module's map: it is removed
 * from the map, queued for culling, and a reconnect timer is armed unless one is pending.
 */
void SQLConn::DelayReconnect()
{
	ModulePgSQL* mod = (ModulePgSQL*)(Module*)creator;
	ConnMap::iterator it = mod->connections.find(conf->getString("id"));
	if (it != mod->connections.end())
	{
		mod->connections.erase(it);
		ServerInstance->GlobalCulls.AddItem((EventHandler*)this);
		if (!mod->retimer)
		{
			mod->retimer = new ReconnectTimer(mod);
			ServerInstance->Timers->AddTimer(mod->retimer);
		}
	}
}

MODULE_INIT(ModulePgSQL)