X-Git-Url: https://git.netwichtig.de/gitweb/?a=blobdiff_plain;ds=inline;f=src%2Fmodules%2Fextra%2Fm_ziplink.cpp;h=976a27b5c7293bec55c4a336b428aaec1f3d989d;hb=6d03943426dcce76ba66567a9b18425a5ebb4c0c;hp=be57eea97e22d6ac35f453e90680ba61a8fa043c;hpb=d185decae97752368d5cf62311cbc0d1a52aa22c;p=user%2Fhenk%2Fcode%2Finspircd.git diff --git a/src/modules/extra/m_ziplink.cpp b/src/modules/extra/m_ziplink.cpp index be57eea97..976a27b5c 100644 --- a/src/modules/extra/m_ziplink.cpp +++ b/src/modules/extra/m_ziplink.cpp @@ -2,8 +2,8 @@ * | Inspire Internet Relay Chat Daemon | * +------------------------------------+ * - * InspIRCd: (C) 2002-2008 InspIRCd Development Team - * See: http://www.inspircd.org/wiki/index.php/Credits + * InspIRCd: (C) 2002-2009 InspIRCd Development Team + * See: http://wiki.inspircd.org/Credits * * This program is free but copyrighted software; see * the file COPYING for details. @@ -13,114 +13,21 @@ #include "inspircd.h" #include -#include "users.h" -#include "channels.h" -#include "modules.h" -#include "socket.h" -#include "hashcomp.h" #include "transport.h" +#include /* $ModDesc: Provides zlib link support for servers */ /* $LinkerFlags: -lz */ /* $ModDep: transport.h */ /* - * Compressed data is transmitted across the link in the following format: - * - * 0 1 2 3 4 ... n - * +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+ - * | n | Z0 -> Zn | - * +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+ - * - * Where: n is the size of a frame, in network byte order, 4 bytes. - * Z0 through Zn are Zlib compressed data, n bytes in length. - * - * If the module fails to read the entire frame, then it will buffer - * the portion of the last frame it received, then attempt to read - * the next part of the frame next time a write notification arrives. - * * ZLIB_BEST_COMPRESSION (9) is used for all sending of data with - * a flush after each frame. A frame may contain multiple lines + * a flush after each chunk. A frame may contain multiple lines * and should be treated as raw binary data. - * */ /* Status of a connection */ -enum izip_status { IZIP_OPEN, IZIP_CLOSED }; - -/* Maximum transfer size per read operation */ -const unsigned int CHUNK = 128 * 1024; - -/* This class manages a compressed chunk of data preceeded by - * a length count. - * - * It can handle having multiple chunks of data in the buffer - * at any time. - */ -class CountedBuffer : public classbase -{ - std::string buffer; /* Current buffer contents */ - unsigned int amount_expected; /* Amount of data expected */ - public: - CountedBuffer() - { - amount_expected = 0; - } - - /** Adds arbitrary compressed data to the buffer. - * - Binsry safe, of course. - */ - void AddData(unsigned char* data, int data_length) - { - buffer.append((const char*)data, data_length); - this->NextFrameSize(); - } - - /** Works out the size of the next compressed frame - */ - void NextFrameSize() - { - if ((!amount_expected) && (buffer.length() >= 4)) - { - /* We have enough to read an int - - * Yes, this is safe, but its ugly. Give me - * a nicer way to read 4 bytes from a binary - * stream, and push them into a 32 bit int, - * and i'll consider replacing this. - */ - amount_expected = ntohl((buffer[3] << 24) | (buffer[2] << 16) | (buffer[1] << 8) | buffer[0]); - buffer = buffer.substr(4); - } - } - - /** Gets the next frame and returns its size, or returns - * zero if there isnt one available yet. - * A frame can contain multiple plaintext lines. - * - Binary safe. - */ - int GetFrame(unsigned char* frame, int maxsize) - { - if (amount_expected) - { - /* We know how much we're expecting... - * Do we have enough yet? - */ - if (buffer.length() >= amount_expected) - { - int j = 0; - for (unsigned int i = 0; i < amount_expected; i++, j++) - frame[i] = buffer[i]; - - buffer = buffer.substr(j); - amount_expected = 0; - NextFrameSize(); - return j; - } - } - /* Not enough for a frame yet, COME AGAIN! */ - return 0; - } -}; +enum izip_status { IZIP_CLOSED = 0, IZIP_OPEN }; /** Represents an zipped connections extra data */ @@ -128,11 +35,10 @@ class izip_session : public classbase { public: z_stream c_stream; /* compression stream */ - z_stream d_stream; /* decompress stream */ + z_stream d_stream; /* uncompress stream */ izip_status status; /* Connection status */ - int fd; /* File descriptor */ - CountedBuffer* inbuf; /* Holds input buffer */ - std::string outbuf; /* Holds output buffer */ + std::string outbuf; /* Holds output buffer (compressed) */ + std::string inbuf; /* Holds input buffer (compressed) */ }; class ModuleZLib : public Module @@ -145,35 +51,44 @@ class ModuleZLib : public Module float total_out_uncompressed; float total_in_uncompressed; + /* Used for reading data from the wire and compressing data to. */ + char *net_buffer; + unsigned int net_buffer_size; public: - ModuleZLib(InspIRCd* Me) - : Module::Module(Me) - { + ModuleZLib() + { ServerInstance->Modules->PublishInterface("BufferedSocketHook", this); sessions = new izip_session[ServerInstance->SE->GetMaxFds()]; + for (int i = 0; i < ServerInstance->SE->GetMaxFds(); i++) + sessions[i].status = IZIP_CLOSED; total_out_compressed = total_in_compressed = 0; - total_out_uncompressed = total_out_uncompressed = 0; - Implementation eventlist[] = { I_OnRawSocketConnect, I_OnRawSocketAccept, I_OnRawSocketClose, I_OnRawSocketRead, I_OnRawSocketWrite, I_OnStats, I_OnRequest }; - ServerInstance->Modules->Attach(eventlist, this, 7); + total_out_uncompressed = total_in_uncompressed = 0; + Implementation eventlist[] = { I_OnStats, I_OnRequest }; + ServerInstance->Modules->Attach(eventlist, this, 2); + + // Allocate a buffer which is used for reading and writing data + net_buffer_size = ServerInstance->Config->NetBufferSize; + net_buffer = new char[net_buffer_size]; } - virtual ~ModuleZLib() + ~ModuleZLib() { ServerInstance->Modules->UnpublishInterface("BufferedSocketHook", this); delete[] sessions; + delete[] net_buffer; } - virtual Version GetVersion() + Version GetVersion() { - return Version(1, 2, 0, 0, VF_VENDOR, API_VERSION); + return Version("Provides zlib link support for servers", VF_VENDOR, API_VERSION); } /* Handle BufferedSocketHook API requests */ - virtual const char* OnRequest(Request* request) + const char* OnRequest(Request* request) { ISHRequest* ISR = (ISHRequest*)request; if (strcmp("IS_NAME", request->GetId()) == 0) @@ -183,22 +98,13 @@ class ModuleZLib : public Module } else if (strcmp("IS_HOOK", request->GetId()) == 0) { - /* Attach to an inspsocket */ - const char* ret = "OK"; - try - { - ret = ServerInstance->Config->AddIOHook((Module*)this, (BufferedSocket*)ISR->Sock) ? "OK" : NULL; - } - catch (ModuleException& e) - { - return NULL; - } - return ret; + ISR->Sock->AddIOHook(this); + return "OK"; } else if (strcmp("IS_UNHOOK", request->GetId()) == 0) { - /* Detatch from an inspsocket */ - return ServerInstance->Config->DelIOHook((BufferedSocket*)ISR->Sock) ? "OK" : NULL; + ISR->Sock->DelIOHook(); + return "OK"; } else if (strcmp("IS_HSDONE", request->GetId()) == 0) { @@ -218,7 +124,7 @@ class ModuleZLib : public Module } /* Handle stats z (misc stats) */ - virtual int OnStats(char symbol, User* user, string_list &results) + ModResult OnStats(char symbol, User* user, string_list &results) { if (symbol == 'z') { @@ -231,13 +137,13 @@ class ModuleZLib : public Module * (we dont count 64 bit ints because not all systems have 64 bit ints, and floats * can still hold more. */ - float outbound_r = 100 - ((total_out_compressed / (total_out_uncompressed + 0.001)) * 100); - float inbound_r = 100 - ((total_in_compressed / (total_in_uncompressed + 0.001)) * 100); + float outbound_r = (total_out_compressed / (total_out_uncompressed + 0.001)) * 100; + float inbound_r = (total_in_compressed / (total_in_uncompressed + 0.001)) * 100; float total_compressed = total_in_compressed + total_out_compressed; float total_uncompressed = total_in_uncompressed + total_out_uncompressed; - float total_r = 100 - ((total_compressed / (total_uncompressed + 0.001)) * 100); + float total_r = (total_compressed / (total_uncompressed + 0.001)) * 100; char outbound_ratio[MAXBUF], inbound_ratio[MAXBUF], combined_ratio[MAXBUF]; @@ -249,23 +155,28 @@ class ModuleZLib : public Module results.push_back(sn+" 304 "+user->nick+" :ZIPSTATS inbound_compressed = "+ConvToStr(total_in_compressed)); results.push_back(sn+" 304 "+user->nick+" :ZIPSTATS outbound_uncompressed = "+ConvToStr(total_out_uncompressed)); results.push_back(sn+" 304 "+user->nick+" :ZIPSTATS inbound_uncompressed = "+ConvToStr(total_in_uncompressed)); - results.push_back(sn+" 304 "+user->nick+" :ZIPSTATS outbound_ratio = "+outbound_ratio); - results.push_back(sn+" 304 "+user->nick+" :ZIPSTATS inbound_ratio = "+inbound_ratio); - results.push_back(sn+" 304 "+user->nick+" :ZIPSTATS combined_ratio = "+combined_ratio); - return 0; + results.push_back(sn+" 304 "+user->nick+" :ZIPSTATS percentage_of_original_outbound_traffic = "+outbound_ratio); + results.push_back(sn+" 304 "+user->nick+" :ZIPSTATS percentage_of_orignal_inbound_traffic = "+inbound_ratio); + results.push_back(sn+" 304 "+user->nick+" :ZIPSTATS total_size_of_original_traffic = "+combined_ratio); + return MOD_RES_PASSTHRU; } - return 0; + return MOD_RES_PASSTHRU; + } + + void OnStreamSocketConnect(StreamSocket* user) + { + OnStreamSocketAccept(user, 0, 0); } - virtual void OnRawSocketAccept(int fd, const std::string &ip, int localport) + void OnRawSocketAccept(StreamSocket* user, irc::sockets::sockaddrs*, irc::sockets::sockaddrs*) { + int fd = user->GetFd(); + izip_session* session = &sessions[fd]; - /* allocate state and buffers */ - session->fd = fd; - session->status = IZIP_OPEN; - session->inbuf = new CountedBuffer(); + /* Just in case... */ + session->outbuf.clear(); session->c_stream.zalloc = (alloc_func)0; session->c_stream.zfree = (free_func)0; @@ -274,166 +185,214 @@ class ModuleZLib : public Module session->d_stream.zalloc = (alloc_func)0; session->d_stream.zfree = (free_func)0; session->d_stream.opaque = (voidpf)0; - } - virtual void OnRawSocketConnect(int fd) - { - /* Nothing special needs doing here compared to accept() */ - OnRawSocketAccept(fd, "", 0); + /* If we cant call this, well, we're boned. */ + if (inflateInit(&session->d_stream) != Z_OK) + { + session->status = IZIP_CLOSED; + return; + } + + /* Same here */ + if (deflateInit(&session->c_stream, Z_BEST_COMPRESSION) != Z_OK) + { + inflateEnd(&session->d_stream); + session->status = IZIP_CLOSED; + return; + } + + /* Just in case, do this last */ + session->status = IZIP_OPEN; } - virtual void OnRawSocketClose(int fd) + void OnStreamSocketClose(StreamSocket* user) { + int fd = user->GetFd(); CloseSession(&sessions[fd]); } - virtual int OnRawSocketRead(int fd, char* buffer, unsigned int count, int &readresult) + int OnStreamSocketRead(StreamSocket* user, std::string& recvq) { + int fd = user->GetFd(); /* Find the sockets session */ izip_session* session = &sessions[fd]; if (session->status == IZIP_CLOSED) - return 0; - - unsigned char compr[CHUNK + 4]; - unsigned int offset = 0; - unsigned int total_size = 0; - - /* Read CHUNK bytes at a time to the buffer (usually 128k) */ - readresult = read(fd, compr, CHUNK); + return -1; - /* Did we get anything? */ - if (readresult > 0) + if (session->inbuf.empty()) { - /* Add it to the frame queue */ - session->inbuf->AddData(compr, readresult); - total_in_compressed += readresult; + /* Read read_buffer_size bytes at a time to the buffer (usually 2.5k) */ + int readresult = read(fd, net_buffer, net_buffer_size); - /* Parse all completed frames */ - int size = 0; - while ((size = session->inbuf->GetFrame(compr, CHUNK)) != 0) + if (readresult < 0) { - session->d_stream.next_in = (Bytef*)compr; - session->d_stream.avail_in = 0; - session->d_stream.next_out = (Bytef*)(buffer + offset); - - /* If we cant call this, well, we're boned. */ - if (inflateInit(&session->d_stream) != Z_OK) + if (errno == EINTR || errno == EAGAIN) return 0; + } + if (readresult <= 0) + return -1; - while ((session->d_stream.total_out < count) && (session->d_stream.total_in < (unsigned int)size)) - { - session->d_stream.avail_in = session->d_stream.avail_out = 1; - if (inflate(&session->d_stream, Z_NO_FLUSH) == Z_STREAM_END) - break; - } + total_in_compressed += readresult; - /* Stick a fork in me, i'm done */ - inflateEnd(&session->d_stream); + /* Copy the compressed data into our input buffer */ + session->inbuf.append(net_buffer, readresult); + } - /* Update counters and offsets */ - total_size += session->d_stream.total_out; - total_in_uncompressed += session->d_stream.total_out; - offset += session->d_stream.total_out; - } + size_t in_len = session->inbuf.length(); + char* buffer = ServerInstance->GetReadBuffer(); + int count = ServerInstance->Config->NetBufferSize; - /* Null-terminate the buffer -- this doesnt harm binary data */ - buffer[total_size] = 0; + /* Prepare decompression */ + session->d_stream.next_in = (Bytef *)session->inbuf.c_str(); + session->d_stream.avail_in = in_len; - /* Set the read size to the correct total size */ - readresult = total_size; + session->d_stream.next_out = (Bytef*)buffer; + /* Last byte is reserved for NULL terminating that beast */ + session->d_stream.avail_out = count - 1; + /* Z_SYNC_FLUSH: Do as much as possible */ + int ret = inflate(&session->d_stream, Z_SYNC_FLUSH); + /* TODO CloseStream() in here at random places */ + switch (ret) + { + case Z_NEED_DICT: + case Z_STREAM_ERROR: + /* This is one of the 'not supposed to happen' things. + * Memory corruption, anyone? + */ + Error(session, "General Error. This is not supposed to happen :/"); + break; + case Z_DATA_ERROR: + Error(session, "Decompression failed, malformed data"); + break; + case Z_MEM_ERROR: + Error(session, "Out of memory"); + break; + case Z_BUF_ERROR: + /* This one is non-fatal, buffer is just full + * (can't happen here). + */ + Error(session, "Internal error. This is not supposed to happen."); + break; + case Z_STREAM_END: + /* This module *never* generates these :/ */ + Error(session, "End-of-stream marker received"); + break; + case Z_OK: + break; + default: + /* NO WAI! This can't happen. All errors are handled above. */ + Error(session, "Unknown error"); + break; } - return (readresult > 0); + if (ret != Z_OK) + { + return -1; + } + + /* Update the inbut buffer */ + unsigned int input_compressed = in_len - session->d_stream.avail_in; + session->inbuf = session->inbuf.substr(input_compressed); + + /* Update counters (Old size - new size) */ + unsigned int uncompressed_length = (count - 1) - session->d_stream.avail_out; + total_in_uncompressed += uncompressed_length; + + /* Null-terminate the buffer -- this doesnt harm binary data */ + recvq.append(buffer, uncompressed_length); + return 1; } - virtual int OnRawSocketWrite(int fd, const char* buffer, int count) + int OnStreamSocketWrite(StreamSocket* user, std::string& sendq) { + int fd = user->GetFd(); izip_session* session = &sessions[fd]; - int ocount = count; - - if (!count) /* Nothing to do! */ - return 0; if(session->status != IZIP_OPEN) - { /* Seriously, wtf? */ - CloseSession(session); - return 0; - } + return -1; - unsigned char compr[CHUNK + 4]; + int ret; - /* Gentlemen, start your engines! */ - if (deflateInit(&session->c_stream, Z_BEST_COMPRESSION) != Z_OK) + /* This loop is really only supposed to run once, but in case 'compr' + * is filled up somehow we are prepared to handle this situation. + */ + unsigned int offset = 0; + do { - CloseSession(session); - return 0; - } + /* Prepare compression */ + session->c_stream.next_in = (Bytef*)sendq.data() + offset; + session->c_stream.avail_in = sendq.length() - offset; - /* Set buffer sizes (we reserve 4 bytes at the start of the - * buffer for the length counters) - */ - session->c_stream.next_in = (Bytef*)buffer; - session->c_stream.next_out = compr + 4; + session->c_stream.next_out = (Bytef*)net_buffer; + session->c_stream.avail_out = net_buffer_size; - /* Compress the text */ - while ((session->c_stream.total_in < (unsigned int)count) && (session->c_stream.total_out < CHUNK)) - { - session->c_stream.avail_in = session->c_stream.avail_out = 1; - if (deflate(&session->c_stream, Z_NO_FLUSH) != Z_OK) + /* Compress the text */ + ret = deflate(&session->c_stream, Z_SYNC_FLUSH); + /* TODO CloseStream() in here at random places */ + switch (ret) { - CloseSession(session); - return 0; + case Z_OK: + break; + case Z_BUF_ERROR: + /* This one is non-fatal, buffer is just full + * (can't happen here). + */ + Error(session, "Internal error. This is not supposed to happen."); + break; + case Z_STREAM_ERROR: + /* This is one of the 'not supposed to happen' things. + * Memory corruption, anyone? + */ + Error(session, "General Error. This is also not supposed to happen."); + break; + default: + Error(session, "Unknown error"); + break; } - } - /* Finish the stream */ - for (session->c_stream.avail_out = 1; deflate(&session->c_stream, Z_FINISH) != Z_STREAM_END; session->c_stream.avail_out = 1); - deflateEnd(&session->c_stream); - total_out_uncompressed += ocount; - total_out_compressed += session->c_stream.total_out; + if (ret != Z_OK) + return 0; - /** Assemble the frame length onto the frame, in network byte order */ - compr[0] = (session->c_stream.total_out >> 24); - compr[1] = (session->c_stream.total_out >> 16); - compr[2] = (session->c_stream.total_out >> 8); - compr[3] = (session->c_stream.total_out & 0xFF); + /* Space before - space after stuff was added to this */ + unsigned int compressed = net_buffer_size - session->c_stream.avail_out; + unsigned int uncompressed = sendq.length() - session->c_stream.avail_in; - /* Add compressed data plus leading length to the output buffer - - * Note, we may have incomplete half-sent frames in here. - */ - session->outbuf.append((const char*)compr, session->c_stream.total_out + 4); + /* Make it skip the data which was compressed already */ + offset += uncompressed; + + /* Update stats */ + total_out_uncompressed += uncompressed; + total_out_compressed += compressed; + + /* Add compressed to the output buffer */ + session->outbuf.append((const char*)net_buffer, compressed); + } while (session->c_stream.avail_in != 0); /* Lets see how much we can send out */ - int ret = write(fd, session->outbuf.data(), session->outbuf.length()); + ret = write(fd, session->outbuf.data(), session->outbuf.length()); /* Check for errors, and advance the buffer if any was sent */ if (ret > 0) session->outbuf = session->outbuf.substr(ret); else if (ret < 1) { - if (ret == -1) - { - if (errno == EAGAIN) - return 0; - else - { - session->outbuf.clear(); - return 0; - } - } + if (errno == EAGAIN) + return 0; else { session->outbuf.clear(); - return 0; + return -1; } } - /* ALL LIES the lot of it, we havent really written - * this amount, but the layer above doesnt need to know. - */ - return ocount; + return 1; + } + + void Error(izip_session* session, const std::string &text) + { + ServerInstance->SNO->WriteToSnoMask('l', "ziplink error: " + text); } void CloseSession(izip_session* session) @@ -442,7 +401,8 @@ class ModuleZLib : public Module { session->status = IZIP_CLOSED; session->outbuf.clear(); - delete session->inbuf; + inflateEnd(&session->d_stream); + deflateEnd(&session->c_stream); } }