diff options
author | danieldg <danieldg@e03df62e-2008-0410-955e-edbf42e46eb7> | 2009-09-21 13:26:31 +0000 |
---|---|---|
committer | danieldg <danieldg@e03df62e-2008-0410-955e-edbf42e46eb7> | 2009-09-21 13:26:31 +0000 |
commit | e2af2347fc035d702e45f12e772223a8d578410d (patch) | |
tree | bfd80aac2858a9f4faedc316794fc1051dbaa72c /src/modules/extra/m_ziplink.cpp | |
parent | 16fc672b685752007e47aed0fb97bc1ee7443c76 (diff) |
Create StreamSocket for IO hooking implementation
Fixes the SSL SendQ bug
Removes duplicate code between User and BufferedSocket
Simplify SSL module API
Simplify EventHandler API (Readable/Writeable moved to SE)
Add hook for culled objects to invoke callbacks prior to destructor
Replace SocketCull with GlobalCull now that sockets can close themselves
Shorten common case of user read/parse/write path:
User::Write is now zero-copy up to syscall/SSL invocation
User::Read has only two copy/scan passes from read() to ProcessCommand
git-svn-id: http://svn.inspircd.org/repository/trunk/inspircd@11752 e03df62e-2008-0410-955e-edbf42e46eb7
Diffstat (limited to 'src/modules/extra/m_ziplink.cpp')
-rw-r--r-- | src/modules/extra/m_ziplink.cpp | 111 |
1 files changed, 44 insertions, 67 deletions
diff --git a/src/modules/extra/m_ziplink.cpp b/src/modules/extra/m_ziplink.cpp index c220460bd..7d090d80a 100644 --- a/src/modules/extra/m_ziplink.cpp +++ b/src/modules/extra/m_ziplink.cpp @@ -67,29 +67,29 @@ class ModuleZLib : public Module total_out_compressed = total_in_compressed = 0; total_out_uncompressed = total_in_uncompressed = 0; - Implementation eventlist[] = { I_OnRawSocketConnect, I_OnRawSocketAccept, I_OnRawSocketClose, I_OnRawSocketRead, I_OnRawSocketWrite, I_OnStats, I_OnRequest }; - ServerInstance->Modules->Attach(eventlist, this, 7); + Implementation eventlist[] = { I_OnStats, I_OnRequest }; + ServerInstance->Modules->Attach(eventlist, this, 2); // Allocate a buffer which is used for reading and writing data net_buffer_size = ServerInstance->Config->NetBufferSize; net_buffer = new char[net_buffer_size]; } - virtual ~ModuleZLib() + ~ModuleZLib() { ServerInstance->Modules->UnpublishInterface("BufferedSocketHook", this); delete[] sessions; delete[] net_buffer; } - virtual Version GetVersion() + Version GetVersion() { return Version("$Id$", VF_VENDOR, API_VERSION); } /* Handle BufferedSocketHook API requests */ - virtual const char* OnRequest(Request* request) + const char* OnRequest(Request* request) { ISHRequest* ISR = (ISHRequest*)request; if (strcmp("IS_NAME", request->GetId()) == 0) @@ -99,22 +99,13 @@ class ModuleZLib : public Module } else if (strcmp("IS_HOOK", request->GetId()) == 0) { - /* Attach to an inspsocket */ - const char* ret = "OK"; - try - { - ret = ISR->Sock->AddIOHook((Module*)this) ? "OK" : NULL; - } - catch (ModuleException& e) - { - return NULL; - } - return ret; + ISR->Sock->AddIOHook(this); + return "OK"; } else if (strcmp("IS_UNHOOK", request->GetId()) == 0) { - /* Detach from an inspsocket */ - return ISR->Sock->DelIOHook() ? "OK" : NULL; + ISR->Sock->DelIOHook(); + return "OK"; } else if (strcmp("IS_HSDONE", request->GetId()) == 0) { @@ -134,7 +125,7 @@ class ModuleZLib : public Module } /* Handle stats z (misc stats) */ - virtual ModResult OnStats(char symbol, User* user, string_list &results) + ModResult OnStats(char symbol, User* user, string_list &results) { if (symbol == 'z') { @@ -174,10 +165,14 @@ class ModuleZLib : public Module return MOD_RES_PASSTHRU; } - virtual void OnRawSocketConnect(int fd) + void OnStreamSocketConnect(StreamSocket* user) { - if ((fd < 0) || (fd > ServerInstance->SE->GetMaxFds() - 1)) - return; + OnStreamSocketAccept(user, 0, 0); + } + + void OnRawSocketAccept(StreamSocket* user, irc::sockets::sockaddrs*, irc::sockets::sockaddrs*) + { + int fd = user->GetFd(); izip_session* session = &sessions[fd]; @@ -211,39 +206,33 @@ class ModuleZLib : public Module session->status = IZIP_OPEN; } - virtual void OnRawSocketAccept(int fd, irc::sockets::sockaddrs*, irc::sockets::sockaddrs*) - { - /* Nothing special needs doing here compared to connect() */ - OnRawSocketConnect(fd); - } - - virtual void OnRawSocketClose(int fd) + void OnStreamSocketClose(StreamSocket* user) { + int fd = user->GetFd(); CloseSession(&sessions[fd]); } - virtual int OnRawSocketRead(int fd, char* buffer, unsigned int count, int &readresult) + int OnStreamSocketRead(StreamSocket* user, std::string& recvq) { + int fd = user->GetFd(); /* Find the sockets session */ izip_session* session = &sessions[fd]; if (session->status == IZIP_CLOSED) - return 0; + return -1; - if (session->inbuf.length()) - { - /* Our input buffer is filling up. This is *BAD*. - * We can't return more data than fits into buffer - * (count bytes), so we will generate another read - * event on purpose by *NOT* reading from 'fd' at all - * for now. - */ - readresult = 0; - } - else + if (session->inbuf.empty()) { /* Read read_buffer_size bytes at a time to the buffer (usually 2.5k) */ - readresult = read(fd, net_buffer, net_buffer_size); + int readresult = read(fd, net_buffer, net_buffer_size); + + if (readresult < 0) + { + if (errno == EINTR || errno == EAGAIN) + return 0; + } + if (readresult <= 0) + return -1; total_in_compressed += readresult; @@ -252,10 +241,8 @@ class ModuleZLib : public Module } size_t in_len = session->inbuf.length(); - - /* Do we have anything to do? */ - if (in_len <= 0) - return 0; + char* buffer = ServerInstance->GetReadBuffer(); + int count = ServerInstance->Config->NetBufferSize; /* Prepare decompression */ session->d_stream.next_in = (Bytef *)session->inbuf.c_str(); @@ -302,8 +289,7 @@ class ModuleZLib : public Module } if (ret != Z_OK) { - readresult = 0; - return 0; + return -1; } /* Update the inbut buffer */ @@ -315,24 +301,18 @@ class ModuleZLib : public Module total_in_uncompressed += uncompressed_length; /* Null-terminate the buffer -- this doesnt harm binary data */ - buffer[uncompressed_length] = 0; - - /* Set the read size to the correct total size */ - readresult = uncompressed_length; - + recvq.append(buffer, uncompressed_length); return 1; } - virtual int OnRawSocketWrite(int fd, const char* buffer, int count) + int OnStreamSocketWrite(StreamSocket* user, std::string& sendq) { + int fd = user->GetFd(); izip_session* session = &sessions[fd]; - if (!count) /* Nothing to do! */ - return 0; - if(session->status != IZIP_OPEN) /* Seriously, wtf? */ - return 0; + return -1; int ret; @@ -343,8 +323,8 @@ class ModuleZLib : public Module do { /* Prepare compression */ - session->c_stream.next_in = (Bytef*)buffer + offset; - session->c_stream.avail_in = count - offset; + session->c_stream.next_in = (Bytef*)sendq.data() + offset; + session->c_stream.avail_in = sendq.length() - offset; session->c_stream.next_out = (Bytef*)net_buffer; session->c_stream.avail_out = net_buffer_size; @@ -378,7 +358,7 @@ class ModuleZLib : public Module /* Space before - space after stuff was added to this */ unsigned int compressed = net_buffer_size - session->c_stream.avail_out; - unsigned int uncompressed = count - session->c_stream.avail_in; + unsigned int uncompressed = sendq.length() - session->c_stream.avail_in; /* Make it skip the data which was compressed already */ offset += uncompressed; @@ -404,14 +384,11 @@ class ModuleZLib : public Module else { session->outbuf.clear(); - return 0; + return -1; } } - /* ALL LIES the lot of it, we havent really written - * this amount, but the layer above doesnt need to know. - */ - return count; + return 1; } void Error(izip_session* session, const std::string &text) |