X-Git-Url: https://git.netwichtig.de/gitweb/?a=blobdiff_plain;f=src%2Finspsocket.cpp;h=9acd484fdef8c0892bdfd9592c3524ea48b23999;hb=934d9a6a184b7a8600fcda30e012ba6f29f17b64;hp=907acea67f78c35c62a34f167f975fee86a58ac7;hpb=9f2e1f901930d3646db5f3c21180f02f2f9ce41f;p=user%2Fhenk%2Fcode%2Finspircd.git diff --git a/src/inspsocket.cpp b/src/inspsocket.cpp index 907acea67..9acd484fd 100644 --- a/src/inspsocket.cpp +++ b/src/inspsocket.cpp @@ -28,7 +28,7 @@ BufferedSocket::BufferedSocket(int newfd) this->fd = newfd; this->state = I_CONNECTED; if (fd > -1) - ServerInstance->SE->AddFd(this); + ServerInstance->SE->AddFd(this, FD_WANT_FAST_READ | FD_WANT_EDGE_WRITE); } void BufferedSocket::DoConnect(const std::string &ipaddr, int aport, unsigned long maxtime, const std::string &connectbindip) @@ -97,7 +97,7 @@ BufferedSocketError BufferedSocket::BeginConnect(const irc::sockets::sockaddrs& this->state = I_CONNECTING; - if (!ServerInstance->SE->AddFd(this, true)) + if (!ServerInstance->SE->AddFd(this, FD_WANT_NO_READ | FD_WANT_SINGLE_WRITE)) return I_ERR_NOMOREFDS; this->Timeout = new SocketTimeout(this->GetFd(), this, timeout, ServerInstance->Time()); @@ -178,18 +178,35 @@ void StreamSocket::DoRead() { char* ReadBuffer = ServerInstance->GetReadBuffer(); int n = recv(fd, ReadBuffer, ServerInstance->Config->NetBufferSize, 0); - if (n > 0) + if (n == ServerInstance->Config->NetBufferSize) { + ServerInstance->SE->ChangeEventMask(this, FD_WANT_FAST_READ | FD_ADD_TRIAL_READ); + recvq.append(ReadBuffer, n); + OnDataReady(); + } + else if (n > 0) + { + ServerInstance->SE->ChangeEventMask(this, FD_WANT_FAST_READ); recvq.append(ReadBuffer, n); OnDataReady(); } else if (n == 0) { error = "Connection closed"; + ServerInstance->SE->ChangeEventMask(this, FD_WANT_NO_READ | FD_WANT_NO_WRITE); + } + else if (errno == EAGAIN) + { + ServerInstance->SE->ChangeEventMask(this, FD_WANT_FAST_READ | FD_READ_WILL_BLOCK); } - else if (errno != EAGAIN && errno != EINTR) + else if (errno == EINTR) + { + ServerInstance->SE->ChangeEventMask(this, FD_WANT_FAST_READ | FD_ADD_TRIAL_READ); + } + else { error = strerror(errno); + ServerInstance->SE->ChangeEventMask(this, FD_WANT_NO_READ | FD_WANT_NO_WRITE); } } } @@ -198,6 +215,11 @@ void StreamSocket::DoWrite() { if (sendq.empty()) return; + if (!error.empty() || fd < 0 || fd == INT_MAX) + { + ServerInstance->Logs->Log("SOCKET", DEBUG, "DoWrite on errored or closed socket"); + return; + } if (IOHook) { @@ -255,58 +277,92 @@ void StreamSocket::DoWrite() } else { - // Prepare a writev() call to write all buffers efficiently - int bufcount = sendq.size(); + // don't even try if we are known to be blocking + if (GetEventMask() & FD_WRITE_WILL_BLOCK) + return; + // start out optimistic - we won't need to write any more + int eventChange = FD_WANT_EDGE_WRITE; + while (sendq_len && eventChange == FD_WANT_EDGE_WRITE) + { + // Prepare a writev() call to write all buffers efficiently + int bufcount = sendq.size(); - // cap the number of buffers at IOV_MAX - if (bufcount > IOV_MAX) - bufcount = IOV_MAX; + // cap the number of buffers at IOV_MAX + if (bufcount > IOV_MAX) + { + bufcount = IOV_MAX; + } - iovec* iovecs = new iovec[bufcount]; - for(int i=0; i < bufcount; i++) - { - iovecs[i].iov_base = const_cast(sendq[i].data()); - iovecs[i].iov_len = sendq[i].length(); - } - int rv = writev(fd, iovecs, bufcount); - delete[] iovecs; - if (rv == (int)sendq_len) - { - // it's our lucky day, everything got written out. Fast cleanup. - sendq_len = 0; - sendq.clear(); - } - else if (rv > 0) - { - // Partial write. Clean out strings from the sendq - sendq_len -= rv; - while (rv > 0 && !sendq.empty()) + int rv_max = 0; + iovec* iovecs = new iovec[bufcount]; + for(int i=0; i < bufcount; i++) { - std::string& front = sendq.front(); - if (front.length() < (size_t)rv) + iovecs[i].iov_base = const_cast(sendq[i].data()); + iovecs[i].iov_len = sendq[i].length(); + rv_max += sendq[i].length(); + } + int rv = writev(fd, iovecs, bufcount); + delete[] iovecs; + + if (rv == (int)sendq_len) + { + // it's our lucky day, everything got written out. Fast cleanup. + // This won't ever happen if the number of buffers got capped. + sendq_len = 0; + sendq.clear(); + } + else if (rv > 0) + { + // Partial write. Clean out strings from the sendq + sendq_len -= rv; + while (rv > 0 && !sendq.empty()) { - // this string got fully written out - rv -= front.length(); - sendq.pop_front(); + std::string& front = sendq.front(); + if (front.length() <= (size_t)rv) + { + // this string got fully written out + rv -= front.length(); + sendq.pop_front(); + } + else + { + // stopped in the middle of this string + front = front.substr(rv); + rv = 0; + } } - else + if (rv < rv_max) { - // stopped in the middle of this string - front = front.substr(rv); - rv = 0; + // it's going to block now + eventChange = FD_WANT_FAST_WRITE | FD_WRITE_WILL_BLOCK; } } + else if (rv == 0) + { + error = "Connection closed"; + } + else if (errno == EAGAIN) + { + eventChange = FD_WANT_FAST_WRITE | FD_WRITE_WILL_BLOCK; + } + else if (errno == EINTR) + { + // restart interrupted syscall + } + else + { + error = strerror(errno); + } } - else if (rv == 0) + if (!error.empty()) { - error = "Connection closed"; + // error - kill all events + ServerInstance->SE->ChangeEventMask(this, FD_WANT_NO_READ | FD_WANT_NO_WRITE); } - else if (errno != EAGAIN && errno != EINTR) + else { - error = strerror(errno); + ServerInstance->SE->ChangeEventMask(this, eventChange); } - if (sendq_len && error.empty()) - ServerInstance->SE->WantWrite(this); } } @@ -318,18 +374,12 @@ void StreamSocket::WriteData(const std::string &data) data.c_str()); return; } - bool newWrite = sendq.empty() && !data.empty(); /* Append the data to the back of the queue ready for writing */ sendq.push_back(data); sendq_len += data.length(); - if (newWrite) - { - // TODO perhaps we should try writing first, before asking SE about writes? - // DoWrite(); - ServerInstance->SE->WantWrite(this); - } + ServerInstance->SE->ChangeEventMask(this, FD_ADD_TRIAL_WRITE); } void SocketTimeout::Tick(time_t) @@ -371,6 +421,8 @@ void BufferedSocket::DoWrite() this->OnConnected(); if (GetIOHook()) GetIOHook()->OnStreamSocketConnect(this); + else + ServerInstance->SE->ChangeEventMask(this, FD_WANT_FAST_READ | FD_WANT_EDGE_WRITE); } this->StreamSocket::DoWrite(); } @@ -392,7 +444,10 @@ void StreamSocket::HandleEvent(EventType et, int errornum) { case EVENT_ERROR: { - SetError(strerror(errornum)); + if (errornum == 0) + SetError("Connection closed"); + else + SetError(strerror(errornum)); switch (errornum) { case ETIMEDOUT: