]> git.netwichtig.de Git - user/henk/code/inspircd.git/blob - src/modules/extra/m_ziplink.cpp
Remove InspIRCd* parameters and fields
[user/henk/code/inspircd.git] / src / modules / extra / m_ziplink.cpp
1 /*       +------------------------------------+
2  *       | Inspire Internet Relay Chat Daemon |
3  *       +------------------------------------+
4  *
5  *  InspIRCd: (C) 2002-2009 InspIRCd Development Team
6  * See: http://wiki.inspircd.org/Credits
7  *
8  * This program is free but copyrighted software; see
9  *            the file COPYING for details.
10  *
11  * ---------------------------------------------------
12  */
13
14 #include "inspircd.h"
15 #include <zlib.h>
16 #include "transport.h"
17 #include <iostream>
18
19 /* $ModDesc: Provides zlib link support for servers */
20 /* $LinkerFlags: -lz */
21 /* $ModDep: transport.h */
22
/*
 * Z_BEST_COMPRESSION (9) is used for all sending of data, with
 * a Z_SYNC_FLUSH after each chunk. A frame may contain multiple lines
 * and should be treated as raw binary data.
 */
28
/* Lifecycle state of a compressed connection; IZIP_CLOSED is the zero value. */
enum izip_status
{
        IZIP_CLOSED = 0,
        IZIP_OPEN
};
31
32 /** Represents an zipped connections extra data
33  */
34 class izip_session : public classbase
35 {
36  public:
37         z_stream c_stream;      /* compression stream */
38         z_stream d_stream;      /* uncompress stream */
39         izip_status status;     /* Connection status */
40         std::string outbuf;     /* Holds output buffer (compressed) */
41         std::string inbuf;      /* Holds input buffer (compressed) */
42 };
43
44 class ModuleZLib : public Module
45 {
46         izip_session* sessions;
47
48         /* Used for stats z extensions */
49         float total_out_compressed;
50         float total_in_compressed;
51         float total_out_uncompressed;
52         float total_in_uncompressed;
53
54         /* Used for reading data from the wire and compressing data to. */
55         char *net_buffer;
56         unsigned int net_buffer_size;
57  public:
58
59         ModuleZLib()
60                         {
61                 ServerInstance->Modules->PublishInterface("BufferedSocketHook", this);
62
63                 sessions = new izip_session[ServerInstance->SE->GetMaxFds()];
64                 for (int i = 0; i < ServerInstance->SE->GetMaxFds(); i++)
65                         sessions[i].status = IZIP_CLOSED;
66
67                 total_out_compressed = total_in_compressed = 0;
68                 total_out_uncompressed = total_in_uncompressed = 0;
69                 Implementation eventlist[] = { I_OnStats, I_OnRequest };
70                 ServerInstance->Modules->Attach(eventlist, this, 2);
71
72                 // Allocate a buffer which is used for reading and writing data
73                 net_buffer_size = ServerInstance->Config->NetBufferSize;
74                 net_buffer = new char[net_buffer_size];
75         }
76
77         ~ModuleZLib()
78         {
79                 ServerInstance->Modules->UnpublishInterface("BufferedSocketHook", this);
80                 delete[] sessions;
81                 delete[] net_buffer;
82         }
83
84         Version GetVersion()
85         {
86                 return Version("Provides zlib link support for servers", VF_VENDOR, API_VERSION);
87         }
88
89
90         /* Handle BufferedSocketHook API requests */
91         const char* OnRequest(Request* request)
92         {
93                 ISHRequest* ISR = (ISHRequest*)request;
94                 if (strcmp("IS_NAME", request->GetId()) == 0)
95                 {
96                         /* Return name */
97                         return "zip";
98                 }
99                 else if (strcmp("IS_HOOK", request->GetId()) == 0)
100                 {
101                         ISR->Sock->AddIOHook(this);
102                         return "OK";
103                 }
104                 else if (strcmp("IS_UNHOOK", request->GetId()) == 0)
105                 {
106                         ISR->Sock->DelIOHook();
107                         return "OK";
108                 }
109                 else if (strcmp("IS_HSDONE", request->GetId()) == 0)
110                 {
111                         /* Check for completion of handshake
112                          * (actually, this module doesnt handshake)
113                          */
114                         return "OK";
115                 }
116                 else if (strcmp("IS_ATTACH", request->GetId()) == 0)
117                 {
118                         /* Attach certificate data to the inspsocket
119                          * (this module doesnt do that, either)
120                          */
121                         return NULL;
122                 }
123                 return NULL;
124         }
125
126         /* Handle stats z (misc stats) */
127         ModResult OnStats(char symbol, User* user, string_list &results)
128         {
129                 if (symbol == 'z')
130                 {
131                         std::string sn = ServerInstance->Config->ServerName;
132
133                         /* Yeah yeah, i know, floats are ew.
134                          * We used them here because we'd be casting to float anyway to do this maths,
135                          * and also only floating point numbers can deal with the pretty large numbers
136                          * involved in the total throughput of a server over a large period of time.
137                          * (we dont count 64 bit ints because not all systems have 64 bit ints, and floats
138                          * can still hold more.
139                          */
140                         float outbound_r = (total_out_compressed / (total_out_uncompressed + 0.001)) * 100;
141                         float inbound_r = (total_in_compressed / (total_in_uncompressed + 0.001)) * 100;
142
143                         float total_compressed = total_in_compressed + total_out_compressed;
144                         float total_uncompressed = total_in_uncompressed + total_out_uncompressed;
145
146                         float total_r = (total_compressed / (total_uncompressed + 0.001)) * 100;
147
148                         char outbound_ratio[MAXBUF], inbound_ratio[MAXBUF], combined_ratio[MAXBUF];
149
150                         sprintf(outbound_ratio, "%3.2f%%", outbound_r);
151                         sprintf(inbound_ratio, "%3.2f%%", inbound_r);
152                         sprintf(combined_ratio, "%3.2f%%", total_r);
153
154                         results.push_back(sn+" 304 "+user->nick+" :ZIPSTATS outbound_compressed   = "+ConvToStr(total_out_compressed));
155                         results.push_back(sn+" 304 "+user->nick+" :ZIPSTATS inbound_compressed    = "+ConvToStr(total_in_compressed));
156                         results.push_back(sn+" 304 "+user->nick+" :ZIPSTATS outbound_uncompressed = "+ConvToStr(total_out_uncompressed));
157                         results.push_back(sn+" 304 "+user->nick+" :ZIPSTATS inbound_uncompressed  = "+ConvToStr(total_in_uncompressed));
158                         results.push_back(sn+" 304 "+user->nick+" :ZIPSTATS percentage_of_original_outbound_traffic        = "+outbound_ratio);
159                         results.push_back(sn+" 304 "+user->nick+" :ZIPSTATS percentage_of_orignal_inbound_traffic         = "+inbound_ratio);
160                         results.push_back(sn+" 304 "+user->nick+" :ZIPSTATS total_size_of_original_traffic        = "+combined_ratio);
161                         return MOD_RES_PASSTHRU;
162                 }
163
164                 return MOD_RES_PASSTHRU;
165         }
166
167         void OnStreamSocketConnect(StreamSocket* user)
168         {
169                 OnStreamSocketAccept(user, 0, 0);
170         }
171
172         void OnRawSocketAccept(StreamSocket* user, irc::sockets::sockaddrs*, irc::sockets::sockaddrs*)
173         {
174                 int fd = user->GetFd();
175
176                 izip_session* session = &sessions[fd];
177
178                 /* Just in case... */
179                 session->outbuf.clear();
180
181                 session->c_stream.zalloc = (alloc_func)0;
182                 session->c_stream.zfree = (free_func)0;
183                 session->c_stream.opaque = (voidpf)0;
184
185                 session->d_stream.zalloc = (alloc_func)0;
186                 session->d_stream.zfree = (free_func)0;
187                 session->d_stream.opaque = (voidpf)0;
188
189                 /* If we cant call this, well, we're boned. */
190                 if (inflateInit(&session->d_stream) != Z_OK)
191                 {
192                         session->status = IZIP_CLOSED;
193                         return;
194                 }
195
196                 /* Same here */
197                 if (deflateInit(&session->c_stream, Z_BEST_COMPRESSION) != Z_OK)
198                 {
199                         inflateEnd(&session->d_stream);
200                         session->status = IZIP_CLOSED;
201                         return;
202                 }
203
204                 /* Just in case, do this last */
205                 session->status = IZIP_OPEN;
206         }
207
208         void OnStreamSocketClose(StreamSocket* user)
209         {
210                 int fd = user->GetFd();
211                 CloseSession(&sessions[fd]);
212         }
213
214         int OnStreamSocketRead(StreamSocket* user, std::string& recvq)
215         {
216                 int fd = user->GetFd();
217                 /* Find the sockets session */
218                 izip_session* session = &sessions[fd];
219
220                 if (session->status == IZIP_CLOSED)
221                         return -1;
222
223                 if (session->inbuf.empty())
224                 {
225                         /* Read read_buffer_size bytes at a time to the buffer (usually 2.5k) */
226                         int readresult = read(fd, net_buffer, net_buffer_size);
227
228                         if (readresult < 0)
229                         {
230                                 if (errno == EINTR || errno == EAGAIN)
231                                         return 0;
232                         }
233                         if (readresult <= 0)
234                                 return -1;
235
236                         total_in_compressed += readresult;
237
238                         /* Copy the compressed data into our input buffer */
239                         session->inbuf.append(net_buffer, readresult);
240                 }
241
242                 size_t in_len = session->inbuf.length();
243                 char* buffer = ServerInstance->GetReadBuffer();
244                 int count = ServerInstance->Config->NetBufferSize;
245
246                 /* Prepare decompression */
247                 session->d_stream.next_in = (Bytef *)session->inbuf.c_str();
248                 session->d_stream.avail_in = in_len;
249
250                 session->d_stream.next_out = (Bytef*)buffer;
251                 /* Last byte is reserved for NULL terminating that beast */
252                 session->d_stream.avail_out = count - 1;
253
254                 /* Z_SYNC_FLUSH: Do as much as possible */
255                 int ret = inflate(&session->d_stream, Z_SYNC_FLUSH);
256                 /* TODO CloseStream() in here at random places */
257                 switch (ret)
258                 {
259                         case Z_NEED_DICT:
260                         case Z_STREAM_ERROR:
261                                 /* This is one of the 'not supposed to happen' things.
262                                  * Memory corruption, anyone?
263                                  */
264                                 Error(session, "General Error. This is not supposed to happen :/");
265                                 break;
266                         case Z_DATA_ERROR:
267                                 Error(session, "Decompression failed, malformed data");
268                                 break;
269                         case Z_MEM_ERROR:
270                                 Error(session, "Out of memory");
271                                 break;
272                         case Z_BUF_ERROR:
273                                 /* This one is non-fatal, buffer is just full
274                                  * (can't happen here).
275                                  */
276                                 Error(session, "Internal error. This is not supposed to happen.");
277                                 break;
278                         case Z_STREAM_END:
279                                 /* This module *never* generates these :/ */
280                                 Error(session, "End-of-stream marker received");
281                                 break;
282                         case Z_OK:
283                                 break;
284                         default:
285                                 /* NO WAI! This can't happen. All errors are handled above. */
286                                 Error(session, "Unknown error");
287                                 break;
288                 }
289                 if (ret != Z_OK)
290                 {
291                         return -1;
292                 }
293
294                 /* Update the inbut buffer */
295                 unsigned int input_compressed = in_len - session->d_stream.avail_in;
296                 session->inbuf = session->inbuf.substr(input_compressed);
297
298                 /* Update counters (Old size - new size) */
299                 unsigned int uncompressed_length = (count - 1) - session->d_stream.avail_out;
300                 total_in_uncompressed += uncompressed_length;
301
302                 /* Null-terminate the buffer -- this doesnt harm binary data */
303                 recvq.append(buffer, uncompressed_length);
304                 return 1;
305         }
306
307         int OnStreamSocketWrite(StreamSocket* user, std::string& sendq)
308         {
309                 int fd = user->GetFd();
310                 izip_session* session = &sessions[fd];
311
312                 if(session->status != IZIP_OPEN)
313                         /* Seriously, wtf? */
314                         return -1;
315
316                 int ret;
317
318                 /* This loop is really only supposed to run once, but in case 'compr'
319                  * is filled up somehow we are prepared to handle this situation.
320                  */
321                 unsigned int offset = 0;
322                 do
323                 {
324                         /* Prepare compression */
325                         session->c_stream.next_in = (Bytef*)sendq.data() + offset;
326                         session->c_stream.avail_in = sendq.length() - offset;
327
328                         session->c_stream.next_out = (Bytef*)net_buffer;
329                         session->c_stream.avail_out = net_buffer_size;
330
331                         /* Compress the text */
332                         ret = deflate(&session->c_stream, Z_SYNC_FLUSH);
333                         /* TODO CloseStream() in here at random places */
334                         switch (ret)
335                         {
336                                 case Z_OK:
337                                         break;
338                                 case Z_BUF_ERROR:
339                                         /* This one is non-fatal, buffer is just full
340                                          * (can't happen here).
341                                          */
342                                         Error(session, "Internal error. This is not supposed to happen.");
343                                         break;
344                                 case Z_STREAM_ERROR:
345                                         /* This is one of the 'not supposed to happen' things.
346                                          * Memory corruption, anyone?
347                                          */
348                                         Error(session, "General Error. This is also not supposed to happen.");
349                                         break;
350                                 default:
351                                         Error(session, "Unknown error");
352                                         break;
353                         }
354
355                         if (ret != Z_OK)
356                                 return 0;
357
358                         /* Space before - space after stuff was added to this */
359                         unsigned int compressed = net_buffer_size - session->c_stream.avail_out;
360                         unsigned int uncompressed = sendq.length() - session->c_stream.avail_in;
361
362                         /* Make it skip the data which was compressed already */
363                         offset += uncompressed;
364
365                         /* Update stats */
366                         total_out_uncompressed += uncompressed;
367                         total_out_compressed += compressed;
368
369                         /* Add compressed to the output buffer */
370                         session->outbuf.append((const char*)net_buffer, compressed);
371                 } while (session->c_stream.avail_in != 0);
372
373                 /* Lets see how much we can send out */
374                 ret = write(fd, session->outbuf.data(), session->outbuf.length());
375
376                 /* Check for errors, and advance the buffer if any was sent */
377                 if (ret > 0)
378                         session->outbuf = session->outbuf.substr(ret);
379                 else if (ret < 1)
380                 {
381                         if (errno == EAGAIN)
382                                 return 0;
383                         else
384                         {
385                                 session->outbuf.clear();
386                                 return -1;
387                         }
388                 }
389
390                 return 1;
391         }
392
393         void Error(izip_session* session, const std::string &text)
394         {
395                 ServerInstance->SNO->WriteToSnoMask('l', "ziplink error: " + text);
396         }
397
398         void CloseSession(izip_session* session)
399         {
400                 if (session->status == IZIP_OPEN)
401                 {
402                         session->status = IZIP_CLOSED;
403                         session->outbuf.clear();
404                         inflateEnd(&session->d_stream);
405                         deflateEnd(&session->c_stream);
406                 }
407         }
408
409 };
410
/* NOTE(review): presumably expands to the loader entry point that creates a
 * ModuleZLib instance -- confirm against the MODULE_INIT macro definition.
 */
MODULE_INIT(ModuleZLib)
412