-/* +------------------------------------+
- * | Inspire Internet Relay Chat Daemon |
- * +------------------------------------+
+/*
+ * InspIRCd -- Internet Relay Chat Daemon
*
- * InspIRCd: (C) 2002-2010 InspIRCd Development Team
- * See: http://wiki.inspircd.org/Credits
+ * Copyright (C) 2009-2010 Daniel De Graaf <danieldg@inspircd.org>
+ * Copyright (C) 2006-2007, 2009 Dennis Friis <peavey@inspircd.org>
+ * Copyright (C) 2006-2009 Craig Edwards <craigedwards@brainbox.cc>
+ * Copyright (C) 2008 Robin Burchell <robin+git@viroteck.net>
*
- * This program is free but copyrighted software; see
- * the file COPYING for details.
+ * This file is part of InspIRCd. InspIRCd is free software: you can
+ * redistribute it and/or modify it under the terms of the GNU General Public
+ * License as published by the Free Software Foundation, version 2.
*
- * ---------------------------------------------------
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
+ * details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
+
/* Stop mysql wanting to use long long */
#define NO_CLIENT_LONG_LONG
#include <mysql.h>
#include "sql.h"
-#ifdef WINDOWS
-#pragma comment(lib, "mysqlclient.lib")
+#ifdef _WIN32
+# pragma comment(lib, "mysqlclient.lib")
+# pragma comment(lib, "advapi32.lib")
+# pragma comment(linker, "/NODEFAULTLIB:LIBCMT")
#endif
/* VERSION 3 API: With nonblocking (threaded) requests */
/* $ModDesc: SQL Service Provider module for all other m_sql* modules */
/* $CompileFlags: exec("mysql_config --include") */
/* $LinkerFlags: exec("mysql_config --libs_r") rpath("mysql_config --libs_r") */
-/* $ModDep: m_sqlv2.h */
/* THE NONBLOCKING MYSQL API!
*
class MySQLresult;
class DispatcherThread;
-struct QueueItem
+struct QQueueItem
{
SQLQuery* q;
+ std::string query;
SQLConnection* c;
- QueueItem(SQLQuery* Q, SQLConnection* C) : q(Q), c(C) {}
+ QQueueItem(SQLQuery* Q, const std::string& S, SQLConnection* C) : q(Q), query(S), c(C) {}
+};
+
+struct RQueueItem
+{
+ SQLQuery* q;
+ MySQLresult* r;
+ RQueueItem(SQLQuery* Q, MySQLresult* R) : q(Q), r(R) {}
};
typedef std::map<std::string, SQLConnection*> ConnMap;
-typedef std::deque<QueueItem> QueryQueue;
-typedef std::deque<MySQLresult*> ResultQueue;
+typedef std::deque<QQueueItem> QueryQueue;
+typedef std::deque<RQueueItem> ResultQueue;
/** MySQL module
* */
{
public:
DispatcherThread* Dispatcher;
- QueryQueue qq;
- ResultQueue rq;
- ConnMap connections;
+ QueryQueue qq; // MUST HOLD MUTEX
+ ResultQueue rq; // MUST HOLD MUTEX
+ ConnMap connections; // main thread only
ModuleSQL();
void init();
~ModuleSQL();
void OnRehash(User* user);
+ void OnUnloadModule(Module* mod);
Version GetVersion();
};
class MySQLresult : public SQLResult
{
public:
- SQLQuery* query;
SQLerror err;
int currentrow;
int rows;
std::vector<std::string> colnames;
std::vector<SQLEntries> fieldlists;
- MySQLresult(SQLQuery* q, MYSQL_RES* res, int affected_rows) : query(q), err(SQL_NO_ERROR), currentrow(0), rows(0)
+ MySQLresult(MYSQL_RES* res, int affected_rows) : err(SQL_NO_ERROR), currentrow(0), rows(0)
{
if (affected_rows >= 1)
{
rows++;
}
mysql_free_result(res);
- res = NULL;
}
}
- MySQLresult(SQLQuery* q, SQLerror& e) : query(q), err(e)
+ MySQLresult(SQLerror& e) : err(e)
{
}
public:
reference<ConfigTag> config;
MYSQL *connection;
- bool active;
+ Mutex lock;
// This constructor creates an SQLConnection object with the given credentials, but does not connect yet.
SQLConnection(Module* p, ConfigTag* tag) : SQLProvider(p, "SQL/" + tag->getString("id")),
- config(tag), active(false)
+ config(tag), connection(NULL)
{
}
return true;
}
- virtual std::string FormatQuery(const std::string& q, const ParamL& p)
+ ModuleSQL* Parent()
+ {
+ return (ModuleSQL*)(Module*)creator;
+ }
+
+ MySQLresult* DoBlockingQuery(const std::string& query)
+ {
+
+ /* Parse the command string and dispatch it to mysql */
+ if (CheckConnection() && !mysql_real_query(connection, query.data(), query.length()))
+ {
+ /* Successfull query */
+ MYSQL_RES* res = mysql_use_result(connection);
+ unsigned long rows = mysql_affected_rows(connection);
+ return new MySQLresult(res, rows);
+ }
+ else
+ {
+ /* XXX: See /usr/include/mysql/mysqld_error.h for a list of
+ * possible error numbers and error messages */
+ SQLerror e(SQL_QREPLY_FAIL, ConvToStr(mysql_errno(connection)) + ": " + mysql_error(connection));
+ return new MySQLresult(e);
+ }
+ }
+
+ bool CheckConnection()
+ {
+ if (!connection || mysql_ping(connection) != 0)
+ return Connect();
+ return true;
+ }
+
+ std::string GetError()
+ {
+ return mysql_error(connection);
+ }
+
+ void Close()
+ {
+ mysql_close(connection);
+ }
+
+ void submit(SQLQuery* q, const std::string& qs)
+ {
+ Parent()->Dispatcher->LockQueue();
+ Parent()->qq.push_back(QQueueItem(q, qs, this));
+ Parent()->Dispatcher->UnlockQueueWakeup();
+ }
+
+ void submit(SQLQuery* call, const std::string& q, const ParamL& p)
{
std::string res;
unsigned int param = 0;
res.push_back(q[i]);
else
{
- // TODO numbered parameter support ('?1')
if (param < p.size())
{
std::string parm = p[param++];
- char buffer[MAXBUF];
- mysql_escape_string(buffer, parm.c_str(), parm.length());
+ // In the worst case, each character may need to be encoded as using two bytes,
+ // and one byte is the terminating null
+ std::vector<char> buffer(parm.length() * 2 + 1);
+
+ // The return value of mysql_escape_string() is the length of the encoded string,
+ // not including the terminating null
+ unsigned long escapedsize = mysql_escape_string(&buffer[0], parm.c_str(), parm.length());
// mysql_real_escape_string(connection, queryend, paramscopy[paramnum].c_str(), paramscopy[paramnum].length());
- res.append(buffer);
+ res.append(&buffer[0], escapedsize);
}
}
}
- return res;
+ submit(call, res);
}
- std::string FormatQuery(const std::string& q, const ParamM& p)
+ void submit(SQLQuery* call, const std::string& q, const ParamM& p)
{
std::string res;
for(std::string::size_type i = 0; i < q.length(); i++)
{
std::string field;
i++;
- while (i < q.length() && isalpha(q[i]))
+ while (i < q.length() && isalnum(q[i]))
field.push_back(q[i++]);
i--;
if (it != p.end())
{
std::string parm = it->second;
- char buffer[MAXBUF];
- mysql_escape_string(buffer, parm.c_str(), parm.length());
- res.append(buffer);
+ // NOTE: See above
+ std::vector<char> buffer(parm.length() * 2 + 1);
+ unsigned long escapedsize = mysql_escape_string(&buffer[0], parm.c_str(), parm.length());
+ res.append(&buffer[0], escapedsize);
}
}
}
- return res;
- }
-
- ModuleSQL* Parent()
- {
- return (ModuleSQL*)(Module*)creator;
- }
-
- void DoBlockingQuery(SQLQuery* req)
- {
- /* Parse the command string and dispatch it to mysql */
- if (CheckConnection() && !mysql_real_query(connection, req->query.data(), req->query.length()))
- {
- /* Successfull query */
- MYSQL_RES* res = mysql_use_result(connection);
- unsigned long rows = mysql_affected_rows(connection);
- MySQLresult* r = new MySQLresult(req, res, rows);
- Parent()->Dispatcher->LockQueue();
- Parent()->rq.push_back(r);
- Parent()->Dispatcher->NotifyParent();
- Parent()->Dispatcher->UnlockQueue();
- }
- else
- {
- /* XXX: See /usr/include/mysql/mysqld_error.h for a list of
- * possible error numbers and error messages */
- SQLerror e(SQL_QREPLY_FAIL, ConvToStr(mysql_errno(connection)) + std::string(": ") + mysql_error(connection));
- MySQLresult* r = new MySQLresult(req, e);
- Parent()->Dispatcher->LockQueue();
- Parent()->rq.push_back(r);
- Parent()->Dispatcher->NotifyParent();
- Parent()->Dispatcher->UnlockQueue();
- }
- }
-
- bool CheckConnection()
- {
- if (mysql_ping(connection) != 0)
- {
- return Connect();
- }
- else return true;
- }
-
- std::string GetError()
- {
- return mysql_error(connection);
- }
-
- void Close()
- {
- mysql_close(connection);
- }
-
- void submit(SQLQuery* q)
- {
- Parent()->Dispatcher->LockQueue();
- Parent()->qq.push_back(QueueItem(q, this));
- Parent()->Dispatcher->UnlockQueueWakeup();
+ submit(call, res);
}
};
Dispatcher = new DispatcherThread(this);
ServerInstance->Threads->Start(Dispatcher);
- Implementation eventlist[] = { I_OnRehash };
- ServerInstance->Modules->Attach(eventlist, this, 1);
+ Implementation eventlist[] = { I_OnRehash, I_OnUnloadModule };
+ ServerInstance->Modules->Attach(eventlist, this, sizeof(eventlist)/sizeof(Implementation));
+
+ OnRehash(NULL);
}
ModuleSQL::~ModuleSQL()
void ModuleSQL::OnRehash(User* user)
{
- Dispatcher->LockQueue();
ConnMap conns;
ConfigTagList tags = ServerInstance->Config->ConfTags("database");
for(ConfigIter i = tags.first; i != tags.second; i++)
connections.erase(curr);
}
}
+
+ // now clean up the deleted databases
+ Dispatcher->LockQueue();
+ SQLerror err(SQL_BAD_DBID);
for(ConnMap::iterator i = connections.begin(); i != connections.end(); i++)
{
- if (i->second->active)
+ ServerInstance->Modules->DelService(*i->second);
+ // it might be running a query on this database. Wait for that to complete
+ i->second->lock.Lock();
+ i->second->lock.Unlock();
+ // now remove all active queries to this DB
+ for (size_t j = qq.size(); j > 0; j--)
{
- // can't delete it now. Next rehash will try to kill it again
- conns.insert(*i);
+ size_t k = j - 1;
+ if (qq[k].c == i->second)
+ {
+ qq[k].q->OnError(err);
+ delete qq[k].q;
+ qq.erase(qq.begin() + k);
+ }
}
- else
+ // finally, nuke the connection
+ delete i->second;
+ }
+ Dispatcher->UnlockQueue();
+ connections.swap(conns);
+}
+
+void ModuleSQL::OnUnloadModule(Module* mod)
+{
+ SQLerror err(SQL_BAD_DBID);
+ Dispatcher->LockQueue();
+ unsigned int i = qq.size();
+ while (i > 0)
+ {
+ i--;
+ if (qq[i].q->creator == mod)
{
- ServerInstance->Modules->DelService(*i->second);
- delete i->second;
+ if (i == 0)
+ {
+ // need to wait until the query is done
+ // (the result will be discarded)
+ qq[i].c->lock.Lock();
+ qq[i].c->lock.Unlock();
+ }
+ qq[i].q->OnError(err);
+ delete qq[i].q;
+ qq.erase(qq.begin() + i);
}
}
- connections.swap(conns);
Dispatcher->UnlockQueue();
+ // clean up any result queue entries
+ Dispatcher->OnNotify();
}
Version ModuleSQL::GetVersion()
{
if (!Parent->qq.empty())
{
- QueueItem i = Parent->qq.front();
- Parent->qq.pop_front();
- i.c->active = true;
+ QQueueItem i = Parent->qq.front();
+ i.c->lock.Lock();
this->UnlockQueue();
- i.c->DoBlockingQuery(i.q);
+ MySQLresult* res = i.c->DoBlockingQuery(i.query);
+ i.c->lock.Unlock();
+
+ /*
+ * At this point, the main thread could be working on:
+ * Rehash - delete i.c out from under us. We don't care about that.
+ * UnloadModule - delete i.q and the qq item. Need to avoid reporting results.
+ */
+
this->LockQueue();
- i.c->active = false;
+ if (!Parent->qq.empty() && Parent->qq.front().q == i.q)
+ {
+ Parent->qq.pop_front();
+ Parent->rq.push_back(RQueueItem(i.q, res));
+ NotifyParent();
+ }
+ else
+ {
+ // UnloadModule ate the query
+ delete res;
+ }
}
else
{
this->LockQueue();
for(ResultQueue::iterator i = Parent->rq.begin(); i != Parent->rq.end(); i++)
{
- MySQLresult* res = *i;
+ MySQLresult* res = i->r;
if (res->err.id == SQL_NO_ERROR)
- res->query->OnResult(*res);
+ i->q->OnResult(*res);
else
- res->query->OnError(res->err);
- delete res->query;
- delete res;
+ i->q->OnError(res->err);
+ delete i->q;
+ delete i->r;
}
Parent->rq.clear();
this->UnlockQueue();