]> git.netwichtig.de Git - user/henk/code/inspircd.git/blob - src/modules/extra/m_pgsql.cpp
b62c804ce8c0291265844dcd0c3b718d2ad8d595
[user/henk/code/inspircd.git] / src / modules / extra / m_pgsql.cpp
1 /*       +------------------------------------+
2  *       | Inspire Internet Relay Chat Daemon |
3  *       +------------------------------------+
4  *
5  *  InspIRCd is copyright (C) 2002-2004 ChatSpike-Dev.
6  *                       E-mail:
7  *                <brain@chatspike.net>
8  *                <Craig@chatspike.net>
9  *                <omster@gmail.com>
10  *     
11  * Written by Craig Edwards, Craig McLure, and others.
12  * This program is free but copyrighted software; see
13  *            the file COPYING for details.
14  *
15  * ---------------------------------------------------
16  */
17
18 #include <sstream>
19 #include <string>
20 #include <deque>
21 #include <map>
22 #include <libpq-fe.h>
23
24 #include "users.h"
25 #include "channels.h"
26 #include "modules.h"
27 #include "helperfuncs.h"
28 #include "inspircd.h"
29 #include "configreader.h"
30
31 #include "m_sqlv2.h"
32
33 /* $ModDesc: PostgreSQL Service Provider module for all other m_sql* modules, uses v2 of the SQL API */
34 /* $CompileFlags: -I`pg_config --includedir` `perl extra/pgsql_config.pl` */
35 /* $LinkerFlags: -L`pg_config --libdir` -lpq */
36
37 /* UGH, UGH, UGH, UGH, UGH, UGH
38  * I'm having trouble seeing how I
39  * can avoid this. The core-defined
40  * constructors for InspSocket just
41  * aren't suitable...and if I'm
42  * reimplementing them I need this so
43  * I can access the socket engine :\
44  */
45 extern InspIRCd* ServerInstance;
46 InspSocket* socket_ref[MAX_DESCRIPTORS];
47
48 /* Forward declare, so we can have the typedef neatly at the top */
49 class SQLConn;
50 /* Also needs forward declaration, as it's used inside SQLconn */
51 class ModulePgSQL;
52
53 typedef std::map<std::string, SQLConn*> ConnMap;
54
55 /* CREAD,       Connecting and wants read event
56  * CWRITE,      Connecting and wants write event
57  * WREAD,       Connected/Working and wants read event
58  * WWRITE,      Connected/Working and wants write event
59  */
60 enum SQLstatus { CREAD, CWRITE, WREAD, WWRITE };
61
62 /** QueryQueue, a queue of queries waiting to be executed.
63  * This maintains two queues internally, one for 'priority'
64  * queries and one for less important ones. Each queue has
65  * new queries appended to it and ones to execute are popped
66  * off the front. This keeps them flowing round nicely and no
67  * query should ever get 'stuck' for too long. If there are
68  * queries in the priority queue they will be executed first,
69  * 'unimportant' queries will only be executed when the
70  * priority queue is empty.
71  *
72  * We store lists of SQLrequest's here, by value as we want to avoid storing
73  * any data allocated inside the client module (in case that module is unloaded
74  * while the query is in progress).
75  *
76  * Because we want to work on the current SQLrequest in-situ, we need a way
77  * of accessing the request we are currently processing, QueryQueue::front(),
78  * but that call needs to always return the same request until that request
79  * is removed from the queue, this is what the 'which' variable is. New queries are
80  * always added to the back of one of the two queues, but if when front()
81  * is first called then the priority queue is empty then front() will return
82  * a query from the normal queue, but if a query is then added to the priority
83  * queue then front() must continue to return the front of the *normal* queue
84  * until pop() is called.
85  */
86
87 class QueryQueue : public classbase
88 {
89 private:
90         typedef std::deque<SQLrequest> ReqDeque;        
91
92         ReqDeque priority;      /* The priority queue */
93         ReqDeque normal;        /* The 'normal' queue */
94         enum { PRI, NOR, NON } which;   /* Which queue the currently active element is at the front of */
95
96 public:
97         QueryQueue()
98         : which(NON)
99         {
100         }
101         
102         void push(const SQLrequest &q)
103         {
104                 log(DEBUG, "QueryQueue::push(): Adding %s query to queue: %s", ((q.pri) ? "priority" : "non-priority"), q.query.q.c_str());
105                 
106                 if(q.pri)
107                         priority.push_back(q);
108                 else
109                         normal.push_back(q);
110         }
111         
112         void pop()
113         {
114                 if((which == PRI) && priority.size())
115                 {
116                         priority.pop_front();
117                 }
118                 else if((which == NOR) && normal.size())
119                 {
120                         normal.pop_front();
121                 }
122                 
123                 /* Reset this */
124                 which = NON;
125                 
126                 /* Silently do nothing if there was no element to pop() */
127         }
128         
129         SQLrequest& front()
130         {
131                 switch(which)
132                 {
133                         case PRI:
134                                 return priority.front();
135                         case NOR:
136                                 return normal.front();
137                         default:
138                                 if(priority.size())
139                                 {
140                                         which = PRI;
141                                         return priority.front();
142                                 }
143                                 
144                                 if(normal.size())
145                                 {
146                                         which = NOR;
147                                         return normal.front();
148                                 }
149                                 
150                                 /* This will probably result in a segfault,
151                                  * but the caller should have checked totalsize()
152                                  * first so..meh - moron :p
153                                  */
154                                 
155                                 return priority.front();
156                 }
157         }
158         
159         std::pair<int, int> size()
160         {
161                 return std::make_pair(priority.size(), normal.size());
162         }
163         
164         int totalsize()
165         {
166                 return priority.size() + normal.size();
167         }
168         
169         void PurgeModule(Module* mod)
170         {
171                 DoPurgeModule(mod, priority);
172                 DoPurgeModule(mod, normal);
173         }
174         
175 private:
176         void DoPurgeModule(Module* mod, ReqDeque& q)
177         {
178                 for(ReqDeque::iterator iter = q.begin(); iter != q.end(); iter++)
179                 {
180                         if(iter->GetSource() == mod)
181                         {
182                                 if(iter->id == front().id)
183                                 {
184                                         /* It's the currently active query.. :x */
185                                         iter->SetSource(NULL);
186                                 }
187                                 else
188                                 {
189                                         /* It hasn't been executed yet..just remove it */
190                                         iter = q.erase(iter);
191                                 }
192                         }
193                 }
194         }
195 };
196
197 /** PgSQLresult is a subclass of the mostly-pure-virtual class SQLresult.
198  * All SQL providers must create their own subclass and define it's methods using that
199  * database library's data retriveal functions. The aim is to avoid a slow and inefficient process
200  * of converting all data to a common format before it reaches the result structure. This way
201  * data is passes to the module nearly as directly as if it was using the API directly itself.
202  */
203
204 class PgSQLresult : public SQLresult
205 {
206         PGresult* res;
207         int currentrow;
208         
209         SQLfieldList* fieldlist;
210         SQLfieldMap* fieldmap;
211 public:
212         PgSQLresult(Module* self, Module* to, unsigned long id, PGresult* result)
213         : SQLresult(self, to, id), res(result), currentrow(0), fieldlist(NULL), fieldmap(NULL)
214         {
215                 int rows = PQntuples(res);
216                 int cols = PQnfields(res);
217                 
218                 log(DEBUG, "Created new PgSQL result; %d rows, %d columns", rows, cols);
219         }
220         
221         ~PgSQLresult()
222         {
223                 PQclear(res);
224         }
225         
226         virtual int Rows()
227         {
228                 return PQntuples(res);
229         }
230         
231         virtual int Cols()
232         {
233                 return PQnfields(res);
234         }
235         
236         virtual std::string ColName(int column)
237         {
238                 char* name = PQfname(res, column);
239                 
240                 return (name) ? name : "";
241         }
242         
243         virtual int ColNum(const std::string &column)
244         {
245                 int n = PQfnumber(res, column.c_str());
246                 
247                 if(n == -1)
248                 {
249                         throw SQLbadColName();
250                 }
251                 else
252                 {
253                         return n;
254                 }
255         }
256         
257         virtual SQLfield GetValue(int row, int column)
258         {
259                 char* v = PQgetvalue(res, row, column);
260                 
261                 if(v)
262                 {
263                         return SQLfield(std::string(v, PQgetlength(res, row, column)), PQgetisnull(res, row, column));
264                 }
265                 else
266                 {
267                         log(DEBUG, "PQgetvalue returned a null pointer..nobody wants to tell us what this means");
268                         throw SQLbadColName();
269                 }
270         }
271         
272         virtual SQLfieldList& GetRow()
273         {
274                 /* In an effort to reduce overhead we don't actually allocate the list
275                  * until the first time it's needed...so...
276                  */
277                 if(fieldlist)
278                 {
279                         fieldlist->clear();
280                 }
281                 else
282                 {
283                         fieldlist = new SQLfieldList;
284                 }
285                 
286                 if(currentrow < PQntuples(res))
287                 {
288                         int cols = PQnfields(res);
289                         
290                         for(int i = 0; i < cols; i++)
291                         {
292                                 fieldlist->push_back(GetValue(currentrow, i));
293                         }
294                         
295                         currentrow++;
296                 }
297                 
298                 return *fieldlist;
299         }
300         
301         virtual SQLfieldMap& GetRowMap()
302         {
303                 /* In an effort to reduce overhead we don't actually allocate the map
304                  * until the first time it's needed...so...
305                  */
306                 if(fieldmap)
307                 {
308                         fieldmap->clear();
309                 }
310                 else
311                 {
312                         fieldmap = new SQLfieldMap;
313                 }
314                 
315                 if(currentrow < PQntuples(res))
316                 {
317                         int cols = PQnfields(res);
318                         
319                         for(int i = 0; i < cols; i++)
320                         {
321                                 fieldmap->insert(std::make_pair(ColName(i), GetValue(currentrow, i)));
322                         }
323                         
324                         currentrow++;
325                 }
326                 
327                 return *fieldmap;
328         }
329         
330         virtual SQLfieldList* GetRowPtr()
331         {
332                 SQLfieldList* fl = new SQLfieldList;
333                 
334                 if(currentrow < PQntuples(res))
335                 {
336                         int cols = PQnfields(res);
337                         
338                         for(int i = 0; i < cols; i++)
339                         {
340                                 fl->push_back(GetValue(currentrow, i));
341                         }
342                         
343                         currentrow++;
344                 }
345                 
346                 return fl;
347         }
348         
349         virtual SQLfieldMap* GetRowMapPtr()
350         {
351                 SQLfieldMap* fm = new SQLfieldMap;
352                 
353                 if(currentrow < PQntuples(res))
354                 {
355                         int cols = PQnfields(res);
356                         
357                         for(int i = 0; i < cols; i++)
358                         {
359                                 fm->insert(std::make_pair(ColName(i), GetValue(currentrow, i)));
360                         }
361                         
362                         currentrow++;
363                 }
364                 
365                 return fm;
366         }
367         
368         virtual void Free(SQLfieldMap* fm)
369         {
370                 DELETE(fm);
371         }
372         
373         virtual void Free(SQLfieldList* fl)
374         {
375                 DELETE(fl);
376         }
377 };
378
379 /** SQLConn represents one SQL session.
380  * Each session has its own persistent connection to the database.
381  * This is a subclass of InspSocket so it can easily recieve read/write events from the core socket
382  * engine, unlike the original MySQL module this module does not block. Ever. It gets a mild stabbing
383  * if it dares to.
384  */
385
386 class SQLConn : public InspSocket
387 {
388 private:
389         ModulePgSQL* us;                /* Pointer to the SQL provider itself */
390         Server* Srv;                    /* Server* for..uhm..something, maybe */
391         std::string     dbhost; /* Database server hostname */
392         unsigned int    dbport; /* Database server port */
393         std::string     dbname; /* Database name */
394         std::string     dbuser; /* Database username */
395         std::string     dbpass; /* Database password */
396         bool                    ssl;    /* If we should require SSL */
397         PGconn*                 sql;    /* PgSQL database connection handle */
398         SQLstatus               status; /* PgSQL database connection status */
399         bool                    qinprog;/* If there is currently a query in progress */
400         QueryQueue              queue;  /* Queue of queries waiting to be executed on this connection */
401
402 public:
403
404         /* This class should only ever be created inside this module, using this constructor, so we don't have to worry about the default ones */
405
406         SQLConn(ModulePgSQL* self, Server* srv, const std::string &h, unsigned int p, const std::string &d, const std::string &u, const std::string &pwd, bool s);
407
408         ~SQLConn();
409
410         bool DoResolve();
411
412         bool DoConnect();
413
414         virtual void Close();
415         
416         bool DoPoll();
417         
418         bool DoConnectedPoll();
419         
420         void ShowStatus();      
421         
422         virtual bool OnDataReady();
423
424         virtual bool OnWriteReady();
425         
426         virtual bool OnConnected();
427         
428         bool DoEvent();
429         
430         std::string MkInfoStr();
431         
432         const char* StatusStr();
433         
434         SQLerror DoQuery(SQLrequest &req);
435         
436         SQLerror Query(const SQLrequest &req);
437         
438         void OnUnloadModule(Module* mod);
439 };
440
441 class ModulePgSQL : public Module
442 {
443 private:
444         Server* Srv;
445         ConnMap connections;
446         unsigned long currid;
447         char* sqlsuccess;
448
449 public:
450         ModulePgSQL(Server* Me)
451         : Module::Module(Me), Srv(Me), currid(0)
452         {
453                 log(DEBUG, "%s 'SQL' feature", Srv->PublishFeature("SQL", this) ? "Published" : "Couldn't publish");
454                 log(DEBUG, "%s 'PgSQL' feature", Srv->PublishFeature("PgSQL", this) ? "Published" : "Couldn't publish");
455                 
456                 sqlsuccess = new char[strlen(SQLSUCCESS)+1];
457                 
458                 strcpy(sqlsuccess, SQLSUCCESS);
459
460                 OnRehash("");
461         }
462
463         void Implements(char* List)
464         {
465                 List[I_OnUnloadModule] = List[I_OnRequest] = List[I_OnRehash] = List[I_OnUserRegister] = List[I_OnCheckReady] = List[I_OnUserDisconnect] = 1;
466         }
467
468         virtual void OnRehash(const std::string &parameter)
469         {
470                 ConfigReader conf;
471                 
472                 /* Delete all the SQLConn objects in the connection lists,
473                  * this will call their destructors where they can handle
474                  * closing connections and such.
475                  */
476                 for(ConnMap::iterator iter = connections.begin(); iter != connections.end(); iter++)
477                 {
478                         DELETE(iter->second);
479                 }
480                 
481                 /* Empty out our list of connections */
482                 connections.clear();
483
484                 for(int i = 0; i < conf.Enumerate("database"); i++)
485                 {
486                         std::string id;
487                         SQLConn* newconn;
488                         
489                         id = conf.ReadValue("database", "id", i);
490                         newconn = new SQLConn(this, Srv,
491                                                                                 conf.ReadValue("database", "hostname", i),
492                                                                                 conf.ReadInteger("database", "port", i, true),
493                                                                                 conf.ReadValue("database", "name", i),
494                                                                                 conf.ReadValue("database", "username", i),
495                                                                                 conf.ReadValue("database", "password", i),
496                                                                                 conf.ReadFlag("database", "ssl", i));
497                         
498                         connections.insert(std::make_pair(id, newconn));
499                 }       
500         }
501         
502         virtual char* OnRequest(Request* request)
503         {
504                 if(strcmp(SQLREQID, request->GetData()) == 0)
505                 {
506                         SQLrequest* req = (SQLrequest*)request;
507                         ConnMap::iterator iter;
508                 
509                         log(DEBUG, "Got query: '%s' with %d replacement parameters on id '%s'", req->query.q.c_str(), req->query.p.size(), req->dbid.c_str());
510
511                         if((iter = connections.find(req->dbid)) != connections.end())
512                         {
513                                 /* Execute query */
514                                 req->id = NewID();
515                                 req->error = iter->second->Query(*req);
516                                 
517                                 return (req->error.Id() == NO_ERROR) ? sqlsuccess : NULL;
518                         }
519                         else
520                         {
521                                 req->error.Id(BAD_DBID);
522                                 return NULL;
523                         }
524                 }
525
526                 log(DEBUG, "Got unsupported API version string: %s", request->GetData());
527                 
528                 return NULL;
529         }
530         
531         virtual void OnUnloadModule(Module* mod, const std::string&     name)
532         {
533                 /* When a module unloads we have to check all the pending queries for all our connections
534                  * and set the Module* specifying where the query came from to NULL. If the query has already
535                  * been dispatched then when it is processed it will be dropped if the pointer is NULL.
536                  *
537                  * If the queries we find are not already being executed then we can simply remove them immediately.
538                  */
539                 for(ConnMap::iterator iter = connections.begin(); iter != connections.end(); iter++)
540                 {
541                         
542                 }
543         }
544
545         unsigned long NewID()
546         {
547                 if (currid+1 == 0)
548                         currid++;
549                 
550                 return ++currid;
551         }
552                 
553         virtual Version GetVersion()
554         {
555                 return Version(1, 0, 0, 0, VF_VENDOR|VF_SERVICEPROVIDER);
556         }
557         
558         virtual ~ModulePgSQL()
559         {
560                 DELETE(sqlsuccess);
561         }       
562 };
563
564 SQLConn::SQLConn(ModulePgSQL* self, Server* srv, const std::string &h, unsigned int p, const std::string &d, const std::string &u, const std::string &pwd, bool s)
565 : InspSocket::InspSocket(), us(self), Srv(srv), dbhost(h), dbport(p), dbname(d), dbuser(u), dbpass(pwd), ssl(s), sql(NULL), status(CWRITE), qinprog(false)
566 {
567         log(DEBUG, "Creating new PgSQL connection to database %s on %s:%u (%s/%s)", dbname.c_str(), dbhost.c_str(), dbport, dbuser.c_str(), dbpass.c_str());
568
569         /* Some of this could be reviewed, unsure if I need to fill 'host' etc...
570          * just copied this over from the InspSocket constructor.
571          */
572         strlcpy(this->host, dbhost.c_str(), MAXBUF);
573         this->port = dbport;
574         
575         this->ClosePending = false;
576         
577         if(!inet_aton(this->host, &this->addy))
578         {
579                 /* Its not an ip, spawn the resolver.
580                  * PgSQL doesn't do nonblocking DNS 
581                  * lookups, so we do it for it.
582                  */
583                 
584                 log(DEBUG,"Attempting to resolve %s", this->host);
585                 
586                 this->dns.SetNS(Srv->GetConfig()->DNSServer);
587                 this->dns.ForwardLookupWithFD(this->host, fd);
588                 
589                 this->state = I_RESOLVING;
590                 socket_ref[this->fd] = this;
591                 
592                 return;
593         }
594         else
595         {
596                 log(DEBUG,"No need to resolve %s", this->host);
597                 strlcpy(this->IP, this->host, MAXBUF);
598                 
599                 if(!this->DoConnect())
600                 {
601                         throw ModuleException("Connect failed");
602                 }
603         }
604 }
605
606 SQLConn::~SQLConn()
607 {
608         Close();
609 }
610
611 bool SQLConn::DoResolve()
612 {       
613         log(DEBUG, "Checking for DNS lookup result");
614         
615         if(this->dns.HasResult())
616         {
617                 std::string res_ip = dns.GetResultIP();
618                 
619                 if(res_ip.length())
620                 {
621                         log(DEBUG, "Got result: %s", res_ip.c_str());
622                         
623                         strlcpy(this->IP, res_ip.c_str(), MAXBUF);
624                         dbhost = res_ip;
625                         
626                         socket_ref[this->fd] = NULL;
627                         
628                         return this->DoConnect();
629                 }
630                 else
631                 {
632                         log(DEBUG, "DNS lookup failed, dying horribly");
633                         Close();
634                         return false;
635                 }
636         }
637         else
638         {
639                 log(DEBUG, "No result for lookup yet!");
640                 return true;
641         }
642 }
643
644 bool SQLConn::DoConnect()
645 {
646         log(DEBUG, "SQLConn::DoConnect()");
647         
648         if(!(sql = PQconnectStart(MkInfoStr().c_str())))
649         {
650                 log(DEBUG, "Couldn't allocate PGconn structure, aborting: %s", PQerrorMessage(sql));
651                 Close();
652                 return false;
653         }
654         
655         if(PQstatus(sql) == CONNECTION_BAD)
656         {
657                 log(DEBUG, "PQconnectStart failed: %s", PQerrorMessage(sql));
658                 Close();
659                 return false;
660         }
661         
662         ShowStatus();
663         
664         if(PQsetnonblocking(sql, 1) == -1)
665         {
666                 log(DEBUG, "Couldn't set connection nonblocking: %s", PQerrorMessage(sql));
667                 Close();
668                 return false;
669         }
670         
671         /* OK, we've initalised the connection, now to get it hooked into the socket engine
672          * and then start polling it.
673          */
674         
675         log(DEBUG, "Old DNS socket: %d", this->fd);
676         this->fd = PQsocket(sql);
677         log(DEBUG, "New SQL socket: %d", this->fd);
678         
679         if(this->fd <= -1)
680         {
681                 log(DEBUG, "PQsocket says we have an invalid FD: %d", this->fd);
682                 Close();
683                 return false;
684         }
685         
686         this->state = I_CONNECTING;
687         ServerInstance->SE->AddFd(this->fd,false,X_ESTAB_MODULE);
688         socket_ref[this->fd] = this;
689         
690         /* Socket all hooked into the engine, now to tell PgSQL to start connecting */
691         
692         return DoPoll();
693 }
694
695 void SQLConn::Close()
696 {
697         log(DEBUG,"SQLConn::Close");
698         
699         if(this->fd > 01)
700                 socket_ref[this->fd] = NULL;
701         this->fd = -1;
702         this->state = I_ERROR;
703         this->OnError(I_ERR_SOCKET);
704         this->ClosePending = true;
705         
706         if(sql)
707         {
708                 PQfinish(sql);
709                 sql = NULL;
710         }
711         
712         return;
713 }
714
715 bool SQLConn::DoPoll()
716 {
717         switch(PQconnectPoll(sql))
718         {
719                 case PGRES_POLLING_WRITING:
720                         log(DEBUG, "PGconnectPoll: PGRES_POLLING_WRITING");
721                         WantWrite();
722                         status = CWRITE;
723                         return DoPoll();
724                 case PGRES_POLLING_READING:
725                         log(DEBUG, "PGconnectPoll: PGRES_POLLING_READING");
726                         status = CREAD;
727                         break;
728                 case PGRES_POLLING_FAILED:
729                         log(DEBUG, "PGconnectPoll: PGRES_POLLING_FAILED: %s", PQerrorMessage(sql));
730                         return false;
731                 case PGRES_POLLING_OK:
732                         log(DEBUG, "PGconnectPoll: PGRES_POLLING_OK");
733                         status = WWRITE;
734                         return DoConnectedPoll();
735                 default:
736                         log(DEBUG, "PGconnectPoll: wtf?");
737                         break;
738         }
739         
740         return true;
741 }
742
743 bool SQLConn::DoConnectedPoll()
744 {
745         if(!qinprog && queue.totalsize())
746         {
747                 /* There's no query currently in progress, and there's queries in the queue. */
748                 SQLrequest& query = queue.front();
749                 DoQuery(query);
750         }
751         
752         if(PQconsumeInput(sql))
753         {
754                 log(DEBUG, "PQconsumeInput succeeded");
755                         
756                 if(PQisBusy(sql))
757                 {
758                         log(DEBUG, "Still busy processing command though");
759                 }
760                 else if(qinprog)
761                 {
762                         log(DEBUG, "Looks like we have a result to process!");
763                         
764                         /* Grab the request we're processing */
765                         SQLrequest& query = queue.front();
766                         
767                         log(DEBUG, "ID is %lu", query.id);
768                         
769                         /* Get a pointer to the module we're about to return the result to */
770                         Module* to = query.GetSource();
771                         
772                         /* Fetch the result.. */
773                         PGresult* result = PQgetResult(sql);
774                         
775                         /* PgSQL would allow a query string to be sent which has multiple
776                          * queries in it, this isn't portable across database backends and
777                          * we don't want modules doing it. But just in case we make sure we
778                          * drain any results there are and just use the last one.
779                          * If the module devs are behaving there will only be one result.
780                          */
781                         while (PGresult* temp = PQgetResult(sql))
782                         {
783                                 PQclear(result);
784                                 result = temp;
785                         }
786                         
787                         if(to)
788                         {
789                                 /* ..and the result */
790                                 PgSQLresult reply(us, to, query.id, result);
791                                 
792                                 log(DEBUG, "Got result, status code: %s; error message: %s", PQresStatus(PQresultStatus(result)), PQresultErrorMessage(result));        
793                                 
794                                 switch(PQresultStatus(result))
795                                 {
796                                         case PGRES_EMPTY_QUERY:
797                                         case PGRES_BAD_RESPONSE:
798                                         case PGRES_FATAL_ERROR:
799                                                 reply.error.Id(QREPLY_FAIL);
800                                                 reply.error.Str(PQresultErrorMessage(result));
801                                         default:;
802                                                 /* No action, other values are not errors */
803                                 }
804                                 
805                                 reply.Send();
806                                 
807                                 /* PgSQLresult's destructor will free the PGresult */
808                         }
809                         else
810                         {
811                                 /* If the client module is unloaded partway through a query then the provider will set
812                                  * the pointer to NULL. We cannot just cancel the query as the result will still come
813                                  * through at some point...and it could get messy if we play with invalid pointers...
814                                  */
815                                 log(DEBUG, "Looks like we're handling a zombie query from a module which unloaded before it got a result..fun. ID: %lu", query.id);
816                                 PQclear(result);
817                         }
818                         
819                         qinprog = false;
820                         queue.pop();                            
821                         DoConnectedPoll();
822                 }
823                 
824                 return true;
825         }
826         
827         log(DEBUG, "PQconsumeInput failed: %s", PQerrorMessage(sql));
828         return false;
829 }
830
831 void SQLConn::ShowStatus()
832 {
833         switch(PQstatus(sql))
834         {
835                 case CONNECTION_STARTED:
836                         log(DEBUG, "PQstatus: CONNECTION_STARTED: Waiting for connection to be made.");
837                         break;
838
839                 case CONNECTION_MADE:
840                         log(DEBUG, "PQstatus: CONNECTION_MADE: Connection OK; waiting to send.");
841                         break;
842                 
843                 case CONNECTION_AWAITING_RESPONSE:
844                         log(DEBUG, "PQstatus: CONNECTION_AWAITING_RESPONSE: Waiting for a response from the server.");
845                         break;
846                 
847                 case CONNECTION_AUTH_OK:
848                         log(DEBUG, "PQstatus: CONNECTION_AUTH_OK: Received authentication; waiting for backend start-up to finish.");
849                         break;
850                 
851                 case CONNECTION_SSL_STARTUP:
852                         log(DEBUG, "PQstatus: CONNECTION_SSL_STARTUP: Negotiating SSL encryption.");
853                         break;
854                 
855                 case CONNECTION_SETENV:
856                         log(DEBUG, "PQstatus: CONNECTION_SETENV: Negotiating environment-driven parameter settings.");
857                         break;
858                 
859                 default:
860                         log(DEBUG, "PQstatus: ???");
861         }
862 }
863
864 bool SQLConn::OnDataReady()
865 {
866         /* Always return true here, false would close the socket - we need to do that ourselves with the pgsql API */
867         log(DEBUG, "OnDataReady(): status = %s", StatusStr());
868         
869         return DoEvent();
870 }
871
872 bool SQLConn::OnWriteReady()
873 {
874         /* Always return true here, false would close the socket - we need to do that ourselves with the pgsql API */
875         log(DEBUG, "OnWriteReady(): status = %s", StatusStr());
876         
877         return DoEvent();
878 }
879
880 bool SQLConn::OnConnected()
881 {
882         log(DEBUG, "OnConnected(): status = %s", StatusStr());
883         
884         return DoEvent();
885 }
886
887 bool SQLConn::DoEvent()
888 {
889         bool ret;
890         
891         if((status == CREAD) || (status == CWRITE))
892         {
893                 ret = DoPoll();
894         }
895         else
896         {
897                 ret = DoConnectedPoll();
898         }
899         
900         switch(PQflush(sql))
901         {
902                 case -1:
903                         log(DEBUG, "Error flushing write queue: %s", PQerrorMessage(sql));
904                         break;
905                 case 0:
906                         log(DEBUG, "Successfully flushed write queue (or there was nothing to write)");
907                         break;
908                 case 1:
909                         log(DEBUG, "Not all of the write queue written, triggering write event so we can have another go");
910                         WantWrite();
911                         break;
912         }
913
914         return ret;
915 }
916
917 std::string SQLConn::MkInfoStr()
918 {                       
919         std::ostringstream conninfo("connect_timeout = '2'");
920         
921         if(dbhost.length())
922                 conninfo << " hostaddr = '" << dbhost << "'";
923         
924         if(dbport)
925                 conninfo << " port = '" << dbport << "'";
926         
927         if(dbname.length())
928                 conninfo << " dbname = '" << dbname << "'";
929         
930         if(dbuser.length())
931                 conninfo << " user = '" << dbuser << "'";
932         
933         if(dbpass.length())
934                 conninfo << " password = '" << dbpass << "'";
935         
936         if(ssl)
937                 conninfo << " sslmode = 'require'";
938         
939         return conninfo.str();
940 }
941
942 const char* SQLConn::StatusStr()
943 {
944         if(status == CREAD) return "CREAD";
945         if(status == CWRITE) return "CWRITE";
946         if(status == WREAD) return "WREAD";
947         if(status == WWRITE) return "WWRITE";
948         return "Err...what, erm..BUG!";
949 }
950
951 SQLerror SQLConn::DoQuery(SQLrequest &req)
952 {
953         if((status == WREAD) || (status == WWRITE))
954         {
955                 if(!qinprog)
956                 {
957                         /* Parse the command string and dispatch it */
958                         
959                         /* Pointer to the buffer we screw around with substitution in */
960                         char* query;
961                         /* Pointer to the current end of query, where we append new stuff */
962                         char* queryend;
963                         /* Total length of the unescaped parameters */
964                         unsigned int paramlen;
965                         
966                         paramlen = 0;
967                         
968                         for(ParamL::iterator i = req.query.p.begin(); i != req.query.p.end(); i++)
969                         {
970                                 paramlen += i->size();
971                         }
972                         
973                         /* To avoid a lot of allocations, allocate enough memory for the biggest the escaped query could possibly be.
974                          * sizeofquery + (totalparamlength*2) + 1
975                          * 
976                          * The +1 is for null-terminating the string for PQsendQuery()
977                          */
978                         
979                         query = new char[req.query.q.length() + (paramlen*2)];
980                         queryend = query;
981                         
982                         /* Okay, now we have a buffer large enough we need to start copying the query into it and escaping and substituting
983                          * the parameters into it...
984                          */
985                         
986                         for(unsigned int i = 0; i < req.query.q.length(); i++)
987                         {
988                                 if(req.query.q[i] == '?')
989                                 {
990                                         /* We found a place to substitute..what fun.
991                                          * Use the PgSQL calls to escape and write the
992                                          * escaped string onto the end of our query buffer,
993                                          * then we "just" need to make sure queryend is
994                                          * pointing at the right place.
995                                          */
996                                         
997                                         if(req.query.p.size())
998                                         {
999                                                 int error = 0;
1000                                                 size_t len = 0;
1001
1002 #ifdef PGSQL_HAS_ESCAPECONN
1003                                                 len = PQescapeStringConn(sql, queryend, req.query.p.front().c_str(), req.query.p.front().length(), &error);
1004 #else
1005                                                 len = PQescapeStringConn(queryend, req.query.p.front().c_str(), req.query.p.front().length());
1006                                                 error = 0;
1007 #endif
1008                                                 
1009                                                 if(error)
1010                                                 {
1011                                                         log(DEBUG, "Apparently PQescapeStringConn() failed somehow...don't know how or what to do...");
1012                                                 }
1013                                                 
1014                                                 log(DEBUG, "Appended %d bytes of escaped string onto the query", len);
1015                                                 
1016                                                 /* Incremenet queryend to the end of the newly escaped parameter */
1017                                                 queryend += len;
1018                                                 
1019                                                 /* Remove the parameter we just substituted in */
1020                                                 req.query.p.pop_front();
1021                                         }
1022                                         else
1023                                         {
1024                                                 log(DEBUG, "Found a substitution location but no parameter to substitute :|");
1025                                                 break;
1026                                         }
1027                                 }
1028                                 else
1029                                 {
1030                                         *queryend = req.query.q[i];
1031                                         queryend++;
1032                                 }
1033                         }
1034                         
1035                         /* Null-terminate the query */
1036                         *queryend = 0;
1037         
1038                         log(DEBUG, "Attempting to dispatch query: %s", query);
1039                         
1040                         req.query.q = query;
1041
1042                         if(PQsendQuery(sql, query))
1043                         {
1044                                 log(DEBUG, "Dispatched query successfully");
1045                                 qinprog = true;
1046                                 DELETE(query);
1047                                 return SQLerror();
1048                         }
1049                         else
1050                         {
1051                                 log(DEBUG, "Failed to dispatch query: %s", PQerrorMessage(sql));
1052                                 DELETE(query);
1053                                 return SQLerror(QSEND_FAIL, PQerrorMessage(sql));
1054                         }
1055                 }
1056         }
1057
1058         log(DEBUG, "Can't query until connection is complete");
1059         return SQLerror(BAD_CONN, "Can't query until connection is complete");
1060 }
1061
1062 SQLerror SQLConn::Query(const SQLrequest &req)
1063 {
1064         queue.push(req);
1065         
1066         if(!qinprog && queue.totalsize())
1067         {
1068                 /* There's no query currently in progress, and there's queries in the queue. */
1069                 SQLrequest& query = queue.front();
1070                 return DoQuery(query);
1071         }
1072         else
1073         {
1074                 return SQLerror();
1075         }
1076 }
1077
1078 void SQLConn::OnUnloadModule(Module* mod)
1079 {
1080         queue.PurgeModule(mod);
1081 }
1082
1083 class ModulePgSQLFactory : public ModuleFactory
1084 {
1085  public:
1086         ModulePgSQLFactory()
1087         {
1088         }
1089         
1090         ~ModulePgSQLFactory()
1091         {
1092         }
1093         
1094         virtual Module * CreateModule(Server* Me)
1095         {
1096                 return new ModulePgSQL(Me);
1097         }
1098 };
1099
1100
1101 extern "C" void * init_module( void )
1102 {
1103         return new ModulePgSQLFactory;
1104 }