]> git.netwichtig.de Git - user/henk/code/inspircd.git/blob - src/modules/extra/m_ziplink.cpp
Newly revamped ziplinks module, work of psychon.. resolves (a lot) of problems with...
[user/henk/code/inspircd.git] / src / modules / extra / m_ziplink.cpp
1 /*       +------------------------------------+
2  *       | Inspire Internet Relay Chat Daemon |
3  *       +------------------------------------+
4  *
5  *  InspIRCd: (C) 2002-2008 InspIRCd Development Team
6  * See: http://www.inspircd.org/wiki/index.php/Credits
7  *
8  * This program is free but copyrighted software; see
9  *            the file COPYING for details.
10  *
11  * ---------------------------------------------------
12  */
13
14 #include "inspircd.h"
15 #include <zlib.h>
16 #include "users.h"
17 #include "channels.h"
18 #include "modules.h"
19 #include "socket.h"
20 #include "hashcomp.h"
21 #include "transport.h"
22
23 #include <iostream>
24
25 /* $ModDesc: Provides zlib link support for servers */
26 /* $LinkerFlags: -lz */
27 /* $ModDep: transport.h */
28
29 /*
30  * ZLIB_BEST_COMPRESSION (9) is used for all sending of data with
31  * a flush after each chunk. A frame may contain multiple lines
32  * and should be treated as raw binary data.
33  */
34
35 /* Status of a connection */
36 enum izip_status { IZIP_CLOSED = 0, IZIP_OPEN };
37
38 /* Maximum transfer size per read operation */
39 const unsigned int CHUNK = 128 * 1024;
40
41 /** Represents an zipped connections extra data
42  */
43 class izip_session : public classbase
44 {
45  public:
46         z_stream c_stream;      /* compression stream */
47         z_stream d_stream;      /* uncompress stream */
48         izip_status status;     /* Connection status */
49         std::string outbuf;     /* Holds output buffer (compressed) */
50         std::string inbuf;      /* Holds input buffer (compressed) */
51 };
52
53 class ModuleZLib : public Module
54 {
55         izip_session* sessions;
56
57         /* Used for stats z extensions */
58         float total_out_compressed;
59         float total_in_compressed;
60         float total_out_uncompressed;
61         float total_in_uncompressed;
62
63         /* Used for reading data from the wire.
64          * We need a smaller buffer than CHUNK, else we could read more data
65          * from the wire than we can pass to the socket code.
66          */
67         char *read_buffer;
68         unsigned int read_buffer_size;
69  public:
70
71         ModuleZLib(InspIRCd* Me)
72                 : Module::Module(Me)
73         {
74                 ServerInstance->Modules->PublishInterface("BufferedSocketHook", this);
75
76                 sessions = new izip_session[ServerInstance->SE->GetMaxFds()];
77                 for (int i = 0; i < ServerInstance->SE->GetMaxFds(); i++)
78                         sessions[i].status = IZIP_CLOSED;
79
80                 total_out_compressed = total_in_compressed = 0;
81                 total_out_uncompressed = total_in_uncompressed = 0;
82                 Implementation eventlist[] = { I_OnRawSocketConnect, I_OnRawSocketAccept, I_OnRawSocketClose, I_OnRawSocketRead, I_OnRawSocketWrite, I_OnStats, I_OnRequest };
83                 ServerInstance->Modules->Attach(eventlist, this, 7);
84
85                 read_buffer_size = ServerInstance->Config->NetBufferSize / 4;
86                 read_buffer = new char[read_buffer_size];
87         }
88
89         virtual ~ModuleZLib()
90         {
91                 ServerInstance->Modules->UnpublishInterface("BufferedSocketHook", this);
92                 delete[] sessions;
93                 delete[] read_buffer;
94         }
95
96         virtual Version GetVersion()
97         {
98                 return Version("$Id$", VF_VENDOR, API_VERSION);
99         }
100
101
102         /* Handle BufferedSocketHook API requests */
103         virtual const char* OnRequest(Request* request)
104         {
105                 ISHRequest* ISR = (ISHRequest*)request;
106                 if (strcmp("IS_NAME", request->GetId()) == 0)
107                 {
108                         /* Return name */
109                         return "zip";
110                 }
111                 else if (strcmp("IS_HOOK", request->GetId()) == 0)
112                 {
113                         /* Attach to an inspsocket */
114                         const char* ret = "OK";
115                         try
116                         {
117                                 ret = ServerInstance->Config->AddIOHook((Module*)this, (BufferedSocket*)ISR->Sock) ? "OK" : NULL;
118                         }
119                         catch (ModuleException& e)
120                         {
121                                 return NULL;
122                         }
123                         return ret;
124                 }
125                 else if (strcmp("IS_UNHOOK", request->GetId()) == 0)
126                 {
127                         /* Detach from an inspsocket */
128                         return ServerInstance->Config->DelIOHook((BufferedSocket*)ISR->Sock) ? "OK" : NULL;
129                 }
130                 else if (strcmp("IS_HSDONE", request->GetId()) == 0)
131                 {
132                         /* Check for completion of handshake
133                          * (actually, this module doesnt handshake)
134                          */
135                         return "OK";
136                 }
137                 else if (strcmp("IS_ATTACH", request->GetId()) == 0)
138                 {
139                         /* Attach certificate data to the inspsocket
140                          * (this module doesnt do that, either)
141                          */
142                         return NULL;
143                 }
144                 return NULL;
145         }
146
147         /* Handle stats z (misc stats) */
148         virtual int OnStats(char symbol, User* user, string_list &results)
149         {
150                 if (symbol == 'z')
151                 {
152                         std::string sn = ServerInstance->Config->ServerName;
153
154                         /* Yeah yeah, i know, floats are ew.
155                          * We used them here because we'd be casting to float anyway to do this maths,
156                          * and also only floating point numbers can deal with the pretty large numbers
157                          * involved in the total throughput of a server over a large period of time.
158                          * (we dont count 64 bit ints because not all systems have 64 bit ints, and floats
159                          * can still hold more.
160                          */
161                         float outbound_r = (total_out_compressed / (total_out_uncompressed + 0.001)) * 100;
162                         float inbound_r = (total_in_compressed / (total_in_uncompressed + 0.001)) * 100;
163
164                         float total_compressed = total_in_compressed + total_out_compressed;
165                         float total_uncompressed = total_in_uncompressed + total_out_uncompressed;
166
167                         float total_r = (total_compressed / (total_uncompressed + 0.001)) * 100;
168
169                         char outbound_ratio[MAXBUF], inbound_ratio[MAXBUF], combined_ratio[MAXBUF];
170
171                         sprintf(outbound_ratio, "%3.2f%%", outbound_r);
172                         sprintf(inbound_ratio, "%3.2f%%", inbound_r);
173                         sprintf(combined_ratio, "%3.2f%%", total_r);
174
175                         results.push_back(sn+" 304 "+user->nick+" :ZIPSTATS outbound_compressed   = "+ConvToStr(total_out_compressed));
176                         results.push_back(sn+" 304 "+user->nick+" :ZIPSTATS inbound_compressed    = "+ConvToStr(total_in_compressed));
177                         results.push_back(sn+" 304 "+user->nick+" :ZIPSTATS outbound_uncompressed = "+ConvToStr(total_out_uncompressed));
178                         results.push_back(sn+" 304 "+user->nick+" :ZIPSTATS inbound_uncompressed  = "+ConvToStr(total_in_uncompressed));
179                         results.push_back(sn+" 304 "+user->nick+" :ZIPSTATS outbound_ratio        = "+outbound_ratio);
180                         results.push_back(sn+" 304 "+user->nick+" :ZIPSTATS inbound_ratio         = "+inbound_ratio);
181                         results.push_back(sn+" 304 "+user->nick+" :ZIPSTATS combined_ratio        = "+combined_ratio);
182                         return 0;
183                 }
184
185                 return 0;
186         }
187
188         virtual void OnRawSocketConnect(int fd)
189         {
190                 if ((fd < 0) || (fd > ServerInstance->SE->GetMaxFds() - 1))
191                         return;
192
193                 izip_session* session = &sessions[fd];
194
195                 /* allocate state and buffers */
196                 session->status = IZIP_OPEN;
197
198                 /* Just in case... */
199                 session->outbuf.clear();
200
201                 session->c_stream.zalloc = (alloc_func)0;
202                 session->c_stream.zfree = (free_func)0;
203                 session->c_stream.opaque = (voidpf)0;
204
205                 session->d_stream.zalloc = (alloc_func)0;
206                 session->d_stream.zfree = (free_func)0;
207                 session->d_stream.opaque = (voidpf)0;
208
209                 /* If we cant call this, well, we're boned. */
210                 if (inflateInit(&session->d_stream) != Z_OK)
211                 {
212                         session->status = IZIP_CLOSED;
213                         return;
214                 }
215
216                 /* Same here */
217                 if (deflateInit(&session->c_stream, Z_BEST_COMPRESSION) != Z_OK)
218                 {
219                         inflateEnd(&session->d_stream);
220                         session->status = IZIP_CLOSED;
221                         return;
222                 }
223         }
224
225         virtual void OnRawSocketAccept(int fd, const std::string &ip, int localport)
226         {
227                 /* Nothing special needs doing here compared to connect() */
228                 OnRawSocketConnect(fd);
229         }
230
231         virtual void OnRawSocketClose(int fd)
232         {
233                 CloseSession(&sessions[fd]);
234         }
235
236         virtual int OnRawSocketRead(int fd, char* buffer, unsigned int count, int &readresult)
237         {
238                 /* Find the sockets session */
239                 izip_session* session = &sessions[fd];
240
241                 if (session->status == IZIP_CLOSED)
242                         return 0;
243
244                 /* Read read_buffer_size bytes at a time to the buffer (usually 128k) */
245                 readresult = read(fd, read_buffer, read_buffer_size);
246
247                 /* Did we get anything? */
248                 if (readresult <= 0)
249                         return 0;
250
251                 total_in_compressed += readresult;
252
253                 /* Copy the compressed data into out input buffer */
254                 session->inbuf.append(read_buffer, readresult);
255                 size_t in_len = session->inbuf.length();
256
257                 /* Prepare decompression */
258                 session->d_stream.next_in = (Bytef *)session->inbuf.c_str();
259                 session->d_stream.avail_in = in_len;
260
261                 session->d_stream.next_out = (Bytef*)buffer;
262                 /* Last byte is reserved for NULL terminating that beast */
263                 session->d_stream.avail_out = count - 1;
264
265                 /* Z_SYNC_FLUSH: Do as much as possible */
266                 int ret = inflate(&session->d_stream, Z_SYNC_FLUSH);
267                 /* TODO CloseStream() in here at random places */
268                 switch (ret)
269                 {
270                         case Z_NEED_DICT:
271                         case Z_STREAM_ERROR:
272                                 /* This is one of the 'not supposed to happen' things.
273                                  * Memory corruption, anyone?
274                                  */
275                                 Error(session, "General Error. This is not supposed to happen :/");
276                                 break;
277                         case Z_DATA_ERROR:
278                                 Error(session, "Decompression failed, malformed data");
279                                 break;
280                         case Z_MEM_ERROR:
281                                 Error(session, "Out of memory");
282                                 break;
283                         case Z_BUF_ERROR:
284                                 /* This one is non-fatal, buffer is just full
285                                  * (can't happen here).
286                                  */
287                                 Error(session, "Internal error. This is not supposed to happen.");
288                                 break;
289                         case Z_STREAM_END:
290                                 /* This module *never* generates these :/ */
291                                 Error(session, "End-of-stream marker received");
292                                 break;
293                         case Z_OK:
294                                 break;
295                         default:
296                                 /* NO WAI! This can't happen. All errors are handled above. */
297                                 Error(session, "Unknown error");
298                                 break;
299                 }
300                 if (ret != Z_OK)
301                 {
302                         readresult = 0;
303                         return 0;
304                 }
305
306                 /* Update the inbut buffer */
307                 unsigned int input_compressed = in_len - session->d_stream.avail_in;
308                 session->inbuf = session->inbuf.substr(input_compressed);
309
310                 /* Update counters (Old size - new size) */
311                 unsigned int uncompressed_length = (count - 1) - session->d_stream.avail_out;
312                 total_in_uncompressed += uncompressed_length;
313
314                 /* Null-terminate the buffer -- this doesnt harm binary data */
315                 buffer[uncompressed_length] = 0;
316
317                 /* Set the read size to the correct total size */
318                 readresult = uncompressed_length;
319
320                 return 1;
321         }
322
323         virtual int OnRawSocketWrite(int fd, const char* buffer, int count)
324         {
325                 izip_session* session = &sessions[fd];
326
327                 if (!count)     /* Nothing to do! */
328                         return 0;
329
330                 if(session->status != IZIP_OPEN)
331                         /* Seriously, wtf? */
332                         return 0;
333
334                 unsigned char compr[CHUNK];
335                 int ret;
336
337                 /* This loop is really only supposed to run once, but in case 'compr'
338                  * is filled up somehow we are prepared to handle this situation.
339                  */
340                 unsigned int offset = 0;
341                 do
342                 {
343                         /* Prepare compression */
344                         session->c_stream.next_in = (Bytef*)buffer + offset;
345                         session->c_stream.avail_in = count - offset;
346
347                         session->c_stream.next_out = compr;
348                         session->c_stream.avail_out = CHUNK;
349
350                         /* Compress the text */
351                         ret = deflate(&session->c_stream, Z_SYNC_FLUSH);
352                         /* TODO CloseStream() in here at random places */
353                         switch (ret)
354                         {
355                                 case Z_OK:
356                                         break;
357                                 case Z_BUF_ERROR:
358                                         /* This one is non-fatal, buffer is just full
359                                          * (can't happen here).
360                                          */
361                                         Error(session, "Internal error. This is not supposed to happen.");
362                                         break;
363                                 case Z_STREAM_ERROR:
364                                         /* This is one of the 'not supposed to happen' things.
365                                          * Memory corruption, anyone?
366                                          */
367                                         Error(session, "General Error. This is also not supposed to happen.");
368                                         break;
369                                 default:
370                                         Error(session, "Unknown error");
371                                         break;
372                         }
373
374                         if (ret != Z_OK)
375                                 return 0;
376
377                         /* Space before - space after stuff was added to this */
378                         unsigned int compressed = CHUNK - session->c_stream.avail_out;
379                         unsigned int uncompressed = count - session->c_stream.avail_in;
380
381                         /* Make it skip the data which was compressed already */
382                         offset += uncompressed;
383
384                         /* Update stats */
385                         total_out_uncompressed += uncompressed;
386                         total_out_compressed += compressed;
387
388                         /* Add compressed to the output buffer */
389                         session->outbuf.append((const char*)compr, compressed);
390                 } while (session->c_stream.avail_in != 0);
391
392                 /* Lets see how much we can send out */
393                 ret = write(fd, session->outbuf.data(), session->outbuf.length());
394
395                 /* Check for errors, and advance the buffer if any was sent */
396                 if (ret > 0)
397                         session->outbuf = session->outbuf.substr(ret);
398                 else if (ret < 1)
399                 {
400                         if (errno == EAGAIN)
401                                 return 0;
402                         else
403                         {
404                                 session->outbuf.clear();
405                                 /* TODO pass errors down? */
406                                 return 0;
407                         }
408                 }
409
410                 /* ALL LIES the lot of it, we havent really written
411                  * this amount, but the layer above doesnt need to know.
412                  */
413                 return count;
414         }
415
416         void Error(izip_session* session, const std::string &text)
417         {
418                 ServerInstance->SNO->WriteToSnoMask('l', "ziplink error: " + text);
419         }
420
421         void CloseSession(izip_session* session)
422         {
423                 if (session->status == IZIP_OPEN)
424                 {
425                         session->status = IZIP_CLOSED;
426                         session->outbuf.clear();
427                         inflateEnd(&session->d_stream);
428                         deflateEnd(&session->c_stream);
429                 }
430         }
431
432 };
433
434 MODULE_INIT(ModuleZLib)
435