summaryrefslogtreecommitdiff
path: root/src/modules/extra
diff options
context:
space:
mode:
Diffstat (limited to 'src/modules/extra')
-rw-r--r--src/modules/extra/m_pgsql.cpp197
-rw-r--r--src/modules/extra/m_sqlv2.h73
2 files changed, 225 insertions, 45 deletions
diff --git a/src/modules/extra/m_pgsql.cpp b/src/modules/extra/m_pgsql.cpp
index e16f38e59..c44c66bc8 100644
--- a/src/modules/extra/m_pgsql.cpp
+++ b/src/modules/extra/m_pgsql.cpp
@@ -17,6 +17,7 @@
#include <sstream>
#include <string>
+#include <deque>
#include <map>
#include <libpq-fe.h>
@@ -56,14 +57,68 @@ typedef std::map<std::string, SQLConn*> ConnMap;
*/
enum SQLstatus { CREAD, CWRITE, WREAD, WWRITE };
-class SQLerror
+inline std::string pop_front_r(std::deque<std::string> &d)
{
-public:
- std::string err;
+ std::string r = d.front();
+ d.pop_front();
+ return r;
+
+}
- SQLerror(const std::string &s)
- : err(s)
+/** QueryQueue, a queue of queries waiting to be executed.
+ * This maintains two queues internally, one for 'priority'
+ * queries and one for less important ones. Each queue has
+ * new queries appended to it and ones to execute are popped
+ * off the front. This keeps them flowing round nicely and no
+ * query should ever get 'stuck' for too long. If there are
+ * queries in the priority queue they will be executed first,
+ * 'unimportant' queries will only be executed when the
+ * priority queue is empty.
+ */
+
+class QueryQueue
+{
+private:
+ std::deque<std::string> priority; /* The priority queue */
+ std::deque<std::string> normal; /* The 'normal' queue */
+
+public:
+ QueryQueue()
+ {
+
+ }
+
+ void push_back(const std::string &q, bool pri = false)
+ {
+ log(DEBUG, "QueryQueue::push_back(): Adding %s query to queue: %s", ((pri) ? "priority" : "non-priority"), q.c_str());
+
+ if(pri)
+ priority.push_back(q);
+ else
+ normal.push_back(q);
+ }
+
+ inline std::string pop_front()
+ {
+ std::string res;
+
+ if(priority.size())
+ {
+ return pop_front_r(priority);
+ }
+ else if(normal.size())
+ {
+ return pop_front_r(normal);
+ }
+ else
+ {
+ return "";
+ }
+ }
+
+ std::pair<int, int> size()
{
+ return std::make_pair(priority.size(), normal.size());
}
};
@@ -88,6 +143,7 @@ private:
SQLstatus status; /* PgSQL database connection status */
public:
+ QueryQueue queue; /* Queue of queries waiting to be executed on this connection */
/* This class should only ever be created inside this module, using this constructor, so we don't have to worry about the default ones */
@@ -131,8 +187,6 @@ public:
throw ModuleException("Connect failed");
}
}
-
- exit(-1);
}
~SQLConn()
@@ -171,8 +225,6 @@ public:
log(DEBUG, "No result for lookup yet!");
return true;
}
-
- exit(-1);
}
bool DoConnect()
@@ -249,6 +301,7 @@ public:
{
case PGRES_POLLING_WRITING:
log(DEBUG, "PGconnectPoll: PGRES_POLLING_WRITING");
+ WantWrite();
status = CWRITE;
DoPoll();
break;
@@ -263,7 +316,6 @@ public:
case PGRES_POLLING_OK:
log(DEBUG, "PGconnectPoll: PGRES_POLLING_OK");
status = WWRITE;
- Query("SELECT * FROM rawr");
break;
default:
log(DEBUG, "PGconnectPoll: wtf?");
@@ -273,6 +325,43 @@ public:
return true;
}
+ bool ProcessData()
+ {
+ if(PQconsumeInput(sql))
+ {
+ log(DEBUG, "PQconsumeInput succeeded");
+
+ if(PQisBusy(sql))
+ {
+ log(DEBUG, "Still busy processing command though");
+ }
+ else
+ {
+ log(DEBUG, "Looks like we have a result to process!");
+
+ while(PGresult* result = PQgetResult(sql))
+ {
+ int cols = PQnfields(result);
+
+ log(DEBUG, "Got result! :D");
+ log(DEBUG, "%d rows, %d columns checking now what the column names are", PQntuples(result), cols);
+
+ for(int i = 0; i < cols; i++)
+ {
+ log(DEBUG, "Column name: %s (%d)", PQfname(result, i));
+ }
+
+ PQclear(result);
+ }
+ }
+
+ return true;
+ }
+
+ log(DEBUG, "PQconsumeInput failed: %s", PQerrorMessage(sql));
+ return false;
+ }
+
void ShowStatus()
{
switch(PQstatus(sql))
@@ -313,6 +402,14 @@ public:
return DoEvent();
}
+
+ virtual bool OnWriteReady()
+ {
+ /* Always return true here, false would close the socket - we need to do that ourselves with the pgsql API */
+ log(DEBUG, "OnWriteReady(): status = %s", StatusStr());
+
+ return DoEvent();
+ }
virtual bool OnConnected()
{
@@ -329,38 +426,21 @@ public:
}
else
{
- if(PQconsumeInput(sql))
- {
- log(DEBUG, "PQconsumeInput succeeded");
-
- if(PQisBusy(sql))
- {
- log(DEBUG, "Still busy processing command though");
- }
- else
- {
- log(DEBUG, "Looks like we have a result to process!");
-
- while(PGresult* result = PQgetResult(sql))
- {
- int cols = PQnfields(result);
-
- log(DEBUG, "Got result! :D");
- log(DEBUG, "%d rows, %d columns checking now what the column names are", PQntuples(result), cols);
-
- for(int i = 0; i < cols; i++)
- {
- log(DEBUG, "Column name: %s (%d)", PQfname(result, i));
- }
-
- PQclear(result);
- }
- }
- }
- else
- {
- log(DEBUG, "PQconsumeInput failed: %s", PQerrorMessage(sql));
- }
+ ProcessData();
+ }
+
+ switch(PQflush(sql))
+ {
+ case -1:
+ log(DEBUG, "Error flushing write queue: %s", PQerrorMessage(sql));
+ break;
+ case 0:
+ log(DEBUG, "Successfully flushed write queue (or there was nothing to write)");
+ break;
+ case 1:
+ log(DEBUG, "Not all of the write queue written, triggering write event so we can have another go");
+ WantWrite();
+ break;
}
return true;
@@ -436,7 +516,6 @@ public:
{
/* Unused, I think */
}
-
};
class ModulePgSQL : public Module
@@ -451,13 +530,13 @@ public:
{
log(DEBUG, "%s 'SQL' feature", Srv->PublishFeature("SQL", this) ? "Published" : "Couldn't publish");
log(DEBUG, "%s 'PgSQL' feature", Srv->PublishFeature("PgSQL", this) ? "Published" : "Couldn't publish");
-
+
OnRehash("");
}
void Implements(char* List)
{
- List[I_OnRehash] = List[I_OnUserRegister] = List[I_OnCheckReady] = List[I_OnUserDisconnect] = 1;
+ List[I_OnRequest] = List[I_OnRehash] = List[I_OnUserRegister] = List[I_OnCheckReady] = List[I_OnUserDisconnect] = 1;
}
virtual void OnRehash(const std::string &parameter)
@@ -492,6 +571,34 @@ public:
connections.insert(std::make_pair(id, newconn));
}
}
+
+ virtual char* OnRequest(Request* request)
+ {
+ if(strcmp(SQLREQID, request->GetData()) == 0)
+ {
+ SQLrequest* req = (SQLrequest*)request;
+ ConnMap::iterator iter;
+
+ log(DEBUG, "Got query: '%s' on id '%s'", req->query.c_str(), req->dbid.c_str());
+
+ if((iter = connections.find(req->dbid)) != connections.end())
+ {
+ /* Execute query */
+ iter->second->queue.push_back(req->query, req->pri);
+
+ return SQLSUCCESS;
+ }
+ else
+ {
+ req->error.Id(BAD_DBID);
+ return NULL;
+ }
+ }
+
+ log(DEBUG, "Got unsupported API version string: %s", request->GetData());
+
+ return NULL;
+ }
virtual Version GetVersion()
{
diff --git a/src/modules/extra/m_sqlv2.h b/src/modules/extra/m_sqlv2.h
new file mode 100644
index 000000000..521a95640
--- /dev/null
+++ b/src/modules/extra/m_sqlv2.h
@@ -0,0 +1,73 @@
+#ifndef INSPIRCD_SQLAPI_2
+#define INSPIRCD_SQLAPI_2
+
+#define SQLREQID "SQLv2 Request"
+#define SQLRESID "SQLv2 Result"
+#define SQLSUCCESS "You shouldn't be reading this (success)"
+
+#include <string>
+#include "modules.h"
+
+enum SQLerrorNum { BAD_DBID };
+
+class SQLerror
+{
+ SQLerrorNum id;
+public:
+
+ SQLerror()
+ {
+ }
+
+ SQLerror(SQLerrorNum i)
+ : id(i)
+ {
+ }
+
+ void Id(SQLerrorNum i)
+ {
+ id = i;
+ }
+
+ const char* Str()
+ {
+ switch(id)
+ {
+ case BAD_DBID:
+ return "Invalid database ID";
+ default:
+ return "Unknown error";
+ }
+ }
+};
+
+class SQLrequest : public Request
+{
+public:
+ std::string query;
+ std::string dbid;
+ bool pri;
+ SQLerror error;
+
+ SQLrequest(Module* s, Module* d, const std::string &q, const std::string &id, bool p = false)
+ : Request(SQLREQID, s, d), query(q), dbid(id), pri(p)
+ {
+ }
+};
+
+class SQLresult : public Request
+{
+public:
+ SQLresult(Module* s, Module* d)
+ : Request(SQLRESID, s, d)
+ {
+
+ }
+
+ virtual int Rows()
+ {
+ return 0;
+ }
+};
+
+#endif