diff options
Diffstat (limited to 'src/modules/extra')
-rw-r--r-- | src/modules/extra/m_pgsql.cpp | 197 | ||||
-rw-r--r-- | src/modules/extra/m_sqlv2.h | 73 |
2 files changed, 225 insertions, 45 deletions
diff --git a/src/modules/extra/m_pgsql.cpp b/src/modules/extra/m_pgsql.cpp index e16f38e59..c44c66bc8 100644 --- a/src/modules/extra/m_pgsql.cpp +++ b/src/modules/extra/m_pgsql.cpp @@ -17,6 +17,7 @@ #include <sstream> #include <string> +#include <deque> #include <map> #include <libpq-fe.h> @@ -56,14 +57,68 @@ typedef std::map<std::string, SQLConn*> ConnMap; */ enum SQLstatus { CREAD, CWRITE, WREAD, WWRITE }; -class SQLerror +inline std::string pop_front_r(std::deque<std::string> &d) { -public: - std::string err; + std::string r = d.front(); + d.pop_front(); + return r; + +} - SQLerror(const std::string &s) - : err(s) +/** QueryQueue, a queue of queries waiting to be executed. + * This maintains two queues internally, one for 'priority' + * queries and one for less important ones. Each queue has + * new queries appended to it and ones to execute are popped + * off the front. This keeps them flowing round nicely and no + * query should ever get 'stuck' for too long. If there are + * queries in the priority queue they will be executed first, + * 'unimportant' queries will only be executed when the + * priority queue is empty. + */ + +class QueryQueue +{ +private: + std::deque<std::string> priority; /* The priority queue */ + std::deque<std::string> normal; /* The 'normal' queue */ + +public: + QueryQueue() + { + + } + + void push_back(const std::string &q, bool pri = false) + { + log(DEBUG, "QueryQueue::push_back(): Adding %s query to queue: %s", ((pri) ? "priority" : "non-priority"), q.c_str()); + + if(pri) + priority.push_back(q); + else + normal.push_back(q); + } + + inline std::string pop_front() + { + std::string res; + + if(priority.size()) + { + return pop_front_r(priority); + } + else if(normal.size()) + { + return pop_front_r(normal); + } + else + { + return ""; + } + } + + std::pair<int, int> size() { + return std::make_pair(priority.size(), normal.size()); } }; @@ -88,6 +143,7 @@ private: SQLstatus status; /* PgSQL database connection status */ public: + QueryQueue queue; /* Queue of queries waiting to be executed on this connection */ /* This class should only ever be created inside this module, using this constructor, so we don't have to worry about the default ones */ @@ -131,8 +187,6 @@ public: throw ModuleException("Connect failed"); } } - - exit(-1); } ~SQLConn() @@ -171,8 +225,6 @@ public: log(DEBUG, "No result for lookup yet!"); return true; } - - exit(-1); } bool DoConnect() @@ -249,6 +301,7 @@ public: { case PGRES_POLLING_WRITING: log(DEBUG, "PGconnectPoll: PGRES_POLLING_WRITING"); + WantWrite(); status = CWRITE; DoPoll(); break; @@ -263,7 +316,6 @@ public: case PGRES_POLLING_OK: log(DEBUG, "PGconnectPoll: PGRES_POLLING_OK"); status = WWRITE; - Query("SELECT * FROM rawr"); break; default: log(DEBUG, "PGconnectPoll: wtf?"); @@ -273,6 +325,43 @@ public: return true; } + bool ProcessData() + { + if(PQconsumeInput(sql)) + { + log(DEBUG, "PQconsumeInput succeeded"); + + if(PQisBusy(sql)) + { + log(DEBUG, "Still busy processing command though"); + } + else + { + log(DEBUG, "Looks like we have a result to process!"); + + while(PGresult* result = PQgetResult(sql)) + { + int cols = PQnfields(result); + + log(DEBUG, "Got result! :D"); + log(DEBUG, "%d rows, %d columns checking now what the column names are", PQntuples(result), cols); + + for(int i = 0; i < cols; i++) + { + log(DEBUG, "Column name: %s (%d)", PQfname(result, i)); + } + + PQclear(result); + } + } + + return true; + } + + log(DEBUG, "PQconsumeInput failed: %s", PQerrorMessage(sql)); + return false; + } + void ShowStatus() { switch(PQstatus(sql)) @@ -313,6 +402,14 @@ public: return DoEvent(); } + + virtual bool OnWriteReady() + { + /* Always return true here, false would close the socket - we need to do that ourselves with the pgsql API */ + log(DEBUG, "OnWriteReady(): status = %s", StatusStr()); + + return DoEvent(); + } virtual bool OnConnected() { @@ -329,38 +426,21 @@ public: } else { - if(PQconsumeInput(sql)) - { - log(DEBUG, "PQconsumeInput succeeded"); - - if(PQisBusy(sql)) - { - log(DEBUG, "Still busy processing command though"); - } - else - { - log(DEBUG, "Looks like we have a result to process!"); - - while(PGresult* result = PQgetResult(sql)) - { - int cols = PQnfields(result); - - log(DEBUG, "Got result! :D"); - log(DEBUG, "%d rows, %d columns checking now what the column names are", PQntuples(result), cols); - - for(int i = 0; i < cols; i++) - { - log(DEBUG, "Column name: %s (%d)", PQfname(result, i)); - } - - PQclear(result); - } - } - } - else - { - log(DEBUG, "PQconsumeInput failed: %s", PQerrorMessage(sql)); - } + ProcessData(); + } + + switch(PQflush(sql)) + { + case -1: + log(DEBUG, "Error flushing write queue: %s", PQerrorMessage(sql)); + break; + case 0: + log(DEBUG, "Successfully flushed write queue (or there was nothing to write)"); + break; + case 1: + log(DEBUG, "Not all of the write queue written, triggering write event so we can have another go"); + WantWrite(); + break; } return true; @@ -436,7 +516,6 @@ public: { /* Unused, I think */ } - }; class ModulePgSQL : public Module @@ -451,13 +530,13 @@ public: { log(DEBUG, "%s 'SQL' feature", Srv->PublishFeature("SQL", this) ? "Published" : "Couldn't publish"); log(DEBUG, "%s 'PgSQL' feature", Srv->PublishFeature("PgSQL", this) ? "Published" : "Couldn't publish"); - + OnRehash(""); } void Implements(char* List) { - List[I_OnRehash] = List[I_OnUserRegister] = List[I_OnCheckReady] = List[I_OnUserDisconnect] = 1; + List[I_OnRequest] = List[I_OnRehash] = List[I_OnUserRegister] = List[I_OnCheckReady] = List[I_OnUserDisconnect] = 1; } virtual void OnRehash(const std::string ¶meter) @@ -492,6 +571,34 @@ public: connections.insert(std::make_pair(id, newconn)); } } + + virtual char* OnRequest(Request* request) + { + if(strcmp(SQLREQID, request->GetData()) == 0) + { + SQLrequest* req = (SQLrequest*)request; + ConnMap::iterator iter; + + log(DEBUG, "Got query: '%s' on id '%s'", req->query.c_str(), req->dbid.c_str()); + + if((iter = connections.find(req->dbid)) != connections.end()) + { + /* Execute query */ + iter->second->queue.push_back(req->query, req->pri); + + return SQLSUCCESS; + } + else + { + req->error.Id(BAD_DBID); + return NULL; + } + } + + log(DEBUG, "Got unsupported API version string: %s", request->GetData()); + + return NULL; + } virtual Version GetVersion() { diff --git a/src/modules/extra/m_sqlv2.h b/src/modules/extra/m_sqlv2.h new file mode 100644 index 000000000..521a95640 --- /dev/null +++ b/src/modules/extra/m_sqlv2.h @@ -0,0 +1,73 @@ +#ifndef INSPIRCD_SQLAPI_2 +#define INSPIRCD_SQLAPI_2 + +#define SQLREQID "SQLv2 Request" +#define SQLRESID "SQLv2 Result" +#define SQLSUCCESS "You shouldn't be reading this (success)" + +#include <string> +#include "modules.h" + +enum SQLerrorNum { BAD_DBID }; + +class SQLerror +{ + SQLerrorNum id; +public: + + SQLerror() + { + } + + SQLerror(SQLerrorNum i) + : id(i) + { + } + + void Id(SQLerrorNum i) + { + id = i; + } + + const char* Str() + { + switch(id) + { + case BAD_DBID: + return "Invalid database ID"; + default: + return "Unknown error"; + } + } +}; + +class SQLrequest : public Request +{ +public: + std::string query; + std::string dbid; + bool pri; + SQLerror error; + + SQLrequest(Module* s, Module* d, const std::string &q, const std::string &id, bool p = false) + : Request(SQLREQID, s, d), query(q), dbid(id), pri(p) + { + } +}; + +class SQLresult : public Request +{ +public: + SQLresult(Module* s, Module* d) + : Request(SQLRESID, s, d) + { + + } + + virtual int Rows() + { + return 0; + } +}; + +#endif |