From 45b07a069108d661f7d3b63b040e4db5166a2dd8 Mon Sep 17 00:00:00 2001 From: brain Date: Mon, 23 May 2005 17:52:46 +0000 Subject: [PATCH] Output buffering on server connections git-svn-id: http://svn.inspircd.org/repository/trunk/inspircd@1475 e03df62e-2008-0410-955e-edbf42e46eb7 --- include/connection.h | 35 ++++++++++++++++++++++- include/servers.h | 4 +++ src/connection.cpp | 68 ++++++++++++++++++++++++++++++++++++++++---- src/inspircd.cpp | 2 ++ src/servers.cpp | 37 +++++++++++++++++++++--- 5 files changed, 136 insertions(+), 10 deletions(-) diff --git a/include/connection.h b/include/connection.h index ef18ec7c0..420839b0d 100644 --- a/include/connection.h +++ b/include/connection.h @@ -85,6 +85,14 @@ class ircd_connector : public Extensible */ std::string version; + /** SendQ of the outbound connector, does not have a limit + */ + std::string sendq; + + /** Write error of connection + */ + std::string WriteError; + public: /** IRCD Buffer for input characters, holds as many lines as are @@ -111,7 +119,10 @@ class ircd_connector : public Extensible * whilever both servers are connected to B. */ std::vector routes; - + + /** Constructor clears the sendq and initialises the fd to -1 + */ + ircd_connector(); /** Create an outbound connection to a listening socket */ @@ -204,6 +215,28 @@ class ircd_connector : public Extensible * If the server has no version string an empty string is returned. */ std::string GetVersionString(); + + /** Adds data to the connection's sendQ to be flushed later + * Fails if there is an error pending on the connection. + */ + bool AddWriteBuf(std::string data); + + /** Flushes as much of the data from the buffer as possible, + * and advances the queue pointer to what is left. + */ + bool FlushWriteBuf(); + + /** Sets the error string for this connection + */ + void SetWriteError(std::string error); + + /** Gets the error string for this connection + */ + std::string GetWriteError(); + + /** Returns true if there is data to be written that hasn't been sent yet + */ + bool HasBufferedOutput(); }; diff --git a/include/servers.h b/include/servers.h index e7d7f2547..e9940704c 100644 --- a/include/servers.h +++ b/include/servers.h @@ -116,6 +116,10 @@ class serverrec : public connection * (reserved for core use) */ bool AddIncoming(int fd,char* targethost, int sourceport); + + /** Flushes all data waiting to be written for all of this server's connections + */ + void FlushWriteBuffers(); }; #endif diff --git a/src/connection.cpp b/src/connection.cpp index 222251bb4..7d5df66f9 100644 --- a/src/connection.cpp +++ b/src/connection.cpp @@ -72,10 +72,18 @@ std::string CreateSum() connection::connection() { - fd = 0; + fd = -1; } +ircd_connector::ircd_connector() +{ + fd = -1; + port = 0; + sendq = ""; + WriteError = ""; +} + char* ircd_connector::GetServerIP() { return this->host; @@ -154,6 +162,59 @@ std::string ircd_connector::GetBuffer() return ret; } +bool ircd_connector::AddWriteBuf(std::string data) +{ + log(DEBUG,"connector::AddWriteBuf(%s)",data.c_str()); + if (this->GetWriteError() != "") + return false; + std::stringstream stream; + stream << sendq << data; + sendq = stream.str(); + return true; +} + +bool ircd_connector::HasBufferedOutput() +{ + return (sendq.length() > 0); +} + +// send AS MUCH OF THE USERS SENDQ as we are able to (might not be all of it) +bool ircd_connector::FlushWriteBuf() +{ + log(DEBUG,"connector::FlushWriteBuf()"); + if (sendq.length()) + { + char* tb = (char*)this->sendq.c_str(); + int n_sent = write(this->fd,tb,this->sendq.length()); + if (n_sent == -1) + { + this->SetWriteError(strerror(errno)); + return false; + } + else + { + log(DEBUG,"Wrote %d chars to socket",n_sent); + // advance the queue + tb += n_sent; + this->sendq = tb; + return true; + } + } + return true; +} + +void ircd_connector::SetWriteError(std::string error) +{ + if (this->WriteError == "") + this->WriteError = error; +} + +std::string ircd_connector::GetWriteError() +{ + return this->WriteError; +} + + bool ircd_connector::MakeOutboundConnection(char* newhost, int newport) { log(DEBUG,"MakeOutboundConnection: Original param: %s",newhost); @@ -255,11 +316,8 @@ void ircd_connector::SetState(int newstate) void ircd_connector::CloseConnection() { - int flags = fcntl(this->fd, F_GETFL, 0); - fcntl(this->fd, F_SETFL, flags ^ O_NONBLOCK); + shutdown(this->fd,2); close(this->fd); - flags = fcntl(this->fd, F_GETFL, 0); - fcntl(this->fd, F_SETFL, flags | O_NONBLOCK); } void ircd_connector::SetDescriptor(int newfd) diff --git a/src/inspircd.cpp b/src/inspircd.cpp index ef9835fd4..132832bc1 100644 --- a/src/inspircd.cpp +++ b/src/inspircd.cpp @@ -2881,6 +2881,8 @@ int InspIRCd(char** argv, int argc) std::deque sums; for (int x = 0; x < SERVERportCount; x++) { + if (me[x]) + me[x]->FlushWriteBuffers(); sums.clear(); msgs.clear(); while ((me[x]) && (me[x]->RecvPacket(msgs, tcp_host, sums))) // returns 0 or more lines (can be multiple lines!) diff --git a/src/servers.cpp b/src/servers.cpp index fe29948bc..d2ace18e8 100644 --- a/src/servers.cpp +++ b/src/servers.cpp @@ -246,6 +246,23 @@ ircd_connector* serverrec::FindHost(std::string findhost) return NULL; } +void serverrec::FlushWriteBuffers() +{ + for (int i = 0; i < this->connectors.size(); i++) + { + if (this->connectors[i].HasBufferedOutput()) + { + if (!this->connectors[i].FlushWriteBuf()) + { + // if we're here the write() caused an error, we cannot proceed + WriteOpers("*** Lost single connection to %s, link inactive and retrying: %s",this->connectors[i].GetServerName().c_str(),this->connectors[i].GetWriteError().c_str()); + this->connectors[i].CloseConnection(); + this->connectors[i].SetState(STATE_DISCONNECTED); + } + } + } +} + bool serverrec::SendPacket(char *message, const char* sendhost) { if ((!message) || (!sendhost)) @@ -264,7 +281,6 @@ bool serverrec::SendPacket(char *message, const char* sendhost) if (cn->GetState() == STATE_DISCONNECTED) { - log(DEBUG,"\n\n\n\nMain route to %s is down, seeking alternative\n\n\n\n",sendhost); // fix: can only route one hop to avoid a loop if (strncmp(message,"R ",2)) { @@ -289,22 +305,35 @@ bool serverrec::SendPacket(char *message, const char* sendhost) } char buffer[MAXBUF]; snprintf(buffer,MAXBUF,"& %s",sendhost); + WriteOpers("*** All connections to %s lost.",sendhost); NetSendToAllExcept(sendhost,buffer); - log(DEBUG,"\n\nThere are no routes to %s, we're gonna boot the server off!\n\n",sendhost); DoSplit(sendhost); return false; } // returns false if the packet could not be sent (e.g. target host down) - if (send(cn->GetDescriptor(),message,strlen(message),0)<0) + if (!cn->AddWriteBuf(message)) { - log(DEBUG,"send() failed for serverrec::SendPacket(): %s",strerror(errno)); + // if we're here, there was an error pending, and the send cannot proceed + log(DEBUG,"cn->AddWriteBuf() failed for serverrec::SendPacket(): %s",cn->GetWriteError().c_str()); log(DEBUG,"Disabling connector: %s",cn->GetServerName().c_str()); cn->CloseConnection(); cn->SetState(STATE_DISCONNECTED); + WriteOpers("*** Lost single connection to %s, link inactive and retrying: %s",cn->GetServerName().c_str(),cn->GetWriteError().c_str()); // retry the packet along a new route so either arrival OR failure are gauranteed (bugfix) return this->SendPacket(message,sendhost); } + if (!cn->FlushWriteBuf()) + { + // if we're here the write() caused an error, we cannot proceed + log(DEBUG,"cn->FlushWriteBuf() failed for serverrec::SendPacket(): %s",cn->GetWriteError().c_str()); + log(DEBUG,"Disabling connector: %s",cn->GetServerName().c_str()); + cn->CloseConnection(); + cn->SetState(STATE_DISCONNECTED); + WriteOpers("*** Lost single connection to %s, link inactive and retrying: %s",cn->GetServerName().c_str(),cn->GetWriteError().c_str()); + // retry the packet along a new route so either arrival OR failure are gauranteed + return this->SendPacket(message,sendhost); + } return true; } } -- 2.39.5