From e2af2347fc035d702e45f12e772223a8d578410d Mon Sep 17 00:00:00 2001 From: danieldg Date: Mon, 21 Sep 2009 13:26:31 +0000 Subject: Create StreamSocket for IO hooking implementation Fixes the SSL SendQ bug Removes duplicate code between User and BufferedSocket Simplify SSL module API Simplify EventHandler API (Readable/Writeable moved to SE) Add hook for culled objects to invoke callbacks prior to destructor Replace SocketCull with GlobalCull now that sockets can close themselves Shorten common case of user read/parse/write path: User::Write is now zero-copy up to syscall/SSL invocation User::Read has only two copy/scan passes from read() to ProcessCommand git-svn-id: http://svn.inspircd.org/repository/trunk/inspircd@11752 e03df62e-2008-0410-955e-edbf42e46eb7 --- src/inspsocket.cpp | 567 ++++++++++++++++++++--------------------------------- 1 file changed, 214 insertions(+), 353 deletions(-) (limited to 'src/inspsocket.cpp') diff --git a/src/inspsocket.cpp b/src/inspsocket.cpp index 0350858e7..964582062 100644 --- a/src/inspsocket.cpp +++ b/src/inspsocket.cpp @@ -16,195 +16,100 @@ #include "inspstring.h" #include "socketengine.h" -bool BufferedSocket::Readable() +BufferedSocket::BufferedSocket() { - return (this->state != I_CONNECTING); + Timeout = NULL; + state = I_ERROR; } -BufferedSocket::BufferedSocket(InspIRCd* SI) +BufferedSocket::BufferedSocket(int newfd) { - this->Timeout = NULL; - this->state = I_DISCONNECTED; - this->fd = -1; - this->ServerInstance = SI; -} - -BufferedSocket::BufferedSocket(InspIRCd* SI, int newfd, const char* ip) -{ - this->Timeout = NULL; + Timeout = NULL; this->fd = newfd; this->state = I_CONNECTED; - strlcpy(this->IP,ip,MAXBUF); - this->ServerInstance = SI; - if (this->fd > -1) - this->ServerInstance->SE->AddFd(this); + if (fd > -1) + ServerInstance->SE->AddFd(this); } -BufferedSocket::BufferedSocket(InspIRCd* SI, const std::string &ipaddr, int aport, unsigned long maxtime, const std::string &connectbindip) +void BufferedSocket::DoConnect(const std::string &ipaddr, int aport, unsigned long maxtime, const std::string &connectbindip) { - this->cbindip = connectbindip; - this->fd = -1; - this->ServerInstance = SI; - strlcpy(host,ipaddr.c_str(),MAXBUF); - this->Timeout = NULL; - - strlcpy(this->host,ipaddr.c_str(),MAXBUF); - this->port = aport; - - irc::sockets::sockaddrs testaddr; - if (!irc::sockets::aptosa(host, aport, &testaddr)) - { - this->ServerInstance->Logs->Log("SOCKET", DEBUG,"BUG: Hostname passed to BufferedSocket, rather than an IP address!"); - this->OnError(I_ERR_CONNECT); - this->Close(); - this->fd = -1; - this->state = I_ERROR; - return; - } - else + BufferedSocketError err = BeginConnect(ipaddr, aport, maxtime, connectbindip); + if (err != I_ERR_NONE) { - strlcpy(this->IP,host,MAXBUF); - if (!this->DoConnect(maxtime)) - { - this->OnError(I_ERR_CONNECT); - this->Close(); - this->fd = -1; - this->state = I_ERROR; - return; - } + state = I_ERROR; + SetError(strerror(errno)); + OnError(err); } } -void BufferedSocket::SetQueues() +BufferedSocketError BufferedSocket::BeginConnect(const std::string &ipaddr, int aport, unsigned long maxtime, const std::string &connectbindip) { - // attempt to increase socket sendq and recvq as high as its possible - int sendbuf = 32768; - int recvbuf = 32768; - if(setsockopt(this->fd,SOL_SOCKET,SO_SNDBUF,(const char *)&sendbuf,sizeof(sendbuf)) || setsockopt(this->fd,SOL_SOCKET,SO_RCVBUF,(const char *)&recvbuf,sizeof(sendbuf))) + irc::sockets::sockaddrs addr, bind; + if (!irc::sockets::aptosa(ipaddr.c_str(), aport, &addr)) { - //this->ServerInstance->Log(DEFAULT, "Could not increase SO_SNDBUF/SO_RCVBUF for socket %u", GetFd()); - ; // do nothing. I'm a little sick of people trying to interpret this message as a result of why their incorrect setups don't work. + ServerInstance->Logs->Log("SOCKET", DEBUG, "BUG: Hostname passed to BufferedSocket, rather than an IP address!"); + return I_ERR_CONNECT; } -} -bool BufferedSocket::DoBindMagic(const std::string ¤t_ip) -{ - irc::sockets::sockaddrs s; - if (!irc::sockets::aptosa(current_ip.c_str(), 0, &s)) + bind.sa.sa_family = 0; + if (!connectbindip.empty()) { - errno = EADDRNOTAVAIL; - return false; - } - - if (ServerInstance->SE->Bind(this->fd, &s.sa, sa_size(s)) < 0) - { - this->state = I_ERROR; - this->OnError(I_ERR_BIND); - return false; + if (!irc::sockets::aptosa(connectbindip.c_str(), 0, &bind)) + { + return I_ERR_BIND; + } } - return true; + return BeginConnect(addr, bind, maxtime); } -/* Most irc servers require you to specify the ip you want to bind to. - * If you dont specify an IP, they rather dumbly bind to the first IP - * of the box (e.g. INADDR_ANY). In InspIRCd, we scan thought the IP - * addresses we've bound server ports to, and we try and bind our outbound - * connections to the first usable non-loopback and non-any IP we find. - * This is easier to configure when you have a lot of links and a lot - * of servers to configure. - */ -bool BufferedSocket::BindAddr(const std::string &ip_to_bind) +static void IncreaseOSBuffers(int fd) { - ConfigReader Conf(this->ServerInstance); - - // Case one: If they provided an IP, try bind it - if (!ip_to_bind.empty()) - { - // And if it fails, don't do anything. - return this->DoBindMagic(ip_to_bind); - } - - for (int j = 0; j < Conf.Enumerate("bind"); j++) - { - // We only want to try bind to a server ip. - if (Conf.ReadValue("bind","type",j) != "servers") - continue; - - // set current IP to the tag - std::string current_ip = Conf.ReadValue("bind","address",j); - - // Make sure IP is nothing local - if (current_ip == "*" || current_ip == "127.0.0.1" || current_ip.empty() || current_ip == "::1") - continue; - - // Try bind, don't fail if it doesn't bind though. - if (this->DoBindMagic(current_ip)) - return true; - } - - // NOTE: You may wonder WTF we are returning *true* here, but that is because there were no custom binds setup, and so we have nothing to do - // (remember, outgoing connections without binding are perfectly ok). - ServerInstance->Logs->Log("SOCKET", DEBUG,"nothing in the config to bind()!"); - return true; + // attempt to increase socket sendq and recvq as high as its possible + int sendbuf = 32768; + int recvbuf = 32768; + setsockopt(fd,SOL_SOCKET,SO_SNDBUF,(const char *)&sendbuf,sizeof(sendbuf)); + setsockopt(fd,SOL_SOCKET,SO_RCVBUF,(const char *)&recvbuf,sizeof(recvbuf)); + // on failure, do nothing. I'm a little sick of people trying to interpret this message as a result of why their incorrect setups don't work. } -bool BufferedSocket::DoConnect(unsigned long maxtime) +BufferedSocketError BufferedSocket::BeginConnect(const irc::sockets::sockaddrs& dest, const irc::sockets::sockaddrs& bind, unsigned long timeout) { - irc::sockets::sockaddrs addr; - irc::sockets::aptosa(this->host, this->port, &addr); - - this->fd = socket(addr.sa.sa_family, SOCK_STREAM, 0); + if (fd < 0) + fd = socket(dest.sa.sa_family, SOCK_STREAM, 0); - if (this->fd == -1) - { - this->state = I_ERROR; - this->OnError(I_ERR_SOCKET); - return false; - } + if (fd < 0) + return I_ERR_SOCKET; - if (!this->BindAddr(this->cbindip)) + if (bind.sa.sa_family != 0) { - this->Close(); - this->fd = -1; - return false; + if (ServerInstance->SE->Bind(fd, &bind.sa, sa_size(bind)) < 0) + return I_ERR_BIND; } - ServerInstance->SE->NonBlocking(this->fd); + ServerInstance->SE->NonBlocking(fd); - if (ServerInstance->SE->Connect(this, &addr.sa, sa_size(addr)) == -1) + if (ServerInstance->SE->Connect(this, &dest.sa, sa_size(dest)) == -1) { if (errno != EINPROGRESS) - { - this->OnError(I_ERR_CONNECT); - this->Close(); - this->state = I_ERROR; - return false; - } - - this->Timeout = new SocketTimeout(this->GetFd(), this->ServerInstance, this, maxtime, this->ServerInstance->Time()); - this->ServerInstance->Timers->AddTimer(this->Timeout); + return I_ERR_CONNECT; } this->state = I_CONNECTING; - if (this->fd > -1) - { - if (!this->ServerInstance->SE->AddFd(this)) - { - this->OnError(I_ERR_NOMOREFDS); - this->Close(); - this->state = I_ERROR; - return false; - } - this->SetQueues(); - } + + if (!ServerInstance->SE->AddFd(this, true)) + return I_ERR_NOMOREFDS; + + this->Timeout = new SocketTimeout(this->GetFd(), this, timeout, ServerInstance->Time()); + ServerInstance->Timers->AddTimer(this->Timeout); + + IncreaseOSBuffers(fd); ServerInstance->Logs->Log("SOCKET", DEBUG,"BufferedSocket::DoConnect success"); - return true; + return I_ERR_NONE; } - -void BufferedSocket::Close() +void StreamSocket::Close() { /* Save this, so we dont lose it, * otherise on failure, error messages @@ -213,190 +118,196 @@ void BufferedSocket::Close() int save = errno; if (this->fd > -1) { - if (this->GetIOHook()) + if (IOHook) { try { - this->GetIOHook()->OnRawSocketClose(this->fd); + IOHook->OnStreamSocketClose(this); } catch (CoreException& modexcept) { - ServerInstance->Logs->Log("SOCKET", DEFAULT,"%s threw an exception: %s", modexcept.GetSource(), modexcept.GetReason()); + ServerInstance->Logs->Log("SOCKET", DEFAULT,"%s threw an exception: %s", + modexcept.GetSource(), modexcept.GetReason()); } } ServerInstance->SE->Shutdown(this, 2); - if (ServerInstance->SE->Close(this) != -1) - this->OnClose(); - - if (ServerInstance->SocketCull.find(this) == ServerInstance->SocketCull.end()) - ServerInstance->SocketCull[this] = this; + ServerInstance->SE->DelFd(this); + ServerInstance->SE->Close(this); + fd = -1; } errno = save; } -std::string BufferedSocket::GetIP() +void StreamSocket::cull() { - return this->IP; + Close(); } -const char* BufferedSocket::Read() +bool StreamSocket::GetNextLine(std::string& line, char delim) { - if (!ServerInstance->SE->BoundsCheckFd(this)) - return NULL; - - int n = 0; - char* ReadBuffer = ServerInstance->GetReadBuffer(); + std::string::size_type i = recvq.find(delim); + if (i == std::string::npos) + return false; + line = recvq.substr(0, i - 1); + // TODO is this the most efficient way to split? + recvq = recvq.substr(i + 1); + return true; +} - if (this->GetIOHook()) +void StreamSocket::DoRead() +{ + if (IOHook) { - int result2 = 0; - int MOD_RESULT = 0; + int rv = -1; try { - MOD_RESULT = this->GetIOHook()->OnRawSocketRead(this->fd, ReadBuffer, ServerInstance->Config->NetBufferSize, result2); + rv = IOHook->OnStreamSocketRead(this, recvq); } catch (CoreException& modexcept) { - ServerInstance->Logs->Log("SOCKET", DEFAULT,"%s threw an exception: %s", modexcept.GetSource(), modexcept.GetReason()); + ServerInstance->Logs->Log("SOCKET", DEFAULT, "%s threw an exception: %s", + modexcept.GetSource(), modexcept.GetReason()); + return; } - if (MOD_RESULT < 0) + if (rv > 0) + OnDataReady(); + if (rv < 0) + SetError("Read Error"); // will not overwrite a better error message + } + else + { + char* ReadBuffer = ServerInstance->GetReadBuffer(); + int n = recv(fd, ReadBuffer, ServerInstance->Config->NetBufferSize, 0); + if (n > 0) { - n = -1; - errno = EAGAIN; + recvq.append(ReadBuffer, n); + OnDataReady(); } - else + else if (n == 0) { - n = result2; + error = "Connection closed"; + } + else if (errno != EAGAIN && errno != EINTR) + { + error = strerror(errno); } - } - else - { - n = recv(this->fd, ReadBuffer, ServerInstance->Config->NetBufferSize, 0); - } - - /* - * This used to do some silly bounds checking instead of just passing bufsize - 1 to recv. - * Not only does that make absolutely no sense, but it could potentially result in a read buffer's worth - * of data being thrown into the bit bucket for no good reason, which is just *stupid*.. do things correctly now. - * --w00t (july 2, 2008) - */ - if (n > 0) - { - ReadBuffer[n] = 0; - return ReadBuffer; - } - else - { - int err = errno; - if (err == EAGAIN) - return ""; - else - return NULL; } } -/* - * This function formerly tried to flush write buffer each call. - * While admirable in attempting to get the data out to wherever - * it is going, on a full socket, it's just going to syscall write() and - * EAGAIN constantly, instead of waiting in the SE to know if it can write - * which will chew a bit of CPU. - * - * So, now this function returns void (take note) and just adds to the sendq. - * - * It'll get written at a determinate point when the socketengine tells us it can write. - * -- w00t (april 1, 2008) - */ -void BufferedSocket::Write(const std::string &data) +void StreamSocket::DoWrite() { - /* Append the data to the back of the queue ready for writing */ - outbuffer.push_back(data); - - /* Mark ourselves as wanting write */ - this->ServerInstance->SE->WantWrite(this); -} + if (sendq.empty()) + return; -bool BufferedSocket::FlushWriteBuffer() -{ - errno = 0; - if ((this->fd > -1) && (this->state == I_CONNECTED)) + if (IOHook) { - if (this->GetIOHook()) + int rv = -1; + try { - while (outbuffer.size() && (errno != EAGAIN)) + while (!sendq.empty()) { - try + std::string& front = sendq.front(); + int itemlen = front.length(); + rv = IOHook->OnStreamSocketWrite(this, front); + if (rv > 0) + { + // consumed the entire string, and is ready for more + sendq_len -= itemlen; + sendq.pop_front(); + } + else if (rv == 0) { - /* XXX: The lack of buffering here is NOT a bug, modules implementing this interface have to - * implement their own buffering mechanisms - */ - this->GetIOHook()->OnRawSocketWrite(this->fd, outbuffer[0].c_str(), outbuffer[0].length()); - outbuffer.pop_front(); + // socket has blocked. Stop trying to send data. + // IOHook has requested unblock notification from the socketengine + + // Since it is possible that a partial write took place, adjust sendq_len + sendq_len = sendq_len - itemlen + front.length(); + return; } - catch (CoreException& modexcept) + else { - ServerInstance->Logs->Log("SOCKET", DEBUG,"%s threw an exception: %s", modexcept.GetSource(), modexcept.GetReason()); - return true; + SetError("Write Error"); // will not overwrite a better error message + return; } } } - else + catch (CoreException& modexcept) { - /* If we have multiple lines, try to send them all, - * not just the first one -- Brain - */ - while (outbuffer.size() && (errno != EAGAIN)) + ServerInstance->Logs->Log("SOCKET", DEBUG,"%s threw an exception: %s", + modexcept.GetSource(), modexcept.GetReason()); + } + } + else + { + // Prepare a writev() call to write all buffers efficiently + int bufcount = sendq.size(); + + // cap the number of buffers at IOV_MAX + if (bufcount > IOV_MAX) + bufcount = IOV_MAX; + + iovec* iovecs = new iovec[bufcount]; + for(int i=0; i < bufcount; i++) + { + iovecs[i].iov_base = const_cast(sendq[i].data()); + iovecs[i].iov_len = sendq[i].length(); + } + int rv = writev(fd, iovecs, bufcount); + delete[] iovecs; + if (rv == (int)sendq_len) + { + // it's our lucky day, everything got written out. Fast cleanup. + sendq_len = 0; + sendq.clear(); + } + else if (rv > 0) + { + // Partial write. Clean out strings from the sendq + sendq_len -= rv; + while (rv > 0 && !sendq.empty()) { - /* Send a line */ - int result = ServerInstance->SE->Send(this, outbuffer[0].c_str(), outbuffer[0].length(), 0); - - if (result > 0) + std::string& front = sendq.front(); + if (front.length() < (size_t)rv) { - if ((unsigned int)result >= outbuffer[0].length()) - { - /* The whole block was written (usually a line) - * Pop the block off the front of the queue, - * dont set errno, because we are clear of errors - * and want to try and write the next block too. - */ - outbuffer.pop_front(); - } - else - { - std::string temp = outbuffer[0].substr(result); - outbuffer[0] = temp; - /* We didnt get the whole line out. arses. - * Try again next time, i guess. Set errno, - * because we shouldnt be writing any more now, - * until the socketengine says its safe to do so. - */ - errno = EAGAIN; - } + // this string got fully written out + rv -= front.length(); + sendq.pop_front(); } - else if (result == 0) + else { - this->ServerInstance->SE->DelFd(this); - this->Close(); - return true; - } - else if ((result == -1) && (errno != EAGAIN)) - { - this->OnError(I_ERR_WRITE); - this->state = I_ERROR; - this->ServerInstance->SE->DelFd(this); - this->Close(); - return true; + // stopped in the middle of this string + front = front.substr(rv); + rv = 0; } } } + else if (rv == 0) + { + error = "Connection closed"; + } + else if (errno != EAGAIN && errno != EINTR) + { + error = strerror(errno); + } + if (sendq_len && error.empty()) + ServerInstance->SE->WantWrite(this); } +} + +void StreamSocket::WriteData(const std::string &data) +{ + bool newWrite = sendq.empty() && !data.empty(); - if ((errno == EAGAIN) && (fd > -1)) + /* Append the data to the back of the queue ready for writing */ + sendq.push_back(data); + sendq_len += data.length(); + + if (newWrite) { - this->ServerInstance->SE->WantWrite(this); + // TODO perhaps we should try writing first, before asking SE about writes? + // DoWrite(); + ServerInstance->SE->WantWrite(this); } - - return (fd < 0); } void SocketTimeout::Tick(time_t) @@ -421,57 +332,26 @@ void SocketTimeout::Tick(time_t) */ this->sock->state = I_ERROR; - if (ServerInstance->SocketCull.find(this->sock) == ServerInstance->SocketCull.end()) - ServerInstance->SocketCull[this->sock] = this->sock; + ServerInstance->GlobalCulls.AddItem(sock); } this->sock->Timeout = NULL; } -bool BufferedSocket::InternalMarkConnected() -{ - /* Our socket was in write-state, so delete it and re-add it - * in read-state. - */ - this->SetState(I_CONNECTED); +void BufferedSocket::OnConnected() { } +void BufferedSocket::OnTimeout() { return; } - if (this->GetIOHook()) +void BufferedSocket::DoWrite() +{ + if (state == I_CONNECTING) { - ServerInstance->Logs->Log("SOCKET",DEBUG,"Hook for raw connect"); - try - { - this->GetIOHook()->OnRawSocketConnect(this->fd); - } - catch (CoreException& modexcept) - { - ServerInstance->Logs->Log("SOCKET",DEBUG,"%s threw an exception: %s", modexcept.GetSource(), modexcept.GetReason()); - return false; - } + state = I_CONNECTED; + this->OnConnected(); + if (GetIOHook()) + GetIOHook()->OnStreamSocketConnect(this); } - return this->OnConnected(); -} - -void BufferedSocket::SetState(BufferedSocketState s) -{ - this->state = s; -} - -BufferedSocketState BufferedSocket::GetState() -{ - return this->state; -} - -bool BufferedSocket::OnConnected() { return true; } -void BufferedSocket::OnError(BufferedSocketError) { return; } -int BufferedSocket::OnDisconnect() { return 0; } -bool BufferedSocket::OnDataReady() { return true; } -bool BufferedSocket::OnWriteReady() -{ - // Default behaviour: just try write some. - return !this->FlushWriteBuffer(); + this->StreamSocket::DoWrite(); } -void BufferedSocket::OnTimeout() { return; } -void BufferedSocket::OnClose() { return; } BufferedSocket::~BufferedSocket() { @@ -483,68 +363,49 @@ BufferedSocket::~BufferedSocket() } } -void BufferedSocket::HandleEvent(EventType et, int errornum) +void StreamSocket::HandleEvent(EventType et, int errornum) { + BufferedSocketError errcode = I_ERR_OTHER; switch (et) { case EVENT_ERROR: { + SetError(strerror(errornum)); switch (errornum) { case ETIMEDOUT: - this->OnError(I_ERR_TIMEOUT); + errcode = I_ERR_TIMEOUT; break; case ECONNREFUSED: case 0: - this->OnError(this->state == I_CONNECTING ? I_ERR_CONNECT : I_ERR_WRITE); + errcode = I_ERR_CONNECT; break; case EADDRINUSE: - this->OnError(I_ERR_BIND); + errcode = I_ERR_BIND; break; case EPIPE: case EIO: - this->OnError(I_ERR_WRITE); + errcode = I_ERR_WRITE; break; } - - if (this->ServerInstance->SocketCull.find(this) == this->ServerInstance->SocketCull.end()) - this->ServerInstance->SocketCull[this] = this; - return; break; } case EVENT_READ: { - if (!this->OnDataReady()) - { - if (this->ServerInstance->SocketCull.find(this) == this->ServerInstance->SocketCull.end()) - this->ServerInstance->SocketCull[this] = this; - return; - } + DoRead(); break; } case EVENT_WRITE: { - if (this->state == I_CONNECTING) - { - if (!this->InternalMarkConnected()) - { - if (this->ServerInstance->SocketCull.find(this) == this->ServerInstance->SocketCull.end()) - this->ServerInstance->SocketCull[this] = this; - return; - } - return; - } - else - { - if (!this->OnWriteReady()) - { - if (this->ServerInstance->SocketCull.find(this) == this->ServerInstance->SocketCull.end()) - this->ServerInstance->SocketCull[this] = this; - return; - } - } + DoWrite(); break; } } + if (!error.empty()) + { + ServerInstance->Logs->Log("SOCKET", DEBUG, "Error on FD %d - '%s'", fd, error.c_str()); + OnError(errcode); + ServerInstance->GlobalCulls.AddItem(this); + } } -- cgit v1.2.3