]> git.netwichtig.de Git - user/henk/code/inspircd.git/blobdiff - src/inspsocket.cpp
Add support for blocking tag messages with the deaf mode.
[user/henk/code/inspircd.git] / src / inspsocket.cpp
index 5f2da23419b2db00b86cff7549f82479c0919832..e00429bbafc0a36ab90e349a1751a62f41155b1a 100644 (file)
@@ -1,12 +1,19 @@
 /*
  * InspIRCd -- Internet Relay Chat Daemon
  *
- *   Copyright (C) 2009 Daniel De Graaf <danieldg@inspircd.org>
- *   Copyright (C) 2007-2009 Robin Burchell <robin+git@viroteck.net>
- *   Copyright (C) 2008 Thomas Stagner <aquanight@inspircd.org>
- *   Copyright (C) 2006-2007 Craig Edwards <craigedwards@brainbox.cc>
+ *   Copyright (C) 2020 Matt Schatz <genius3000@g3k.solutions>
+ *   Copyright (C) 2019 linuxdaemon <linuxdaemon.irc@gmail.com>
+ *   Copyright (C) 2018 Dylan Frank <b00mx0r@aureus.pw>
+ *   Copyright (C) 2013-2016 Attila Molnar <attilamolnar@hush.com>
+ *   Copyright (C) 2013, 2017-2020 Sadie Powell <sadie@witchery.services>
+ *   Copyright (C) 2013 Adam <Adam@anope.org>
+ *   Copyright (C) 2012 Robby <robby@chatbelgie.be>
+ *   Copyright (C) 2009-2010 Daniel De Graaf <danieldg@inspircd.org>
+ *   Copyright (C) 2007-2008 Robin Burchell <robin+git@viroteck.net>
+ *   Copyright (C) 2007 John Brooks <special@inspircd.org>
  *   Copyright (C) 2007 Dennis Friis <peavey@inspircd.org>
- *   Copyright (C) 2006 Oliver Lupton <oliverlupton@gmail.com>
+ *   Copyright (C) 2006-2007 Craig Edwards <brain@inspircd.org>
+ *   Copyright (C) 2006 Oliver Lupton <om@inspircd.org>
  *
  * This file is part of InspIRCd.  InspIRCd is free software: you can
  * redistribute it and/or modify it under the terms of the GNU General Public
 
 
 #include "inspircd.h"
-#include "socket.h"
-#include "inspstring.h"
-#include "socketengine.h"
+#include "iohook.h"
 
-#ifndef DISABLE_WRITEV
-#include <sys/uio.h>
-#endif
-
-#ifndef IOV_MAX
-#define IOV_MAX 1024
-#endif
+static IOHook* GetNextHook(IOHook* hook)
+{
+       IOHookMiddle* const iohm = IOHookMiddle::ToMiddleHook(hook);
+       if (iohm)
+               return iohm->GetNextHook();
+       return NULL;
+}
 
 BufferedSocket::BufferedSocket()
 {
@@ -46,59 +51,38 @@ BufferedSocket::BufferedSocket(int newfd)
        Timeout = NULL;
        this->fd = newfd;
        this->state = I_CONNECTED;
-       if (fd > -1)
-               ServerInstance->SE->AddFd(this, FD_WANT_FAST_READ | FD_WANT_EDGE_WRITE);
+       if (HasFd())
+               SocketEngine::AddFd(this, FD_WANT_FAST_READ | FD_WANT_EDGE_WRITE);
 }
 
-void BufferedSocket::DoConnect(const std::string &ipaddr, int aport, unsigned long maxtime, const std::string &connectbindip)
+void BufferedSocket::DoConnect(const irc::sockets::sockaddrs& dest, const irc::sockets::sockaddrs& bind, unsigned int maxtime)
 {
-       BufferedSocketError err = BeginConnect(ipaddr, aport, maxtime, connectbindip);
+       BufferedSocketError err = BeginConnect(dest, bind, maxtime);
        if (err != I_ERR_NONE)
        {
                state = I_ERROR;
-               SetError(strerror(errno));
+               SetError(SocketEngine::LastError());
                OnError(err);
        }
 }
 
-BufferedSocketError BufferedSocket::BeginConnect(const std::string &ipaddr, int aport, unsigned long maxtime, const std::string &connectbindip)
-{
-       irc::sockets::sockaddrs addr, bind;
-       if (!irc::sockets::aptosa(ipaddr, aport, addr))
-       {
-               ServerInstance->Logs->Log("SOCKET", LOG_DEBUG, "BUG: Hostname passed to BufferedSocket, rather than an IP address!");
-               return I_ERR_CONNECT;
-       }
-
-       bind.sa.sa_family = 0;
-       if (!connectbindip.empty())
-       {
-               if (!irc::sockets::aptosa(connectbindip, 0, bind))
-               {
-                       return I_ERR_BIND;
-               }
-       }
-
-       return BeginConnect(addr, bind, maxtime);
-}
-
-BufferedSocketError BufferedSocket::BeginConnect(const irc::sockets::sockaddrs& dest, const irc::sockets::sockaddrs& bind, unsigned long timeout)
+BufferedSocketError BufferedSocket::BeginConnect(const irc::sockets::sockaddrs& dest, const irc::sockets::sockaddrs& bind, unsigned int timeout)
 {
-       if (fd < 0)
-               fd = socket(dest.sa.sa_family, SOCK_STREAM, 0);
+       if (!HasFd())
+               fd = socket(dest.family(), SOCK_STREAM, 0);
 
-       if (fd < 0)
+       if (!HasFd())
                return I_ERR_SOCKET;
 
-       if (bind.sa.sa_family != 0)
+       if (bind.family() != 0)
        {
-               if (ServerInstance->SE->Bind(fd, bind) < 0)
+               if (SocketEngine::Bind(fd, bind) < 0)
                        return I_ERR_BIND;
        }
 
-       ServerInstance->SE->NonBlocking(fd);
+       SocketEngine::NonBlocking(fd);
 
-       if (ServerInstance->SE->Connect(this, &dest.sa, sa_size(dest)) == -1)
+       if (SocketEngine::Connect(this, dest) == -1)
        {
                if (errno != EINPROGRESS)
                        return I_ERR_CONNECT;
@@ -106,42 +90,49 @@ BufferedSocketError BufferedSocket::BeginConnect(const irc::sockets::sockaddrs&
 
        this->state = I_CONNECTING;
 
-       if (!ServerInstance->SE->AddFd(this, FD_WANT_NO_READ | FD_WANT_SINGLE_WRITE | FD_WRITE_WILL_BLOCK))
+       if (!SocketEngine::AddFd(this, FD_WANT_NO_READ | FD_WANT_SINGLE_WRITE | FD_WRITE_WILL_BLOCK))
                return I_ERR_NOMOREFDS;
 
-       this->Timeout = new SocketTimeout(this->GetFd(), this, timeout, ServerInstance->Time());
-       ServerInstance->Timers->AddTimer(this->Timeout);
+       this->Timeout = new SocketTimeout(this->GetFd(), this, timeout);
+       ServerInstance->Timers.AddTimer(this->Timeout);
 
-       ServerInstance->Logs->Log("SOCKET", LOG_DEBUG,"BufferedSocket::DoConnect success");
+       ServerInstance->Logs->Log("SOCKET", LOG_DEBUG, "BufferedSocket::DoConnect success");
        return I_ERR_NONE;
 }
 
 void StreamSocket::Close()
 {
-       if (this->fd > -1)
+       if (closing)
+               return;
+
+       closing = true;
+       if (HasFd())
        {
                // final chance, dump as much of the sendq as we can
                DoWrite();
-               if (IOHook)
+
+               IOHook* hook = GetIOHook();
+               DelIOHook();
+               while (hook)
                {
-                       try
-                       {
-                               IOHook->OnStreamSocketClose(this);
-                       }
-                       catch (CoreException& modexcept)
-                       {
-                               ServerInstance->Logs->Log("SOCKET", LOG_DEFAULT,"%s threw an exception: %s",
-                                       modexcept.GetSource(), modexcept.GetReason());
-                       }
-                       IOHook = NULL;
+                       hook->OnStreamSocketClose(this);
+                       IOHook* const nexthook = GetNextHook(hook);
+                       delete hook;
+                       hook = nexthook;
                }
-               ServerInstance->SE->Shutdown(this, 2);
-               ServerInstance->SE->DelFd(this);
-               ServerInstance->SE->Close(this);
-               fd = -1;
+               SocketEngine::Shutdown(this, 2);
+               SocketEngine::Close(this);
        }
 }
 
+void StreamSocket::Close(bool writeblock)
+{
+       if (getSendQSize() != 0 && writeblock)
+               closeonempty = true;
+       else
+               Close();
+}
+
 CullResult StreamSocket::cull()
 {
        Close();
@@ -153,67 +144,79 @@ bool StreamSocket::GetNextLine(std::string& line, char delim)
        std::string::size_type i = recvq.find(delim);
        if (i == std::string::npos)
                return false;
-       line = recvq.substr(0, i);
-       // TODO is this the most efficient way to split?
-       recvq = recvq.substr(i + 1);
+       line.assign(recvq, 0, i);
+       recvq.erase(0, i + 1);
        return true;
 }
 
-void StreamSocket::DoRead()
+int StreamSocket::HookChainRead(IOHook* hook, std::string& rq)
 {
-       if (IOHook)
+       if (!hook)
+               return ReadToRecvQ(rq);
+
+       IOHookMiddle* const iohm = IOHookMiddle::ToMiddleHook(hook);
+       if (iohm)
        {
-               int rv = -1;
-               try
-               {
-                       rv = IOHook->OnStreamSocketRead(this, recvq);
-               }
-               catch (CoreException& modexcept)
-               {
-                       ServerInstance->Logs->Log("SOCKET", LOG_DEFAULT, "%s threw an exception: %s",
-                               modexcept.GetSource(), modexcept.GetReason());
-                       return;
-               }
-               if (rv > 0)
-                       OnDataReady();
-               if (rv < 0)
-                       SetError("Read Error"); // will not overwrite a better error message
+               // Call the next hook to put data into the recvq of the current hook
+               const int ret = HookChainRead(iohm->GetNextHook(), iohm->GetRecvQ());
+               if (ret <= 0)
+                       return ret;
        }
-       else
+       return hook->OnStreamSocketRead(this, rq);
+}
+
+void StreamSocket::DoRead()
+{
+       const std::string::size_type prevrecvqsize = recvq.size();
+
+       const int result = HookChainRead(GetIOHook(), recvq);
+       if (result < 0)
        {
+               SetError("Read Error"); // will not overwrite a better error message
+               return;
+       }
+
+       if (recvq.size() > prevrecvqsize)
+               OnDataReady();
+}
+
+int StreamSocket::ReadToRecvQ(std::string& rq)
+{
                char* ReadBuffer = ServerInstance->GetReadBuffer();
-               int n = ServerInstance->SE->Recv(this, ReadBuffer, ServerInstance->Config->NetBufferSize, 0);
+               int n = SocketEngine::Recv(this, ReadBuffer, ServerInstance->Config->NetBufferSize, 0);
                if (n == ServerInstance->Config->NetBufferSize)
                {
-                       ServerInstance->SE->ChangeEventMask(this, FD_WANT_FAST_READ | FD_ADD_TRIAL_READ);
-                       recvq.append(ReadBuffer, n);
-                       OnDataReady();
+                       SocketEngine::ChangeEventMask(this, FD_WANT_FAST_READ | FD_ADD_TRIAL_READ);
+                       rq.append(ReadBuffer, n);
                }
                else if (n > 0)
                {
-                       ServerInstance->SE->ChangeEventMask(this, FD_WANT_FAST_READ);
-                       recvq.append(ReadBuffer, n);
-                       OnDataReady();
+                       SocketEngine::ChangeEventMask(this, FD_WANT_FAST_READ);
+                       rq.append(ReadBuffer, n);
                }
                else if (n == 0)
                {
                        error = "Connection closed";
-                       ServerInstance->SE->ChangeEventMask(this, FD_WANT_NO_READ | FD_WANT_NO_WRITE);
+                       SocketEngine::ChangeEventMask(this, FD_WANT_NO_READ | FD_WANT_NO_WRITE);
+                       return -1;
                }
-               else if (errno == EAGAIN)
+               else if (SocketEngine::IgnoreError())
                {
-                       ServerInstance->SE->ChangeEventMask(this, FD_WANT_FAST_READ | FD_READ_WILL_BLOCK);
+                       SocketEngine::ChangeEventMask(this, FD_WANT_FAST_READ | FD_READ_WILL_BLOCK);
+                       return 0;
                }
                else if (errno == EINTR)
                {
-                       ServerInstance->SE->ChangeEventMask(this, FD_WANT_FAST_READ | FD_ADD_TRIAL_READ);
+                       SocketEngine::ChangeEventMask(this, FD_WANT_FAST_READ | FD_ADD_TRIAL_READ);
+                       return 0;
                }
                else
                {
-                       error = strerror(errno);
-                       ServerInstance->SE->ChangeEventMask(this, FD_WANT_NO_READ | FD_WANT_NO_WRITE);
+                       error = SocketEngine::LastError();
+                       SocketEngine::ChangeEventMask(this, FD_WANT_NO_READ | FD_WANT_NO_WRITE);
+                       return -1;
                }
-       }
+       return n;
 }
 
 /* Don't try to prepare huge blobs of data to send to a blocked socket */
@@ -221,118 +224,64 @@ static const int MYIOV_MAX = IOV_MAX < 128 ? IOV_MAX : 128;
 
 void StreamSocket::DoWrite()
 {
-       if (sendq.empty())
+       if (getSendQSize() == 0)
+       {
+               if (closeonempty)
+                       Close();
+
                return;
-       if (!error.empty() || fd < 0 || fd == INT_MAX)
+       }
+       if (!error.empty() || !HasFd())
        {
                ServerInstance->Logs->Log("SOCKET", LOG_DEBUG, "DoWrite on errored or closed socket");
                return;
        }
 
-#ifndef DISABLE_WRITEV
-       if (IOHook)
-#endif
+       SendQueue* psendq = &sendq;
+       IOHook* hook = GetIOHook();
+       while (hook)
        {
-               int rv = -1;
-               try
-               {
-                       while (error.empty() && !sendq.empty())
-                       {
-                               if (sendq.size() > 1 && sendq[0].length() < 1024)
-                               {
-                                       // Avoid multiple repeated SSL encryption invocations
-                                       // This adds a single copy of the queue, but avoids
-                                       // much more overhead in terms of system calls invoked
-                                       // by the IOHook.
-                                       //
-                                       // The length limit of 1024 is to prevent merging strings
-                                       // more than once when writes begin to block.
-                                       std::string tmp;
-                                       tmp.reserve(sendq_len);
-                                       for(unsigned int i=0; i < sendq.size(); i++)
-                                               tmp.append(sendq[i]);
-                                       sendq.clear();
-                                       sendq.push_back(tmp);
-                               }
-                               std::string& front = sendq.front();
-                               int itemlen = front.length();
-                               if (IOHook)
-                               {
-                                       rv = IOHook->OnStreamSocketWrite(this, front);
-                                       if (rv > 0)
-                                       {
-                                               // consumed the entire string, and is ready for more
-                                               sendq_len -= itemlen;
-                                               sendq.pop_front();
-                                       }
-                                       else if (rv == 0)
-                                       {
-                                               // socket has blocked. Stop trying to send data.
-                                               // IOHook has requested unblock notification from the socketengine
+               int rv = hook->OnStreamSocketWrite(this, *psendq);
+               psendq = NULL;
 
-                                               // Since it is possible that a partial write took place, adjust sendq_len
-                                               sendq_len = sendq_len - itemlen + front.length();
-                                               return;
-                                       }
-                                       else
-                                       {
-                                               SetError("Write Error"); // will not overwrite a better error message
-                                               return;
-                                       }
-                               }
-#ifdef DISABLE_WRITEV
-                               else
-                               {
-                                       rv = ServerInstance->SE->Send(this, front.data(), itemlen, 0);
-                                       if (rv == 0)
-                                       {
-                                               SetError("Connection closed");
-                                               return;
-                                       }
-                                       else if (rv < 0)
-                                       {
-                                               if (errno == EAGAIN || errno == EINTR)
-                                                       ServerInstance->SE->ChangeEventMask(this, FD_WANT_FAST_WRITE | FD_WRITE_WILL_BLOCK);
-                                               else
-                                                       SetError(strerror(errno));
-                                               return;
-                                       }
-                                       else if (rv < itemlen)
-                                       {
-                                               ServerInstance->SE->ChangeEventMask(this, FD_WANT_FAST_WRITE | FD_WRITE_WILL_BLOCK);
-                                               front = front.substr(rv);
-                                               sendq_len -= rv;
-                                               return;
-                                       }
-                                       else
-                                       {
-                                               sendq_len -= itemlen;
-                                               sendq.pop_front();
-                                               if (sendq.empty())
-                                                       ServerInstance->SE->ChangeEventMask(this, FD_WANT_EDGE_WRITE);
-                                       }
-                               }
-#endif
-                       }
+               // rv == 0 means the socket has blocked. Stop trying to send data.
+               // IOHook has requested unblock notification from the socketengine.
+               if (rv == 0)
+                       break;
+
+               if (rv < 0)
+               {
+                       SetError("Write Error"); // will not overwrite a better error message
+                       break;
                }
-               catch (CoreException& modexcept)
+
+               IOHookMiddle* const iohm = IOHookMiddle::ToMiddleHook(hook);
+               hook = NULL;
+               if (iohm)
                {
-                       ServerInstance->Logs->Log("SOCKET", LOG_DEBUG,"%s threw an exception: %s",
-                               modexcept.GetSource(), modexcept.GetReason());
+                       psendq = &iohm->GetSendQ();
+                       hook = iohm->GetNextHook();
                }
        }
-#ifndef DISABLE_WRITEV
-       else
-       {
+
+       if (psendq)
+               FlushSendQ(*psendq);
+
+       if (getSendQSize() == 0 && closeonempty)
+               Close();
+}
+
+void StreamSocket::FlushSendQ(SendQueue& sq)
+{
                // don't even try if we are known to be blocking
                if (GetEventMask() & FD_WRITE_WILL_BLOCK)
                        return;
                // start out optimistic - we won't need to write any more
                int eventChange = FD_WANT_EDGE_WRITE;
-               while (error.empty() && sendq_len && eventChange == FD_WANT_EDGE_WRITE)
+               while (error.empty() && !sq.empty() && eventChange == FD_WANT_EDGE_WRITE)
                {
                        // Prepare a writev() call to write all buffers efficiently
-                       int bufcount = sendq.size();
+                       int bufcount = sq.size();
 
                        // cap the number of buffers at MYIOV_MAX
                        if (bufcount > MYIOV_MAX)
@@ -341,22 +290,25 @@ void StreamSocket::DoWrite()
                        }
 
                        int rv_max = 0;
-                       iovec* iovecs = new iovec[bufcount];
-                       for(int i=0; i < bufcount; i++)
+                       int rv;
                        {
-                               iovecs[i].iov_base = const_cast<char*>(sendq[i].data());
-                               iovecs[i].iov_len = sendq[i].length();
-                               rv_max += sendq[i].length();
+                               SocketEngine::IOVector iovecs[MYIOV_MAX];
+                               size_t j = 0;
+                               for (SendQueue::const_iterator i = sq.begin(), end = i+bufcount; i != end; ++i, j++)
+                               {
+                                       const SendQueue::Element& elem = *i;
+                                       iovecs[j].iov_base = const_cast<char*>(elem.data());
+                                       iovecs[j].iov_len = elem.length();
+                                       rv_max += iovecs[j].iov_len;
+                               }
+                               rv = SocketEngine::WriteV(this, iovecs, bufcount);
                        }
-                       int rv = writev(fd, iovecs, bufcount);
-                       delete[] iovecs;
 
-                       if (rv == (int)sendq_len)
+                       if (rv == (int)sq.bytes())
                        {
                                // it's our lucky day, everything got written out. Fast cleanup.
                                // This won't ever happen if the number of buffers got capped.
-                               sendq_len = 0;
-                               sendq.clear();
+                               sq.clear();
                        }
                        else if (rv > 0)
                        {
@@ -366,20 +318,19 @@ void StreamSocket::DoWrite()
                                        // it's going to block now
                                        eventChange = FD_WANT_FAST_WRITE | FD_WRITE_WILL_BLOCK;
                                }
-                               sendq_len -= rv;
-                               while (rv > 0 && !sendq.empty())
+                               while (rv > 0 && !sq.empty())
                                {
-                                       std::string& front = sendq.front();
+                                       const SendQueue::Element& front = sq.front();
                                        if (front.length() <= (size_t)rv)
                                        {
                                                // this string got fully written out
                                                rv -= front.length();
-                                               sendq.pop_front();
+                                               sq.pop_front();
                                        }
                                        else
                                        {
                                                // stopped in the middle of this string
-                                               front = front.substr(rv);
+                                               sq.erase_front(rv);
                                                rv = 0;
                                        }
                                }
@@ -388,7 +339,7 @@ void StreamSocket::DoWrite()
                        {
                                error = "Connection closed";
                        }
-                       else if (errno == EAGAIN)
+                       else if (SocketEngine::IgnoreError())
                        {
                                eventChange = FD_WANT_FAST_WRITE | FD_WRITE_WILL_BLOCK;
                        }
@@ -399,25 +350,28 @@ void StreamSocket::DoWrite()
                        }
                        else
                        {
-                               error = strerror(errno);
+                               error = SocketEngine::LastError();
                        }
                }
                if (!error.empty())
                {
                        // error - kill all events
-                       ServerInstance->SE->ChangeEventMask(this, FD_WANT_NO_READ | FD_WANT_NO_WRITE);
+                       SocketEngine::ChangeEventMask(this, FD_WANT_NO_READ | FD_WANT_NO_WRITE);
                }
                else
                {
-                       ServerInstance->SE->ChangeEventMask(this, eventChange);
+                       SocketEngine::ChangeEventMask(this, eventChange);
                }
-       }
-#endif
+}
+
+bool StreamSocket::OnSetEndPoint(const irc::sockets::sockaddrs& local, const irc::sockets::sockaddrs& remote)
+{
+       return false;
 }
 
 void StreamSocket::WriteData(const std::string &data)
 {
-       if (fd < 0)
+       if (!HasFd())
        {
                ServerInstance->Logs->Log("SOCKET", LOG_DEBUG, "Attempt to write data to dead socket: %s",
                        data.c_str());
@@ -426,17 +380,19 @@ void StreamSocket::WriteData(const std::string &data)
 
        /* Append the data to the back of the queue ready for writing */
        sendq.push_back(data);
-       sendq_len += data.length();
 
-       ServerInstance->SE->ChangeEventMask(this, FD_ADD_TRIAL_WRITE);
+       SocketEngine::ChangeEventMask(this, FD_ADD_TRIAL_WRITE);
 }
 
 bool SocketTimeout::Tick(time_t)
 {
-       ServerInstance->Logs->Log("SOCKET", LOG_DEBUG,"SocketTimeout::Tick");
+       ServerInstance->Logs->Log("SOCKET", LOG_DEBUG, "SocketTimeout::Tick");
 
-       if (ServerInstance->SE->GetRef(this->sfd) != this->sock)
+       if (SocketEngine::GetRef(this->sfd) != this->sock)
+       {
+               delete this;
                return false;
+       }
 
        if (this->sock->state == I_CONNECTING)
        {
@@ -452,87 +408,93 @@ bool SocketTimeout::Tick(time_t)
        }
 
        this->sock->Timeout = NULL;
+       delete this;
        return false;
 }
 
 void BufferedSocket::OnConnected() { }
 void BufferedSocket::OnTimeout() { return; }
 
-void BufferedSocket::DoWrite()
+void BufferedSocket::OnEventHandlerWrite()
 {
        if (state == I_CONNECTING)
        {
                state = I_CONNECTED;
                this->OnConnected();
-               if (GetIOHook())
-                       GetIOHook()->OnStreamSocketConnect(this);
-               else
-                       ServerInstance->SE->ChangeEventMask(this, FD_WANT_FAST_READ | FD_WANT_EDGE_WRITE);
+               if (!GetIOHook())
+                       SocketEngine::ChangeEventMask(this, FD_WANT_FAST_READ | FD_WANT_EDGE_WRITE);
        }
-       this->StreamSocket::DoWrite();
+       this->StreamSocket::OnEventHandlerWrite();
 }
 
 BufferedSocket::~BufferedSocket()
 {
        this->Close();
-       if (Timeout)
+       // The timer is removed from the TimerManager in Timer::~Timer()
+       delete Timeout;
+}
+
+void StreamSocket::OnEventHandlerError(int errornum)
+{
+       if (!error.empty())
+               return;
+
+       if (errornum == 0)
+               SetError("Connection closed");
+       else
+               SetError(SocketEngine::GetError(errornum));
+
+       BufferedSocketError errcode = I_ERR_OTHER;
+       switch (errornum)
        {
-               // The timer is removed from the TimerManager in Timer::~Timer()
-               delete Timeout;
+               case ETIMEDOUT:
+                       errcode = I_ERR_TIMEOUT;
+                       break;
+               case ECONNREFUSED:
+               case 0:
+                       errcode = I_ERR_CONNECT;
+                       break;
+               case EADDRINUSE:
+                       errcode = I_ERR_BIND;
+                       break;
+               case EPIPE:
+               case EIO:
+                       errcode = I_ERR_WRITE;
+                       break;
        }
+
+       // Log and call OnError()
+       CheckError(errcode);
 }
 
-void StreamSocket::HandleEvent(EventType et, int errornum)
+void StreamSocket::OnEventHandlerRead()
 {
        if (!error.empty())
                return;
-       BufferedSocketError errcode = I_ERR_OTHER;
-       try {
-               switch (et)
-               {
-                       case EVENT_ERROR:
-                       {
-                               if (errornum == 0)
-                                       SetError("Connection closed");
-                               else
-                                       SetError(strerror(errornum));
-                               switch (errornum)
-                               {
-                                       case ETIMEDOUT:
-                                               errcode = I_ERR_TIMEOUT;
-                                               break;
-                                       case ECONNREFUSED:
-                                       case 0:
-                                               errcode = I_ERR_CONNECT;
-                                               break;
-                                       case EADDRINUSE:
-                                               errcode = I_ERR_BIND;
-                                               break;
-                                       case EPIPE:
-                                       case EIO:
-                                               errcode = I_ERR_WRITE;
-                                               break;
-                               }
-                               break;
-                       }
-                       case EVENT_READ:
-                       {
-                               DoRead();
-                               break;
-                       }
-                       case EVENT_WRITE:
-                       {
-                               DoWrite();
-                               break;
-                       }
-               }
+
+       try
+       {
+               DoRead();
        }
        catch (CoreException& ex)
        {
-               ServerInstance->Logs->Log("SOCKET", LOG_DEFAULT, "Caught exception in socket processing on FD %d - '%s'",
-                       fd, ex.GetReason());
+               ServerInstance->Logs->Log("SOCKET", LOG_DEFAULT, "Caught exception in socket processing on FD %d - '%s'", fd, ex.GetReason().c_str());
                SetError(ex.GetReason());
        }
+       CheckError(I_ERR_OTHER);
+}
+
+void StreamSocket::OnEventHandlerWrite()
+{
+       if (!error.empty())
+               return;
+
+       DoWrite();
+       CheckError(I_ERR_OTHER);
+}
+
+void StreamSocket::CheckError(BufferedSocketError errcode)
+{
        if (!error.empty())
        {
                ServerInstance->Logs->Log("SOCKET", LOG_DEBUG, "Error on FD %d - '%s'", fd, error.c_str());
@@ -540,3 +502,75 @@ void StreamSocket::HandleEvent(EventType et, int errornum)
        }
 }
 
+IOHook* StreamSocket::GetModHook(Module* mod) const
+{
+       for (IOHook* curr = GetIOHook(); curr; curr = GetNextHook(curr))
+       {
+               if (curr->prov->creator == mod)
+                       return curr;
+       }
+       return NULL;
+}
+
+IOHook* StreamSocket::GetLastHook() const
+{
+       IOHook* curr = GetIOHook();
+       IOHook* last = curr;
+
+       for (; curr; curr = GetNextHook(curr))
+               last = curr;
+
+       return last;
+}
+
+
+void StreamSocket::AddIOHook(IOHook* newhook)
+{
+       IOHook* curr = GetIOHook();
+       if (!curr)
+       {
+               iohook = newhook;
+               return;
+       }
+
+       IOHookMiddle* lasthook;
+       while (curr)
+       {
+               lasthook = IOHookMiddle::ToMiddleHook(curr);
+               if (!lasthook)
+                       return;
+               curr = lasthook->GetNextHook();
+       }
+
+       lasthook->SetNextHook(newhook);
+}
+
+size_t StreamSocket::getSendQSize() const
+{
+       size_t ret = sendq.bytes();
+       IOHook* curr = GetIOHook();
+       while (curr)
+       {
+               const IOHookMiddle* const iohm = IOHookMiddle::ToMiddleHook(curr);
+               if (!iohm)
+                       break;
+
+               ret += iohm->GetSendQ().bytes();
+               curr = iohm->GetNextHook();
+       }
+       return ret;
+}
+
+void StreamSocket::SwapInternals(StreamSocket& other)
+{
+       if (type != other.type)
+               return;
+
+       EventHandler::SwapInternals(other);
+       std::swap(closeonempty, other.closeonempty);
+       std::swap(closing, other.closing);
+       std::swap(error, other.error);
+       std::swap(iohook, other.iohook);
+       std::swap(recvq, other.recvq);
+       std::swap(sendq, other.sendq);
+}