diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/dns.cpp | 2 | ||||
-rw-r--r-- | src/inspircd.cpp | 1 | ||||
-rw-r--r-- | src/inspsocket.cpp | 147 | ||||
-rw-r--r-- | src/listensocket.cpp | 2 | ||||
-rw-r--r-- | src/modules/extra/m_ssl_gnutls.cpp | 17 | ||||
-rw-r--r-- | src/modules/extra/m_ssl_openssl.cpp | 254 | ||||
-rw-r--r-- | src/modules/m_httpd_stats.cpp | 2 | ||||
-rw-r--r-- | src/modules/m_ident.cpp | 8 | ||||
-rw-r--r-- | src/socketengine.cpp | 64 | ||||
-rw-r--r-- | src/socketengines/socketengine_epoll.cpp | 128 | ||||
-rw-r--r-- | src/socketengines/socketengine_iocp.cpp | 12 | ||||
-rw-r--r-- | src/socketengines/socketengine_kqueue.cpp | 96 | ||||
-rw-r--r-- | src/socketengines/socketengine_poll.cpp | 126 | ||||
-rw-r--r-- | src/socketengines/socketengine_ports.cpp | 85 | ||||
-rw-r--r-- | src/socketengines/socketengine_select.cpp | 77 | ||||
-rw-r--r-- | src/usermanager.cpp | 2 |
16 files changed, 475 insertions, 548 deletions
diff --git a/src/dns.cpp b/src/dns.cpp index 94a01e64c..3356700ef 100644 --- a/src/dns.cpp +++ b/src/dns.cpp @@ -369,7 +369,7 @@ void DNS::Rehash() /* Hook the descriptor into the socket engine */ if (ServerInstance && ServerInstance->SE) { - if (!ServerInstance->SE->AddFd(this)) + if (!ServerInstance->SE->AddFd(this, FD_WANT_POLL_READ | FD_WANT_NO_WRITE)) { ServerInstance->Logs->Log("RESOLVER",DEFAULT,"Internal error starting DNS - hostnames will NOT resolve."); ServerInstance->SE->Shutdown(this, 2); diff --git a/src/inspircd.cpp b/src/inspircd.cpp index f28324d16..0fc3535e2 100644 --- a/src/inspircd.cpp +++ b/src/inspircd.cpp @@ -823,6 +823,7 @@ int InspIRCd::Run() * This will cause any read or write events to be * dispatched to their handlers. */ + this->SE->DispatchTrialWrites(); this->SE->DispatchEvents(); /* if any users were quit, take them out */ diff --git a/src/inspsocket.cpp b/src/inspsocket.cpp index 907acea67..6348d7982 100644 --- a/src/inspsocket.cpp +++ b/src/inspsocket.cpp @@ -28,7 +28,7 @@ BufferedSocket::BufferedSocket(int newfd) this->fd = newfd; this->state = I_CONNECTED; if (fd > -1) - ServerInstance->SE->AddFd(this); + ServerInstance->SE->AddFd(this, FD_WANT_FAST_READ | FD_WANT_EDGE_WRITE); } void BufferedSocket::DoConnect(const std::string &ipaddr, int aport, unsigned long maxtime, const std::string &connectbindip) @@ -97,7 +97,7 @@ BufferedSocketError BufferedSocket::BeginConnect(const irc::sockets::sockaddrs& this->state = I_CONNECTING; - if (!ServerInstance->SE->AddFd(this, true)) + if (!ServerInstance->SE->AddFd(this, FD_WANT_NO_READ | FD_WANT_POLL_WRITE)) return I_ERR_NOMOREFDS; this->Timeout = new SocketTimeout(this->GetFd(), this, timeout, ServerInstance->Time()); @@ -178,18 +178,35 @@ void StreamSocket::DoRead() { char* ReadBuffer = ServerInstance->GetReadBuffer(); int n = recv(fd, ReadBuffer, ServerInstance->Config->NetBufferSize, 0); - if (n > 0) + if (n == ServerInstance->Config->NetBufferSize) { + ServerInstance->SE->ChangeEventMask(this, FD_WANT_FAST_READ | FD_ADD_TRIAL_READ); + recvq.append(ReadBuffer, n); + OnDataReady(); + } + else if (n > 0) + { + ServerInstance->SE->ChangeEventMask(this, FD_WANT_FAST_READ); recvq.append(ReadBuffer, n); OnDataReady(); } else if (n == 0) { error = "Connection closed"; + ServerInstance->SE->ChangeEventMask(this, FD_WANT_NO_READ | FD_WANT_NO_WRITE); + } + else if (errno == EAGAIN) + { + ServerInstance->SE->ChangeEventMask(this, FD_WANT_FAST_READ | FD_READ_WILL_BLOCK); } - else if (errno != EAGAIN && errno != EINTR) + else if (errno == EINTR) + { + ServerInstance->SE->ChangeEventMask(this, FD_WANT_FAST_READ | FD_ADD_TRIAL_READ); + } + else { error = strerror(errno); + ServerInstance->SE->ChangeEventMask(this, FD_WANT_NO_READ | FD_WANT_NO_WRITE); } } } @@ -255,58 +272,90 @@ void StreamSocket::DoWrite() } else { - // Prepare a writev() call to write all buffers efficiently - int bufcount = sendq.size(); + bool again = true; + while (again) + { + again = false; + + // Prepare a writev() call to write all buffers efficiently + int bufcount = sendq.size(); - // cap the number of buffers at IOV_MAX - if (bufcount > IOV_MAX) - bufcount = IOV_MAX; + // cap the number of buffers at IOV_MAX + if (bufcount > IOV_MAX) + { + bufcount = IOV_MAX; + again = true; + } - iovec* iovecs = new iovec[bufcount]; - for(int i=0; i < bufcount; i++) - { - iovecs[i].iov_base = const_cast<char*>(sendq[i].data()); - iovecs[i].iov_len = sendq[i].length(); - } - int rv = writev(fd, iovecs, bufcount); - delete[] iovecs; - if (rv == (int)sendq_len) - { - // it's our lucky day, everything got written out. Fast cleanup. - sendq_len = 0; - sendq.clear(); - } - else if (rv > 0) - { - // Partial write. Clean out strings from the sendq - sendq_len -= rv; - while (rv > 0 && !sendq.empty()) + iovec* iovecs = new iovec[bufcount]; + for(int i=0; i < bufcount; i++) { - std::string& front = sendq.front(); - if (front.length() < (size_t)rv) - { - // this string got fully written out - rv -= front.length(); - sendq.pop_front(); - } - else + iovecs[i].iov_base = const_cast<char*>(sendq[i].data()); + iovecs[i].iov_len = sendq[i].length(); + } + int rv = writev(fd, iovecs, bufcount); + delete[] iovecs; + + if (rv == (int)sendq_len) + { + // it's our lucky day, everything got written out. Fast cleanup. + // This won't ever happen if the number of buffers got capped. + sendq_len = 0; + sendq.clear(); + } + else if (rv > 0) + { + // Partial write. Clean out strings from the sendq + sendq_len -= rv; + while (rv > 0 && !sendq.empty()) { - // stopped in the middle of this string - front = front.substr(rv); - rv = 0; + std::string& front = sendq.front(); + if (front.length() < (size_t)rv) + { + // this string got fully written out + rv -= front.length(); + sendq.pop_front(); + } + else + { + // stopped in the middle of this string + front = front.substr(rv); + rv = 0; + } } } + else if (rv == 0) + { + error = "Connection closed"; + } + else if (errno == EAGAIN) + { + again = false; + } + else if (errno == EINTR) + { + again = true; + } + else + { + error = strerror(errno); + } } - else if (rv == 0) + if (!error.empty()) { - error = "Connection closed"; + // error - kill all events + ServerInstance->SE->ChangeEventMask(this, FD_WANT_NO_READ | FD_WANT_NO_WRITE); } - else if (errno != EAGAIN && errno != EINTR) + else if (sendq_len) { - error = strerror(errno); + // writes have blocked, we can use FAST_WRITE to find when they unblock + ServerInstance->SE->ChangeEventMask(this, FD_WANT_FAST_WRITE | FD_WRITE_WILL_BLOCK); + } + else + { + // writes are done, we can use EDGE_WRITE to stop asking for write + ServerInstance->SE->ChangeEventMask(this, FD_WANT_EDGE_WRITE); } - if (sendq_len && error.empty()) - ServerInstance->SE->WantWrite(this); } } @@ -318,18 +367,12 @@ void StreamSocket::WriteData(const std::string &data) data.c_str()); return; } - bool newWrite = sendq.empty() && !data.empty(); /* Append the data to the back of the queue ready for writing */ sendq.push_back(data); sendq_len += data.length(); - if (newWrite) - { - // TODO perhaps we should try writing first, before asking SE about writes? - // DoWrite(); - ServerInstance->SE->WantWrite(this); - } + ServerInstance->SE->ChangeEventMask(this, FD_ADD_TRIAL_WRITE); } void SocketTimeout::Tick(time_t) diff --git a/src/listensocket.cpp b/src/listensocket.cpp index 823cb9eca..dfd2f11f7 100644 --- a/src/listensocket.cpp +++ b/src/listensocket.cpp @@ -57,7 +57,7 @@ ListenSocketBase::ListenSocketBase(InspIRCd* Instance, int port, const std::stri else { Instance->SE->NonBlocking(this->fd); - Instance->SE->AddFd(this); + Instance->SE->AddFd(this, FD_WANT_POLL_READ | FD_WANT_NO_WRITE); } } } diff --git a/src/modules/extra/m_ssl_gnutls.cpp b/src/modules/extra/m_ssl_gnutls.cpp index a7175005f..e72666062 100644 --- a/src/modules/extra/m_ssl_gnutls.cpp +++ b/src/modules/extra/m_ssl_gnutls.cpp @@ -439,7 +439,7 @@ class ModuleSSLGnuTLS : public Module } else if (session->status == ISSL_HANDSHAKING_WRITE) { - MakePollWrite(user); + ServerInstance->SE->ChangeEventMask(user, FD_WANT_NO_READ | FD_WANT_POLL_WRITE); return 0; } @@ -515,17 +515,18 @@ class ModuleSSLGnuTLS : public Module if (ret == (int)sendq.length()) { + ServerInstance->SE->ChangeEventMask(user, FD_WANT_NO_WRITE); return 1; } else if (ret > 0) { sendq = sendq.substr(ret); - MakePollWrite(user); + ServerInstance->SE->ChangeEventMask(user, FD_WANT_POLL_WRITE); return 0; } else if (ret == GNUTLS_E_AGAIN || ret == GNUTLS_E_INTERRUPTED) { - MakePollWrite(user); + ServerInstance->SE->ChangeEventMask(user, FD_WANT_POLL_WRITE); return 0; } else if (ret == 0) @@ -559,12 +560,13 @@ class ModuleSSLGnuTLS : public Module { // gnutls_handshake() wants to read() again. session->status = ISSL_HANDSHAKING_READ; + ServerInstance->SE->ChangeEventMask(user, FD_WANT_POLL_READ | FD_WANT_NO_WRITE); } else { // gnutls_handshake() wants to write() again. session->status = ISSL_HANDSHAKING_WRITE; - MakePollWrite(user); + ServerInstance->SE->ChangeEventMask(user, FD_WANT_NO_READ | FD_WANT_POLL_WRITE); } } else @@ -583,7 +585,7 @@ class ModuleSSLGnuTLS : public Module VerifyCertificate(session,user); // Finish writing, if any left - MakePollWrite(user); + ServerInstance->SE->ChangeEventMask(user, FD_WANT_POLL_READ | FD_WANT_NO_WRITE | FD_ADD_TRIAL_WRITE); return true; } @@ -605,11 +607,6 @@ class ModuleSSLGnuTLS : public Module } } - void MakePollWrite(EventHandler* eh) - { - ServerInstance->SE->WantWrite(eh); - } - void CloseSession(issl_session* session) { if(session->sess) diff --git a/src/modules/extra/m_ssl_openssl.cpp b/src/modules/extra/m_ssl_openssl.cpp index 8a2737f0e..e72bdc816 100644 --- a/src/modules/extra/m_ssl_openssl.cpp +++ b/src/modules/extra/m_ssl_openssl.cpp @@ -36,7 +36,6 @@ enum issl_status { ISSL_NONE, ISSL_HANDSHAKING, ISSL_OPEN }; -enum issl_io_status { ISSL_WRITE, ISSL_READ }; static bool SelfSigned = false; @@ -54,20 +53,15 @@ class issl_session : public classbase public: SSL* sess; issl_status status; - issl_io_status rstat; - issl_io_status wstat; - unsigned int inbufoffset; - char* inbuf; // Buffer OpenSSL reads into. - std::string outbuf; int fd; bool outbound; + bool data_to_write; issl_session() { outbound = false; - rstat = ISSL_READ; - wstat = ISSL_WRITE; + data_to_write = false; } }; @@ -106,10 +100,7 @@ class ModuleSSLOpenSSL : public Module public: - InspIRCd* PublicInstance; - ModuleSSLOpenSSL(InspIRCd* Me) - : Module(Me), PublicInstance(Me) { ServerInstance->Modules->PublishInterface("BufferedSocketHook", this); @@ -137,7 +128,7 @@ class ModuleSSLOpenSSL : public Module // Needs the flag as it ignores a plain /rehash OnModuleRehash(NULL,"ssl"); Implementation eventlist[] = { - I_On005Numeric, I_OnBufferFlushed, I_OnRequest, I_OnRehash, I_OnModuleRehash, I_OnPostConnect, + I_On005Numeric, I_OnRequest, I_OnRehash, I_OnModuleRehash, I_OnPostConnect, I_OnHookIO }; ServerInstance->Modules->Attach(eventlist, this, sizeof(eventlist)/sizeof(Implementation)); } @@ -350,8 +341,6 @@ class ModuleSSLOpenSSL : public Module issl_session* session = &sessions[fd]; session->fd = fd; - session->inbuf = new char[inbufsize]; - session->inbufoffset = 0; session->sess = SSL_new(ctx); session->status = ISSL_NONE; session->outbound = false; @@ -378,8 +367,6 @@ class ModuleSSLOpenSSL : public Module issl_session* session = &sessions[fd]; session->fd = fd; - session->inbuf = new char[inbufsize]; - session->inbufoffset = 0; session->sess = SSL_new(clictx); session->status = ISSL_NONE; session->outbound = true; @@ -423,19 +410,12 @@ class ModuleSSLOpenSSL : public Module if (session->status == ISSL_HANDSHAKING) { - if (session->rstat == ISSL_READ || session->wstat == ISSL_READ) - { - // The handshake isn't finished and it wants to read, try to finish it. - if (!Handshake(user, session)) - { - // Couldn't resume handshake. - if (session->status == ISSL_NONE) - return -1; - return 0; - } - } - else + // The handshake isn't finished and it wants to read, try to finish it. + if (!Handshake(user, session)) { + // Couldn't resume handshake. + if (session->status == ISSL_NONE) + return -1; return 0; } } @@ -444,26 +424,40 @@ class ModuleSSLOpenSSL : public Module if (session->status == ISSL_OPEN) { - if (session->wstat == ISSL_READ) + char* buffer = ServerInstance->GetReadBuffer(); + size_t bufsiz = ServerInstance->Config->NetBufferSize; + int ret = SSL_read(session->sess, buffer, bufsiz); + + if (ret > 0) { - if(DoWrite(user, session) == 0) - return 0; + recvq.append(buffer, ret); + return 1; } - - if (session->rstat == ISSL_READ) + else if (ret == 0) + { + // Client closed connection. + CloseSession(session); + return -1; + } + else if (ret < 0) { - int ret = DoRead(user, session); + int err = SSL_get_error(session->sess, ret); - if (ret > 0) + if (err == SSL_ERROR_WANT_READ) { - recvq.append(session->inbuf, session->inbufoffset); - session->inbufoffset = 0; - return 1; + ServerInstance->SE->ChangeEventMask(user, FD_WANT_POLL_READ); + return 0; } - else if (errno == EAGAIN || errno == EINTR) + else if (err == SSL_ERROR_WANT_WRITE) + { + ServerInstance->SE->ChangeEventMask(user, FD_WANT_NO_READ | FD_WANT_POLL_WRITE); return 0; + } else + { + CloseSession(session); return -1; + } } } @@ -473,9 +467,6 @@ class ModuleSSLOpenSSL : public Module int OnStreamSocketWrite(StreamSocket* user, std::string& buffer) { int fd = user->GetFd(); - /* Are there any possibilities of an out of range fd? Hope not, but lets be paranoid */ - if ((fd < 0) || (fd > ServerInstance->SE->GetMaxFds() - 1)) - return -1; issl_session* session = &sessions[fd]; @@ -485,136 +476,61 @@ class ModuleSSLOpenSSL : public Module return -1; } + session->data_to_write = true; + if (session->status == ISSL_HANDSHAKING) { - // The handshake isn't finished, try to finish it. - if (session->rstat == ISSL_WRITE || session->wstat == ISSL_WRITE) + if (!Handshake(user, session)) { - if (!Handshake(user, session)) - { - // Couldn't resume handshake. - if (session->status == ISSL_NONE) - return -1; - return 0; - } + // Couldn't resume handshake. + if (session->status == ISSL_NONE) + return -1; + return 0; } } - int rv = 0; - - // don't pull items into the output buffer until they are - // unlikely to block; this allows sendq exceeded to continue - // to work for SSL users. - // TODO better signaling for I/O requests so this isn't needed - if (session->outbuf.empty()) - { - session->outbuf = buffer; - rv = 1; - } - if (session->status == ISSL_OPEN) { - if (session->rstat == ISSL_WRITE) + int ret = SSL_write(session->sess, buffer.data(), buffer.size()); + if (ret == (int)buffer.length()) { - DoRead(user, session); + session->data_to_write = false; + ServerInstance->SE->ChangeEventMask(user, FD_WANT_POLL_READ | FD_WANT_NO_WRITE); + return 1; } - - if (session->wstat == ISSL_WRITE) + else if (ret > 0) { - DoWrite(user, session); - } - } - - if (rv == 0 || !session->outbuf.empty()) - ServerInstance->SE->WantWrite(user); - - return rv; - } - - int DoWrite(StreamSocket* user, issl_session* session) - { - if (!session->outbuf.size()) - return -1; - - int ret = SSL_write(session->sess, session->outbuf.data(), session->outbuf.size()); - - if (ret == 0) - { - CloseSession(session); - return 0; - } - else if (ret < 0) - { - int err = SSL_get_error(session->sess, ret); - - if (err == SSL_ERROR_WANT_WRITE) - { - session->wstat = ISSL_WRITE; - ServerInstance->SE->WantWrite(user); - return -1; - } - else if (err == SSL_ERROR_WANT_READ) - { - session->wstat = ISSL_READ; - return -1; - } - else - { - CloseSession(session); + buffer = buffer.substr(ret); + ServerInstance->SE->ChangeEventMask(user, FD_WANT_POLL_WRITE); return 0; } - } - else - { - session->outbuf = session->outbuf.substr(ret); - return ret; - } - } - - int DoRead(StreamSocket* user, issl_session* session) - { - // Is this right? Not sure if the unencrypted data is garaunteed to be the same length. - // Read into the inbuffer, offset from the beginning by the amount of data we have that insp hasn't taken yet. - - int ret = SSL_read(session->sess, session->inbuf + session->inbufoffset, inbufsize - session->inbufoffset); - - if (ret == 0) - { - // Client closed connection. - CloseSession(session); - return 0; - } - else if (ret < 0) - { - int err = SSL_get_error(session->sess, ret); - - if (err == SSL_ERROR_WANT_READ) + else if (ret == 0) { - session->rstat = ISSL_READ; - return -1; - } - else if (err == SSL_ERROR_WANT_WRITE) - { - session->rstat = ISSL_WRITE; - ServerInstance->SE->WantWrite(user); + CloseSession(session); return -1; } - else + else if (ret < 0) { - CloseSession(session); - return 0; - } - } - else - { - // Read successfully 'ret' bytes into inbuf + inbufoffset - // There are 'ret' + 'inbufoffset' bytes of data in 'inbuf' - // 'buffer' is 'count' long - - session->inbufoffset += ret; + int err = SSL_get_error(session->sess, ret); - return ret; + if (err == SSL_ERROR_WANT_WRITE) + { + ServerInstance->SE->ChangeEventMask(user, FD_WANT_POLL_WRITE); + return 0; + } + else if (err == SSL_ERROR_WANT_READ) + { + ServerInstance->SE->ChangeEventMask(user, FD_WANT_POLL_READ | FD_WANT_NO_WRITE); + return 0; + } + else + { + CloseSession(session); + return -1; + } + } } + return 0; } bool Handshake(EventHandler* user, issl_session* session) @@ -632,15 +548,14 @@ class ModuleSSLOpenSSL : public Module if (err == SSL_ERROR_WANT_READ) { - session->rstat = ISSL_READ; + ServerInstance->SE->ChangeEventMask(user, FD_WANT_POLL_READ | FD_WANT_NO_WRITE); session->status = ISSL_HANDSHAKING; return true; } else if (err == SSL_ERROR_WANT_WRITE) { - session->wstat = ISSL_WRITE; + ServerInstance->SE->ChangeEventMask(user, FD_WANT_NO_READ | FD_WANT_POLL_WRITE); session->status = ISSL_HANDSHAKING; - ServerInstance->SE->WantWrite(user); return true; } else @@ -653,13 +568,11 @@ class ModuleSSLOpenSSL : public Module else if (ret > 0) { // Handshake complete. - // This will do for setting the ssl flag...it could be done earlier if it's needed. But this seems neater. - EventHandler *u = ServerInstance->SE->GetRef(session->fd); - VerifyCertificate(session, u); + VerifyCertificate(session, user); session->status = ISSL_OPEN; - ServerInstance->SE->WantWrite(user); + ServerInstance->SE->ChangeEventMask(user, FD_WANT_POLL_READ | FD_WANT_NO_WRITE | FD_ADD_TRIAL_WRITE); return true; } @@ -672,17 +585,6 @@ class ModuleSSLOpenSSL : public Module return true; } - void OnBufferFlushed(User* user) - { - if (user->GetIOHook() == this) - { - std::string dummy; - issl_session* session = &sessions[user->GetFd()]; - if (session && session->outbuf.size()) - OnStreamSocketWrite(user, dummy); - } - } - void CloseSession(issl_session* session) { if (session->sess) @@ -691,13 +593,6 @@ class ModuleSSLOpenSSL : public Module SSL_free(session->sess); } - if (session->inbuf) - { - delete[] session->inbuf; - } - - session->outbuf.clear(); - session->inbuf = NULL; session->sess = NULL; session->status = ISSL_NONE; errno = EIO; @@ -771,8 +666,7 @@ class ModuleSSLOpenSSL : public Module static int error_callback(const char *str, size_t len, void *u) { - ModuleSSLOpenSSL* mssl = (ModuleSSLOpenSSL*)u; - mssl->PublicInstance->Logs->Log("m_ssl_openssl",DEFAULT, "SSL error: " + std::string(str, len - 1)); + ServerInstance->Logs->Log("m_ssl_openssl",DEFAULT, "SSL error: " + std::string(str, len - 1)); // // XXX: Remove this line, it causes valgrind warnings... diff --git a/src/modules/m_httpd_stats.cpp b/src/modules/m_httpd_stats.cpp index 884793e1c..01d77d806 100644 --- a/src/modules/m_httpd_stats.cpp +++ b/src/modules/m_httpd_stats.cpp @@ -87,7 +87,7 @@ class ModuleHttpStats : public Module data << "<usercount>" << ServerInstance->Users->clientlist->size() << "</usercount>"; data << "<channelcount>" << ServerInstance->chanlist->size() << "</channelcount>"; data << "<opercount>" << ServerInstance->Users->all_opers.size() << "</opercount>"; - data << "<socketcount>" << (ServerInstance->SE->GetMaxFds() - ServerInstance->SE->GetRemainingFds()) << "</socketcount><socketmax>" << ServerInstance->SE->GetMaxFds() << "</socketmax><socketengine>" << ServerInstance->SE->GetName() << "</socketengine>"; + data << "<socketcount>" << (ServerInstance->SE->GetUsedFds()) << "</socketcount><socketmax>" << ServerInstance->SE->GetMaxFds() << "</socketmax><socketengine>" << ServerInstance->SE->GetName() << "</socketengine>"; time_t current_time = 0; current_time = ServerInstance->Time(); diff --git a/src/modules/m_ident.cpp b/src/modules/m_ident.cpp index 973c93f72..66bd8835a 100644 --- a/src/modules/m_ident.cpp +++ b/src/modules/m_ident.cpp @@ -127,21 +127,17 @@ class IdentRequestSocket : public EventHandler } /* Add fd to socket engine */ - if (!ServerInstance->SE->AddFd(this)) + if (!ServerInstance->SE->AddFd(this, FD_WANT_NO_READ | FD_WANT_POLL_WRITE)) { this->Close(); throw ModuleException("out of fds"); } - - /* Important: We set WantWrite immediately after connect() - * because a successful connection will trigger a writability event - */ - ServerInstance->SE->WantWrite(this); } virtual void OnConnected() { ServerInstance->Logs->Log("m_ident",DEBUG,"OnConnected()"); + ServerInstance->SE->ChangeEventMask(this, FD_WANT_POLL_READ | FD_WANT_NO_WRITE); char req[32]; diff --git a/src/socketengine.cpp b/src/socketengine.cpp index c0ae3f278..49624481f 100644 --- a/src/socketengine.cpp +++ b/src/socketengine.cpp @@ -62,40 +62,74 @@ SocketEngine::~SocketEngine() { } -bool SocketEngine::HasFd(int fd) +void SocketEngine::SetEventMask(EventHandler* eh, int mask) { - if ((fd < 0) || (fd > MAX_DESCRIPTORS)) - return false; - return ref[fd]; + eh->event_mask = mask; } -EventHandler* SocketEngine::GetRef(int fd) +void SocketEngine::ChangeEventMask(EventHandler* eh, int change) { - if ((fd < 0) || (fd > MAX_DESCRIPTORS)) - return 0; - return ref[fd]; + int old_m = eh->event_mask; + int new_m = old_m; + + // if we are changing read/write type, remove the previously set bit + if (change & FD_WANT_READ_MASK) + new_m &= ~FD_WANT_READ_MASK; + if (change & FD_WANT_WRITE_MASK) + new_m &= ~FD_WANT_WRITE_MASK; + + // if adding a trial read/write, insert it into the set + if (change & FD_TRIAL_NOTE_MASK && !(old_m & FD_TRIAL_NOTE_MASK)) + trials.insert(eh->GetFd()); + + new_m |= change; + if (new_m == old_m) + return; + + eh->event_mask = new_m; + OnSetEvent(eh, old_m, new_m); } -int SocketEngine::GetMaxFds() +void SocketEngine::DispatchTrialWrites() { - return 0; + std::vector<int> working_list; + working_list.reserve(trials.size()); + working_list.assign(trials.begin(), trials.end()); + trials.clear(); + for(unsigned int i=0; i < working_list.size(); i++) + { + int fd = working_list[i]; + EventHandler* eh = GetRef(fd); + if (!eh) + continue; + int mask = eh->event_mask; + eh->event_mask &= ~(FD_ADD_TRIAL_READ | FD_ADD_TRIAL_WRITE); + if ((mask & (FD_ADD_TRIAL_READ | FD_READ_WILL_BLOCK)) == FD_ADD_TRIAL_READ) + eh->HandleEvent(EVENT_READ, 0); + if ((mask & (FD_ADD_TRIAL_WRITE | FD_WRITE_WILL_BLOCK)) == FD_ADD_TRIAL_WRITE) + eh->HandleEvent(EVENT_WRITE, 0); + } } -int SocketEngine::GetRemainingFds() +bool SocketEngine::HasFd(int fd) { - return 0; + if ((fd < 0) || (fd > GetMaxFds())) + return false; + return ref[fd]; } -int SocketEngine::DispatchEvents() +EventHandler* SocketEngine::GetRef(int fd) { - return 0; + if ((fd < 0) || (fd > GetMaxFds())) + return 0; + return ref[fd]; } bool SocketEngine::BoundsCheckFd(EventHandler* eh) { if (!eh) return false; - if ((eh->GetFd() < 0) || (eh->GetFd() > MAX_DESCRIPTORS)) + if ((eh->GetFd() < 0) || (eh->GetFd() > GetMaxFds())) return false; return true; } diff --git a/src/socketengines/socketengine_epoll.cpp b/src/socketengines/socketengine_epoll.cpp index 7fed6f250..672ff4a7b 100644 --- a/src/socketengines/socketengine_epoll.cpp +++ b/src/socketengines/socketengine_epoll.cpp @@ -18,7 +18,18 @@ EPollEngine::EPollEngine() { - MAX_DESCRIPTORS = 0; + int max = ulimit(4, 0); + if (max > 0) + { + MAX_DESCRIPTORS = max; + } + else + { + ServerInstance->Logs->Log("SOCKET", DEFAULT, "ERROR: Can't determine maximum number of open sockets!"); + printf("ERROR: Can't determine maximum number of open sockets!\n"); + ServerInstance->Exit(EXIT_STATUS_SOCKETENGINE); + } + // This is not a maximum, just a hint at the eventual number of sockets that may be polled. EngineHandle = epoll_create(GetMaxFds() / 4); @@ -26,11 +37,10 @@ EPollEngine::EPollEngine() { ServerInstance->Logs->Log("SOCKET",DEFAULT, "ERROR: Could not initialize socket engine: %s", strerror(errno)); ServerInstance->Logs->Log("SOCKET",DEFAULT, "ERROR: Your kernel probably does not have the proper features. This is a fatal error, exiting now."); - printf("ERROR: Could not initialize socket engine: %s\n", strerror(errno)); + printf("ERROR: Could not initialize epoll socket engine: %s\n", strerror(errno)); printf("ERROR: Your kernel probably does not have the proper features. This is a fatal error, exiting now.\n"); ServerInstance->Exit(EXIT_STATUS_SOCKETENGINE); } - CurrentSetSize = 0; ref = new EventHandler* [GetMaxFds()]; events = new struct epoll_event[GetMaxFds()]; @@ -45,18 +55,35 @@ EPollEngine::~EPollEngine() delete[] events; } -bool EPollEngine::AddFd(EventHandler* eh, bool writeFirst) +static int mask_to_epoll(int event_mask) { - int fd = eh->GetFd(); - if ((fd < 0) || (fd > GetMaxFds() - 1)) + int rv = 0; + if (event_mask & (FD_WANT_POLL_READ | FD_WANT_POLL_WRITE)) { - ServerInstance->Logs->Log("SOCKET",DEBUG,"AddFd out of range: (fd: %d, max: %d)", fd, GetMaxFds()); - return false; + // we need to use standard polling on this FD + if (event_mask & (FD_WANT_POLL_READ | FD_WANT_FAST_READ)) + rv |= EPOLLIN; + if (event_mask & (FD_WANT_POLL_WRITE | FD_WANT_FAST_WRITE)) + rv |= EPOLLOUT; } + else + { + // we can use edge-triggered polling on this FD + rv = EPOLLET; + if (event_mask & (FD_WANT_FAST_READ | FD_WANT_EDGE_READ)) + rv |= EPOLLIN; + if (event_mask & (FD_WANT_FAST_WRITE | FD_WANT_EDGE_WRITE)) + rv |= EPOLLOUT; + } + return rv; +} - if (GetRemainingFds() <= 1) +bool EPollEngine::AddFd(EventHandler* eh, int event_mask) +{ + int fd = eh->GetFd(); + if ((fd < 0) || (fd > GetMaxFds() - 1)) { - ServerInstance->Logs->Log("SOCKET",DEBUG,"No remaining FDs cannot add fd: %d", fd); + ServerInstance->Logs->Log("SOCKET",DEBUG,"AddFd out of range: (fd: %d, max: %d)", fd, GetMaxFds()); return false; } @@ -68,7 +95,7 @@ bool EPollEngine::AddFd(EventHandler* eh, bool writeFirst) struct epoll_event ev; memset(&ev,0,sizeof(ev)); - ev.events = writeFirst ? EPOLLOUT : EPOLLIN; + ev.events = mask_to_epoll(event_mask); ev.data.fd = fd; int i = epoll_ctl(EngineHandle, EPOLL_CTL_ADD, fd, &ev); if (i < 0) @@ -80,20 +107,24 @@ bool EPollEngine::AddFd(EventHandler* eh, bool writeFirst) ServerInstance->Logs->Log("SOCKET",DEBUG,"New file descriptor: %d", fd); ref[fd] = eh; + SocketEngine::SetEventMask(eh, event_mask); CurrentSetSize++; return true; } -void EPollEngine::WantWrite(EventHandler* eh) +void EPollEngine::OnSetEvent(EventHandler* eh, int old_mask, int new_mask) { - /** Use oneshot so that the system removes the writeable - * status for us and saves us a call. - */ - struct epoll_event ev; - memset(&ev,0,sizeof(ev)); - ev.events = EPOLLIN | EPOLLOUT; - ev.data.fd = eh->GetFd(); - epoll_ctl(EngineHandle, EPOLL_CTL_MOD, eh->GetFd(), &ev); + int old_events = mask_to_epoll(old_mask); + int new_events = mask_to_epoll(new_mask); + if (old_events != new_events) + { + // ok, we actually have something to tell the kernel about + struct epoll_event ev; + memset(&ev,0,sizeof(ev)); + ev.events = new_events; + ev.data.fd = eh->GetFd(); + epoll_ctl(EngineHandle, EPOLL_CTL_MOD, eh->GetFd(), &ev); + } } bool EPollEngine::DelFd(EventHandler* eh, bool force) @@ -117,37 +148,12 @@ bool EPollEngine::DelFd(EventHandler* eh, bool force) } ref[fd] = NULL; - CurrentSetSize--; ServerInstance->Logs->Log("SOCKET",DEBUG,"Remove file descriptor: %d", fd); + CurrentSetSize--; return true; } -int EPollEngine::GetMaxFds() -{ - if (MAX_DESCRIPTORS) - return MAX_DESCRIPTORS; - - int max = ulimit(4, 0); - if (max > 0) - { - MAX_DESCRIPTORS = max; - return max; - } - else - { - ServerInstance->Logs->Log("SOCKET", DEFAULT, "ERROR: Can't determine maximum number of open sockets!"); - printf("ERROR: Can't determine maximum number of open sockets!\n"); - ServerInstance->Exit(EXIT_STATUS_SOCKETENGINE); - } - return 0; -} - -int EPollEngine::GetRemainingFds() -{ - return GetMaxFds() - CurrentSetSize; -} - int EPollEngine::DispatchEvents() { socklen_t codesize = sizeof(int); @@ -158,11 +164,13 @@ int EPollEngine::DispatchEvents() for (int j = 0; j < i; j++) { + EventHandler* eh = ref[events[j].data.fd]; + if (!eh) + continue; if (events[j].events & EPOLLHUP) { ErrorEvents++; - if (ref[events[j].data.fd]) - ref[events[j].data.fd]->HandleEvent(EVENT_ERROR, 0); + eh->HandleEvent(EVENT_ERROR, 0); continue; } if (events[j].events & EPOLLERR) @@ -171,26 +179,20 @@ int EPollEngine::DispatchEvents() /* Get error number */ if (getsockopt(events[j].data.fd, SOL_SOCKET, SO_ERROR, &errcode, &codesize) < 0) errcode = errno; - if (ref[events[j].data.fd]) - ref[events[j].data.fd]->HandleEvent(EVENT_ERROR, errcode); + eh->HandleEvent(EVENT_ERROR, errcode); continue; } - if (events[j].events & EPOLLOUT) + if (events[j].events & EPOLLIN) { - WriteEvents++; - struct epoll_event ev; - memset(&ev,0,sizeof(ev)); - ev.events = EPOLLIN; - ev.data.fd = events[j].data.fd; - epoll_ctl(EngineHandle, EPOLL_CTL_MOD, events[j].data.fd, &ev); - if (ref[events[j].data.fd]) - ref[events[j].data.fd]->HandleEvent(EVENT_WRITE); + ReadEvents++; + SetEventMask(eh, eh->GetEventMask() & ~FD_READ_WILL_BLOCK); + eh->HandleEvent(EVENT_READ); } - else + if (events[j].events & EPOLLOUT) { - ReadEvents++; - if (ref[events[j].data.fd]) - ref[events[j].data.fd]->HandleEvent(EVENT_READ); + WriteEvents++; + SetEventMask(eh, eh->GetEventMask() & ~FD_WRITE_WILL_BLOCK); + eh->HandleEvent(EVENT_WRITE); } } diff --git a/src/socketengines/socketengine_iocp.cpp b/src/socketengines/socketengine_iocp.cpp index 3c3181909..e09fb4d0a 100644 --- a/src/socketengines/socketengine_iocp.cpp +++ b/src/socketengines/socketengine_iocp.cpp @@ -33,7 +33,6 @@ IOCPEngine::IOCPEngine() /* Null variables out. */ CurrentSetSize = 0; - EngineHandle = 0; MAX_DESCRIPTORS = 10240; ref = new EventHandler* [10240]; memset(ref, 0, sizeof(EventHandler*) * MAX_DESCRIPTORS); @@ -47,7 +46,7 @@ IOCPEngine::~IOCPEngine() delete[] ref; } -bool IOCPEngine::AddFd(EventHandler* eh, bool writeFirst) +bool IOCPEngine::AddFd(EventHandler* eh, int event_mask) { /* Does it at least look valid? */ if (!eh) @@ -92,7 +91,7 @@ bool IOCPEngine::AddFd(EventHandler* eh, bool writeFirst) ServerInstance->Logs->Log("SOCKET",DEBUG, "New fake fd: %u, real fd: %u, address 0x%p", *fake_fd, eh->GetFd(), eh); /* post a write event if there is data to be written */ - if(writeFirst) + if (event_mask & (FD_WANT_POLL_WRITE | FD_WANT_FAST_WRITE)) WantWrite(eh); /* we're all good =) */ @@ -107,6 +106,7 @@ bool IOCPEngine::AddFd(EventHandler* eh, bool writeFirst) } ++CurrentSetSize; + SocketEngine::SetEventMask(eh, event_mask); ref[*fake_fd] = eh; return true; @@ -171,7 +171,7 @@ bool IOCPEngine::DelFd(EventHandler* eh, bool force /* = false */) return true; } -void IOCPEngine::WantWrite(EventHandler* eh) +void IOCPEngine::OnSetEvent(EventHandler* eh, int old_mask, int new_mask) { if (!eh) return; @@ -183,7 +183,7 @@ void IOCPEngine::WantWrite(EventHandler* eh) return; /* Post event - write begin */ - if(!eh->GetExt("windows_writeevent", m_writeEvent)) + if((new_mask & (FD_WANT_POLL_WRITE | FD_WANT_FAST_WRITE)) && !eh->GetExt("windows_writeevent", m_writeEvent)) { ULONG_PTR completion_key = (ULONG_PTR)*fake_fd; Overlapped * ov = new Overlapped(SOCKET_IO_EVENT_WRITE_READY, 0); @@ -315,6 +315,7 @@ int IOCPEngine::DispatchEvents() { WriteEvents++; eh->Shrink("windows_writeevent"); + SetEventMask(eh, eh->GetEventMask() & ~FD_WRITE_WILL_BLOCK); eh->HandleEvent(EVENT_WRITE, 0); } break; @@ -322,6 +323,7 @@ int IOCPEngine::DispatchEvents() case SOCKET_IO_EVENT_READ_READY: { ReadEvents++; + SetEventMask(eh, eh->GetEventMask() & ~FD_READ_WILL_BLOCK); if(ov->m_params) { // if we had params, it means we are a udp socket with a udp_overlap pointer in this long. diff --git a/src/socketengines/socketengine_kqueue.cpp b/src/socketengines/socketengine_kqueue.cpp index cbe3e959d..c9734e85d 100644 --- a/src/socketengines/socketengine_kqueue.cpp +++ b/src/socketengines/socketengine_kqueue.cpp @@ -54,16 +54,13 @@ KQueueEngine::~KQueueEngine() delete[] ke_list; } -bool KQueueEngine::AddFd(EventHandler* eh, bool writeFirst) +bool KQueueEngine::AddFd(EventHandler* eh, int event_mask) { int fd = eh->GetFd(); if ((fd < 0) || (fd > GetMaxFds() - 1)) return false; - if (GetRemainingFds() <= 1) - return false; - if (ref[fd]) return false; @@ -79,12 +76,13 @@ bool KQueueEngine::AddFd(EventHandler* eh, bool writeFirst) return false; } - if (writeFirst) { + if (event_mask & (FD_WANT_POLL_WRITE | FD_WANT_FAST_WRITE)) { // ...and sometimes want to write WantWrite(eh); } ref[fd] = eh; + SocketEngine::SetEventMask(eh, event_mask); CurrentSetSize++; ServerInstance->Logs->Log("SOCKET",DEBUG,"New file descriptor: %d", fd); @@ -126,38 +124,41 @@ bool KQueueEngine::DelFd(EventHandler* eh, bool force) return true; } -void KQueueEngine::WantWrite(EventHandler* eh) +void KQueueEngine::OnSetEvent(EventHandler* eh, int old_mask, int new_mask) { - struct kevent ke; - // EV_ONESHOT since we only ever want one write event - EV_SET(&ke, eh->GetFd(), EVFILT_WRITE, EV_ADD | EV_ONESHOT, 0, 0, NULL); - int i = kevent(EngineHandle, &ke, 1, 0, 0, NULL); - if (i < 0) { - ServerInstance->Logs->Log("SOCKET",DEFAULT,"Failed to mark for writing: %d %s", - eh->GetFd(), strerror(errno)); + if ((new_mask & FD_WANT_POLL_WRITE) && !(old_mask & FD_WANT_POLL_WRITE)) + { + // new poll-style write + struct kevent ke; + EV_SET(&ke, eh->GetFd(), EVFILT_WRITE, EV_ADD, 0, 0, NULL); + int i = kevent(EngineHandle, &ke, 1, 0, 0, NULL); + if (i < 0) { + ServerInstance->Logs->Log("SOCKET",DEFAULT,"Failed to mark for writing: %d %s", + eh->GetFd(), strerror(errno)); + } } -} - -int KQueueEngine::GetMaxFds() -{ - if (!MAX_DESCRIPTORS) + else if ((old_mask & FD_WANT_POLL_WRITE) && !(new_mask & FD_WANT_POLL_WRITE)) { - int mib[2], maxfiles; - size_t len; - - mib[0] = CTL_KERN; - mib[1] = KERN_MAXFILES; - len = sizeof(maxfiles); - sysctl(mib, 2, &maxfiles, &len, NULL, 0); - MAX_DESCRIPTORS = maxfiles; - return maxfiles; + // removing poll-style write + struct kevent ke; + EV_SET(&ke, eh->GetFd(), EVFILT_WRITE, EV_DELETE, 0, 0, NULL); + int i = kevent(EngineHandle, &ke, 1, 0, 0, NULL); + if (i < 0) { + ServerInstance->Logs->Log("SOCKET",DEFAULT,"Failed to mark for writing: %d %s", + eh->GetFd(), strerror(errno)); + } + } + if ((new_mask & FD_WANT_EDGE_WRITE) && !(old_mask & FD_WANT_EDGE_WRITE)) + { + // new one-shot write + struct kevent ke; + EV_SET(&ke, eh->GetFd(), EVFILT_WRITE, EV_ADD | EV_ONESHOT, 0, 0, NULL); + int i = kevent(EngineHandle, &ke, 1, 0, 0, NULL); + if (i < 0) { + ServerInstance->Logs->Log("SOCKET",DEFAULT,"Failed to mark for writing: %d %s", + eh->GetFd(), strerror(errno)); + } } - return MAX_DESCRIPTORS; -} - -int KQueueEngine::GetRemainingFds() -{ - return GetMaxFds() - CurrentSetSize; } int KQueueEngine::DispatchEvents() @@ -171,34 +172,31 @@ int KQueueEngine::DispatchEvents() for (int j = 0; j < i; j++) { + EventHandler* eh = ref[ke_list[j].ident]; + if (!eh) + continue; if (ke_list[j].flags & EV_EOF) { - /* We love you kqueue, oh yes we do *sings*! - * kqueue gives us the error number directly in the EOF state! - * Unlike smelly epoll and select, where we have to getsockopt - * to get the error, this saves us time and cpu cycles. Go BSD! - */ ErrorEvents++; - if (ref[ke_list[j].ident]) - ref[ke_list[j].ident]->HandleEvent(EVENT_ERROR, ke_list[j].fflags); + eh->HandleEvent(EVENT_ERROR, ke_list[j].fflags); continue; } if (ke_list[j].filter == EVFILT_WRITE) { - /* We only ever add write events with EV_ONESHOT, which - * means they are automatically removed once such a - * event fires, so nothing to do here. - */ - WriteEvents++; - if (ref[ke_list[j].ident]) - ref[ke_list[j].ident]->HandleEvent(EVENT_WRITE); + /* When mask is FD_WANT_FAST_WRITE, we set a one-shot + * write, so we need to clear that bit to detect when it + * set again. + */ + const int bits_to_clr = FD_WANT_FAST_WRITE | FD_WRITE_WILL_BLOCK; + SetEventMask(eh, eh->GetEventMask() & ~bits_to_clr); + eh->HandleEvent(EVENT_WRITE); } if (ke_list[j].filter == EVFILT_READ) { ReadEvents++; - if (ref[ke_list[j].ident]) - ref[ke_list[j].ident]->HandleEvent(EVENT_READ); + SetEventMask(eh, eh->GetEventMask() & ~FD_READ_WILL_BLOCK); + eh->HandleEvent(EVENT_READ); } } diff --git a/src/socketengines/socketengine_poll.cpp b/src/socketengines/socketengine_poll.cpp index 6d5ddb9f5..6f50e2798 100644 --- a/src/socketengines/socketengine_poll.cpp +++ b/src/socketengines/socketengine_poll.cpp @@ -21,9 +21,28 @@ PollEngine::PollEngine() { - // Poll requires no special setup (which is nice). CurrentSetSize = 0; - MAX_DESCRIPTORS = 0; +#ifndef __FreeBSD__ + int max = ulimit(4, 0); + if (max > 0) + { + MAX_DESCRIPTORS = max; + } + else + { + ServerInstance->Logs->Log("SOCKET", DEFAULT, "ERROR: Can't determine maximum number of open sockets: %s", strerror(errno)); + printf("ERROR: Can't determine maximum number of open sockets: %s\n", strerror(errno)); + ServerInstance->Exit(EXIT_STATUS_SOCKETENGINE); + } +#else + int mib[2]; + size_t len; + + mib[0] = CTL_KERN; + mib[1] = KERN_MAXFILES; + len = sizeof(MAX_DESCRIPTORS); + sysctl(mib, 2, &MAX_DESCRIPTORS, &len, NULL, 0); +#endif ref = new EventHandler* [GetMaxFds()]; events = new struct pollfd[GetMaxFds()]; @@ -39,7 +58,17 @@ PollEngine::~PollEngine() delete[] events; } -bool PollEngine::AddFd(EventHandler* eh, bool writeFirst) +static int mask_to_poll(int event_mask) +{ + int rv = 0; + if (event_mask & (FD_WANT_POLL_READ | FD_WANT_FAST_READ)) + rv |= POLLIN; + if (event_mask & (FD_WANT_POLL_WRITE | FD_WANT_FAST_WRITE)) + rv |= POLLOUT; + return rv; +} + +bool PollEngine::AddFd(EventHandler* eh, int event_mask) { int fd = eh->GetFd(); if ((fd < 0) || (fd > GetMaxFds() - 1)) @@ -48,12 +77,6 @@ bool PollEngine::AddFd(EventHandler* eh, bool writeFirst) return false; } - if (GetRemainingFds() <= 1) - { - ServerInstance->Logs->Log("SOCKET",DEBUG,"No remaining FDs cannot add fd: %d", fd); - return false; - } - if (fd_mappings.find(fd) != fd_mappings.end()) { ServerInstance->Logs->Log("SOCKET",DEBUG,"Attempt to add duplicate fd: %d", fd); @@ -65,16 +88,10 @@ bool PollEngine::AddFd(EventHandler* eh, bool writeFirst) fd_mappings[fd] = index; ref[index] = eh; events[index].fd = fd; - if (writeFirst) - { - events[index].events = POLLOUT; - } - else - { - events[index].events = POLLIN; - } + events[index].events = mask_to_poll(event_mask); ServerInstance->Logs->Log("SOCKET", DEBUG,"New file descriptor: %d (%d; index %d)", fd, events[fd].events, index); + SocketEngine::SetEventMask(eh, event_mask); CurrentSetSize++; return true; } @@ -87,16 +104,16 @@ EventHandler* PollEngine::GetRef(int fd) return ref[it->second]; } -void PollEngine::WantWrite(EventHandler* eh) +void PollEngine::OnSetEvent(EventHandler* eh, int old_mask, int new_mask) { std::map<int, unsigned int>::iterator it = fd_mappings.find(eh->GetFd()); if (it == fd_mappings.end()) { - ServerInstance->Logs->Log("SOCKET",DEBUG,"WantWrite() on unknown fd: %d", eh->GetFd()); + ServerInstance->Logs->Log("SOCKET",DEBUG,"SetEvents() on unknown fd: %d", eh->GetFd()); return; } - events[it->second].events = POLLIN | POLLOUT; + events[it->second].events = mask_to_poll(new_mask); } bool PollEngine::DelFd(EventHandler* eh, bool force) @@ -147,48 +164,6 @@ bool PollEngine::DelFd(EventHandler* eh, bool force) return true; } -int PollEngine::GetMaxFds() -{ -#ifndef __FreeBSD__ - if (MAX_DESCRIPTORS) - return MAX_DESCRIPTORS; - - int max = ulimit(4, 0); - if (max > 0) - { - MAX_DESCRIPTORS = max; - return max; - } - else - { - MAX_DESCRIPTORS = 0; - ServerInstance->Logs->Log("SOCKET", DEFAULT, "ERROR: Can't determine maximum number of open sockets: %s", strerror(errno)); - printf("ERROR: Can't determine maximum number of open sockets: %s\n", strerror(errno)); - ServerInstance->Exit(EXIT_STATUS_SOCKETENGINE); - } - return 0; -#else - if (!MAX_DESCRIPTORS) - { - int mib[2], maxfiles; - size_t len; - - mib[0] = CTL_KERN; - mib[1] = KERN_MAXFILES; - len = sizeof(maxfiles); - sysctl(mib, 2, &maxfiles, &len, NULL, 0); - MAX_DESCRIPTORS = maxfiles; - return maxfiles; - } - return MAX_DESCRIPTORS; -#endif -} - -int PollEngine::GetRemainingFds() -{ - return MAX_DESCRIPTORS - CurrentSetSize; -} - int PollEngine::DispatchEvents() { int i = poll(events, CurrentSetSize, 1000); @@ -203,11 +178,13 @@ int PollEngine::DispatchEvents() { if (events[index].revents) processed++; + EventHandler* eh = ref[index]; + if (!eh) + continue; if (events[index].revents & POLLHUP) { - if (ref[index]) - ref[index]->HandleEvent(EVENT_ERROR, 0); + eh->HandleEvent(EVENT_ERROR, 0); continue; } @@ -219,25 +196,20 @@ int PollEngine::DispatchEvents() // Get error number if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &errcode, &codesize) < 0) errcode = errno; - if (ref[index]) - ref[index]->HandleEvent(EVENT_ERROR, errcode); + eh->HandleEvent(EVENT_ERROR, errcode); continue; } - if (events[index].revents & POLLOUT) + if (events[index].revents & POLLIN) { - // Switch to wanting read again - // event handlers have to request to write again if they need it - events[index].events = POLLIN; - - if (ref[index]) - ref[index]->HandleEvent(EVENT_WRITE); + SetEventMask(eh, eh->GetEventMask() & ~FD_READ_WILL_BLOCK); + eh->HandleEvent(EVENT_READ); } - - if (events[index].revents & POLLIN) + + if (events[index].revents & POLLOUT) { - if (ref[index]) - ref[index]->HandleEvent(EVENT_READ); + SetEventMask(eh, eh->GetEventMask() & ~FD_WRITE_WILL_BLOCK); + eh->HandleEvent(EVENT_WRITE); } } } diff --git a/src/socketengines/socketengine_ports.cpp b/src/socketengines/socketengine_ports.cpp index eb08839d0..a99806fc4 100644 --- a/src/socketengines/socketengine_ports.cpp +++ b/src/socketengines/socketengine_ports.cpp @@ -19,7 +19,18 @@ PortsEngine::PortsEngine() { - MAX_DESCRIPTORS = 0; + int max = ulimit(4, 0); + if (max > 0) + { + MAX_DESCRIPTORS = max; + return max; + } + else + { + ServerInstance->Logs->Log("SOCKET", DEFAULT, "ERROR: Can't determine maximum number of open sockets!"); + printf("ERROR: Can't determine maximum number of open sockets!\n"); + ServerInstance->Exit(EXIT_STATUS_SOCKETENGINE); + } EngineHandle = port_create(); if (EngineHandle == -1) @@ -44,29 +55,38 @@ PortsEngine::~PortsEngine() delete[] events; } -bool PortsEngine::AddFd(EventHandler* eh, bool writeFirst) +static int mask_to_events(int event_mask) +{ + int rv = 0; + if (event_mask & (FD_WANT_POLL_READ | FD_WANT_FAST_READ)) + rv |= POLLRDNORM; + if (event_mask & (FD_WANT_POLL_WRITE | FD_WANT_FAST_WRITE)) + rv |= POLLWRNORM; + return rv; +} + +bool PortsEngine::AddFd(EventHandler* eh, int event_mask) { int fd = eh->GetFd(); if ((fd < 0) || (fd > GetMaxFds() - 1)) return false; - if (GetRemainingFds() <= 1) - return false; - if (ref[fd]) return false; ref[fd] = eh; - port_associate(EngineHandle, PORT_SOURCE_FD, fd, writeFirst ? POLLWRNORM : POLLRDNORM, eh); + SocketEngine::SetEventMask(eh, event_mask); + port_associate(EngineHandle, PORT_SOURCE_FD, fd, mask_to_events(event_mask), eh); ServerInstance->Logs->Log("SOCKET",DEBUG,"New file descriptor: %d", fd); CurrentSetSize++; return true; } -void PortsEngine::WantWrite(EventHandler* eh) +void PortsEngine::WantWrite(EventHandler* eh, int old_mask, int new_mask) { - port_associate(EngineHandle, PORT_SOURCE_FD, eh->GetFd(), POLLRDNORM | POLLWRNORM, eh); + if (mask_to_events(new_mask) != mask_to_events(old_mask)) + port_associate(EngineHandle, PORT_SOURCE_FD, eh->GetFd(), mask_to_events(new_mask), eh); } bool PortsEngine::DelFd(EventHandler* eh, bool force) @@ -84,31 +104,6 @@ bool PortsEngine::DelFd(EventHandler* eh, bool force) return true; } -int PortsEngine::GetMaxFds() -{ - if (MAX_DESCRIPTORS) - return MAX_DESCRIPTORS; - - int max = ulimit(4, 0); - if (max > 0) - { - MAX_DESCRIPTORS = max; - return max; - } - else - { - ServerInstance->Logs->Log("SOCKET", DEFAULT, "ERROR: Can't determine maximum number of open sockets!"); - printf("ERROR: Can't determine maximum number of open sockets!\n"); - ServerInstance->Exit(EXIT_STATUS_SOCKETENGINE); - } -#include <ulimit.h> -} - -int PortsEngine::GetRemainingFds() -{ - return GetMaxFds() - CurrentSetSize; -} - int PortsEngine::DispatchEvents() { struct timespec poll_time; @@ -132,15 +127,27 @@ int PortsEngine::DispatchEvents() case PORT_SOURCE_FD: { int fd = this->events[i].portev_object; - if (ref[fd]) + EventHandler* eh = ref[fd]; + if (eh) { - // reinsert port for next time around - port_associate(EngineHandle, PORT_SOURCE_FD, fd, POLLRDNORM, ref[fd]); - if ((this->events[i].portev_events & POLLRDNORM)) + int mask = eh->GetEventMask(); + if (events[i].portev_events & POLLWRNORM) + mask &= ~(FD_WRITE_WILL_BLOCK | FD_WANT_FAST_WRITE); + if (events[i].portev_events & POLLRDNORM) + mask &= ~FD_READ_WILL_BLOCK; + // reinsert port for next time around, pretending to be one-shot for writes + SetEventMask(ev, mask); + port_associate(EngineHandle, PORT_SOURCE_FD, fd, mask_to_events(mask), eh); + if (events[i].portev_events & POLLRDNORM) + { ReadEvents++; - else + eh->HandleEvent(EVENT_READ); + } + if (events[i].portev_events & POLLWRNORM) + { WriteEvents++; - ref[fd]->HandleEvent((this->events[i].portev_events & POLLRDNORM) ? EVENT_READ : EVENT_WRITE); + eh->HandleEvent(EVENT_WRITE); + } } } default: diff --git a/src/socketengines/socketengine_select.cpp b/src/socketengines/socketengine_select.cpp index 7f6a4e283..f089fd698 100644 --- a/src/socketengines/socketengine_select.cpp +++ b/src/socketengines/socketengine_select.cpp @@ -21,10 +21,8 @@ SelectEngine::SelectEngine() { MAX_DESCRIPTORS = FD_SETSIZE; - EngineHandle = 0; CurrentSetSize = 0; - writeable.assign(GetMaxFds(), false); ref = new EventHandler* [GetMaxFds()]; memset(ref, 0, GetMaxFds() * sizeof(EventHandler*)); } @@ -34,33 +32,23 @@ SelectEngine::~SelectEngine() delete[] ref; } -bool SelectEngine::AddFd(EventHandler* eh, bool writeFirst) +bool SelectEngine::AddFd(EventHandler* eh, int) { int fd = eh->GetFd(); if ((fd < 0) || (fd > GetMaxFds() - 1)) return false; - if (GetRemainingFds() <= 1) - return false; - if (ref[fd]) return false; - fds.insert(fd); ref[fd] = eh; + SocketEngine::SetEventMask(eh, event_mask); CurrentSetSize++; - writeable[eh->GetFd()] = writeFirst; - ServerInstance->Logs->Log("SOCKET",DEBUG,"New file descriptor: %d", fd); return true; } -void SelectEngine::WantWrite(EventHandler* eh) -{ - writeable[eh->GetFd()] = true; -} - bool SelectEngine::DelFd(EventHandler* eh, bool force) { int fd = eh->GetFd(); @@ -68,10 +56,6 @@ bool SelectEngine::DelFd(EventHandler* eh, bool force) if ((fd < 0) || (fd > GetMaxFds() - 1)) return false; - std::set<int>::iterator t = fds.find(fd); - if (t != fds.end()) - fds.erase(t); - CurrentSetSize--; ref[fd] = NULL; @@ -79,14 +63,9 @@ bool SelectEngine::DelFd(EventHandler* eh, bool force) return true; } -int SelectEngine::GetMaxFds() +void SelectEngine::OnSetEvent(EventHandler* eh, int old_mask, int new_mask) { - return FD_SETSIZE; -} - -int SelectEngine::GetRemainingFds() -{ - return GetMaxFds() - CurrentSetSize; + // deal with it later } int SelectEngine::DispatchEvents() @@ -96,24 +75,26 @@ int SelectEngine::DispatchEvents() socklen_t codesize = sizeof(int); int errcode = 0; + fd_set wfdset, rfdset, errfdset; FD_ZERO(&wfdset); FD_ZERO(&rfdset); FD_ZERO(&errfdset); - /* Populate the select FD set (this is why select sucks compared to epoll, kqueue, IOCP) */ - for (std::set<int>::iterator a = fds.begin(); a != fds.end(); a++) + /* Populate the select FD sets (this is why select sucks compared to epoll, kqueue, IOCP) */ + for (int i = 0; i < FD_SETSIZE; i++) { - /* Explicitly one-time writeable */ - if (writeable[*a]) - FD_SET (*a, &wfdset); - else - FD_SET (*a, &rfdset); - - /* All sockets must receive error notifications regardless */ - FD_SET (*a, &errfdset); + EventHandler* eh = ref[i]; + if (!eh) + continue; + int state = eh->GetEventMask(); + if (state & (FD_WANT_POLL_READ | FD_WANT_FAST_READ)) + FD_SET (i, &rfdset); + if (state & (FD_WANT_POLL_WRITE | FD_WANT_FAST_WRITE)) + FD_SET (i, &wfdset); + FD_SET (i, &errfdset); } - /* One second waits */ + /* One second wait */ tval.tv_sec = 1; tval.tv_usec = 0; @@ -123,16 +104,15 @@ int SelectEngine::DispatchEvents() if (sresult < 1) return 0; - std::vector<int> copy(fds.begin(), fds.end()); - for (std::vector<int>::iterator a = copy.begin(); a != copy.end(); a++) + for (int i = 0; i < FD_SETSIZE; i++) { - EventHandler* ev = ref[*a]; + EventHandler* ev = ref[i]; if (ev) { - if (FD_ISSET (ev->GetFd(), &errfdset)) + if (FD_ISSET (i, &errfdset)) { ErrorEvents++; - if (getsockopt(ev->GetFd(), SOL_SOCKET, SO_ERROR, (char*)&errcode, &codesize) < 0) + if (getsockopt(i, SOL_SOCKET, SO_ERROR, (char*)&errcode, &codesize) < 0) errcode = errno; ev->HandleEvent(EVENT_ERROR, errcode); @@ -145,16 +125,17 @@ int SelectEngine::DispatchEvents() * If an error event occurs above it is not worth processing the * read and write states even if set. */ - if (FD_ISSET (ev->GetFd(), &wfdset)) + if (FD_ISSET (i, &rfdset)) { - WriteEvents++; - writeable[ev->GetFd()] = false; - ev->HandleEvent(EVENT_WRITE); + ReadEvents++; + SetEventMask(eh, eh->GetEventMask() & ~FD_READ_WILL_BLOCK); + ev->HandleEvent(EVENT_READ); } - if (FD_ISSET (ev->GetFd(), &rfdset)) + if (FD_ISSET (i, &wfdset)) { - ReadEvents++; - ev->HandleEvent(EVENT_READ); + WriteEvents++; + SetEventMask(eh, eh->GetEventMask() & ~FD_WRITE_WILL_BLOCK); + ev->HandleEvent(EVENT_WRITE); } } } diff --git a/src/usermanager.cpp b/src/usermanager.cpp index 15196ac69..fe6d280c0 100644 --- a/src/usermanager.cpp +++ b/src/usermanager.cpp @@ -141,7 +141,7 @@ void UserManager::AddUser(InspIRCd* Instance, int socket, ClientListenSocket* vi } } - if (!Instance->SE->AddFd(New)) + if (!Instance->SE->AddFd(New, FD_WANT_FAST_READ | FD_WANT_EDGE_WRITE)) { Instance->Logs->Log("USERS", DEBUG,"Internal error on new connection"); this->QuitUser(New, "Internal error handling connection"); |