diff options
author | Attila Molnar <attilamolnar@hush.com> | 2015-06-06 14:31:05 +0200 |
---|---|---|
committer | Attila Molnar <attilamolnar@hush.com> | 2015-06-06 14:31:05 +0200 |
commit | 1f0485039a276ad1c2fa3d53d284e3a87940ec77 (patch) | |
tree | 5689bf7bbcd023956ab67d4730428b1c746decce | |
parent | 0858cdd53cd1ec01c4539e9c36ef7dd9fab4aa16 (diff) |
Convert all code to use StreamSocket::SendQueue
Let OnStreamSocketWrite see the entire sendq instead of one element at a time
-rw-r--r-- | include/inspsocket.h | 13 | ||||
-rw-r--r-- | include/iohook.h | 3 | ||||
-rw-r--r-- | src/inspsocket.cpp | 31 | ||||
-rw-r--r-- | src/modules/extra/m_ssl_gnutls.cpp | 10 | ||||
-rw-r--r-- | src/modules/extra/m_ssl_openssl.cpp | 6 |
5 files changed, 30 insertions, 33 deletions
diff --git a/include/inspsocket.h b/include/inspsocket.h index 43bd3e3ab..53eca2e91 100644 --- a/include/inspsocket.h +++ b/include/inspsocket.h @@ -212,11 +212,10 @@ class CoreExport StreamSocket : public EventHandler /** The IOHook that handles raw I/O for this socket, or NULL */ IOHook* iohook; - /** Private send queue. Note that individual strings may be shared + /** Send queue of the socket */ - std::deque<std::string> sendq; - /** Length, in bytes, of the sendq */ - size_t sendq_len; + SendQueue sendq; + /** Error - if nonempty, the socket is dead, and this is the reason. */ std::string error; @@ -232,7 +231,7 @@ class CoreExport StreamSocket : public EventHandler protected: std::string recvq; public: - StreamSocket() : iohook(NULL), sendq_len(0) {} + StreamSocket() : iohook(NULL) { } IOHook* GetIOHook() const; void AddIOHook(IOHook* hook); void DelIOHook(); @@ -275,7 +274,9 @@ class CoreExport StreamSocket : public EventHandler */ bool GetNextLine(std::string& line, char delim = '\n'); /** Useful for implementing sendq exceeded */ - inline size_t getSendQSize() const { return sendq_len; } + size_t getSendQSize() const { return sendq.size(); } + + SendQueue& GetSendQ() { return sendq; } /** * Close the socket, remove from socket engine, etc diff --git a/include/iohook.h b/include/iohook.h index ce7ca2a1b..cf27fcb0c 100644 --- a/include/iohook.h +++ b/include/iohook.h @@ -66,11 +66,10 @@ class IOHook : public classbase * Called when a hooked stream has data to write, or when the socket * engine returns it as writable * @param sock The socket in question - * @param sendq Data to send to the socket * @return 1 if the sendq has been completely emptied, 0 if there is * still data to send, and -1 if there was an error */ - virtual int OnStreamSocketWrite(StreamSocket* sock, std::string& sendq) = 0; + virtual int OnStreamSocketWrite(StreamSocket* sock) = 0; /** Called immediately before any socket is closed. When this event is called, shutdown() * has not yet been called on the socket. diff --git a/src/inspsocket.cpp b/src/inspsocket.cpp index 7ddd77495..b8f8949dd 100644 --- a/src/inspsocket.cpp +++ b/src/inspsocket.cpp @@ -204,7 +204,7 @@ void StreamSocket::DoWrite() { while (error.empty() && !sendq.empty()) { - if (sendq.size() > 1 && sendq[0].length() < 1024) + if (sendq.size() > 1 && sendq.front().length() < 1024) { // Avoid multiple repeated SSL encryption invocations // This adds a single copy of the queue, but avoids @@ -222,24 +222,18 @@ void StreamSocket::DoWrite() } sendq.push_front(tmp); } - std::string& front = sendq.front(); - int itemlen = front.length(); { - int rv = GetIOHook()->OnStreamSocketWrite(this, front); + int rv = GetIOHook()->OnStreamSocketWrite(this); if (rv > 0) { // consumed the entire string, and is ready for more - sendq_len -= itemlen; sendq.pop_front(); } else if (rv == 0) { // socket has blocked. Stop trying to send data. // IOHook has requested unblock notification from the socketengine - - // Since it is possible that a partial write took place, adjust sendq_len - sendq_len = sendq_len - itemlen + front.length(); return; } else @@ -258,7 +252,7 @@ void StreamSocket::DoWrite() return; // start out optimistic - we won't need to write any more int eventChange = FD_WANT_EDGE_WRITE; - while (error.empty() && sendq_len && eventChange == FD_WANT_EDGE_WRITE) + while (error.empty() && !sendq.empty() && eventChange == FD_WANT_EDGE_WRITE) { // Prepare a writev() call to write all buffers efficiently int bufcount = sendq.size(); @@ -273,20 +267,21 @@ void StreamSocket::DoWrite() int rv; { SocketEngine::IOVector iovecs[MYIOV_MAX]; - for (int i = 0; i < bufcount; i++) + size_t j = 0; + for (SendQueue::const_iterator i = sendq.begin(), end = i+bufcount; i != end; ++i, j++) { - iovecs[i].iov_base = const_cast<char*>(sendq[i].data()); - iovecs[i].iov_len = sendq[i].length(); - rv_max += sendq[i].length(); + const SendQueue::Element& elem = *i; + iovecs[j].iov_base = const_cast<char*>(elem.data()); + iovecs[j].iov_len = elem.length(); + rv_max += elem.length(); } rv = SocketEngine::WriteV(this, iovecs, bufcount); } - if (rv == (int)sendq_len) + if (rv == (int)sendq.bytes()) { // it's our lucky day, everything got written out. Fast cleanup. // This won't ever happen if the number of buffers got capped. - sendq_len = 0; sendq.clear(); } else if (rv > 0) @@ -297,10 +292,9 @@ void StreamSocket::DoWrite() // it's going to block now eventChange = FD_WANT_FAST_WRITE | FD_WRITE_WILL_BLOCK; } - sendq_len -= rv; while (rv > 0 && !sendq.empty()) { - std::string& front = sendq.front(); + const SendQueue::Element& front = sendq.front(); if (front.length() <= (size_t)rv) { // this string got fully written out @@ -310,7 +304,7 @@ void StreamSocket::DoWrite() else { // stopped in the middle of this string - front.erase(0, rv); + sendq.erase_front(rv); rv = 0; } } @@ -356,7 +350,6 @@ void StreamSocket::WriteData(const std::string &data) /* Append the data to the back of the queue ready for writing */ sendq.push_back(data); - sendq_len += data.length(); SocketEngine::ChangeEventMask(this, FD_ADD_TRIAL_WRITE); } diff --git a/src/modules/extra/m_ssl_gnutls.cpp b/src/modules/extra/m_ssl_gnutls.cpp index d33403aba..e142ead11 100644 --- a/src/modules/extra/m_ssl_gnutls.cpp +++ b/src/modules/extra/m_ssl_gnutls.cpp @@ -968,7 +968,7 @@ info_done_dealloc: } } - int OnStreamSocketWrite(StreamSocket* user, std::string& sendq) CXX11_OVERRIDE + int OnStreamSocketWrite(StreamSocket* user) CXX11_OVERRIDE { // Finish handshake if needed int prepret = PrepareIO(user); @@ -976,19 +976,21 @@ info_done_dealloc: return prepret; // Session is ready for transferring application data + StreamSocket::SendQueue& sendq = user->GetSendQ(); int ret = 0; { - ret = gnutls_record_send(this->sess, sendq.data(), sendq.length()); + const StreamSocket::SendQueue::Element& buffer = sendq.front(); + ret = gnutls_record_send(this->sess, buffer.data(), buffer.length()); - if (ret == (int)sendq.length()) + if (ret == (int)buffer.length()) { SocketEngine::ChangeEventMask(user, FD_WANT_NO_WRITE); return 1; } else if (ret > 0) { - sendq.erase(0, ret); + sendq.erase_front(ret); SocketEngine::ChangeEventMask(user, FD_WANT_SINGLE_WRITE); return 0; } diff --git a/src/modules/extra/m_ssl_openssl.cpp b/src/modules/extra/m_ssl_openssl.cpp index c8a035fac..c2a71eeca 100644 --- a/src/modules/extra/m_ssl_openssl.cpp +++ b/src/modules/extra/m_ssl_openssl.cpp @@ -601,7 +601,7 @@ class OpenSSLIOHook : public SSLIOHook } } - int OnStreamSocketWrite(StreamSocket* user, std::string& buffer) CXX11_OVERRIDE + int OnStreamSocketWrite(StreamSocket* user) CXX11_OVERRIDE { // Finish handshake if needed int prepret = PrepareIO(user); @@ -611,8 +611,10 @@ class OpenSSLIOHook : public SSLIOHook data_to_write = true; // Session is ready for transferring application data + StreamSocket::SendQueue& sendq = user->GetSendQ(); { ERR_clear_error(); + const StreamSocket::SendQueue::Element& buffer = sendq.front(); int ret = SSL_write(sess, buffer.data(), buffer.size()); #ifdef INSPIRCD_OPENSSL_ENABLE_RENEGO_DETECTION @@ -628,7 +630,7 @@ class OpenSSLIOHook : public SSLIOHook } else if (ret > 0) { - buffer.erase(0, ret); + sendq.erase_front(ret); SocketEngine::ChangeEventMask(user, FD_WANT_SINGLE_WRITE); return 0; } |