]> git.netwichtig.de Git - user/henk/code/inspircd.git/commitdiff
Output buffering on server connections
authorbrain <brain@e03df62e-2008-0410-955e-edbf42e46eb7>
Mon, 23 May 2005 17:52:46 +0000 (17:52 +0000)
committerbrain <brain@e03df62e-2008-0410-955e-edbf42e46eb7>
Mon, 23 May 2005 17:52:46 +0000 (17:52 +0000)
git-svn-id: http://svn.inspircd.org/repository/trunk/inspircd@1475 e03df62e-2008-0410-955e-edbf42e46eb7

include/connection.h
include/servers.h
src/connection.cpp
src/inspircd.cpp
src/servers.cpp

index ef18ec7c0026ec47cbae18a8455da146faaa07d6..420839b0dc21d2be5fd4318a66069c4d78e2492e 100644 (file)
@@ -85,6 +85,14 @@ class ircd_connector : public Extensible
         */
        std::string version;
 
+       /** SendQ of the outbound connector, does not have a limit
+        */
+       std::string sendq;
+
+       /** Write error of connection
+        */
+       std::string WriteError;
+
  public:
 
         /** IRCD Buffer for input characters, holds as many lines as are
@@ -111,7 +119,10 @@ class ircd_connector : public Extensible
         * whilever both servers are connected to B.
         */
        std::vector<std::string> routes;
-       
+
+       /** Constructor clears the sendq and initialises the fd to -1
+        */
+       ircd_connector();       
 
        /** Create an outbound connection to a listening socket
         */ 
@@ -204,6 +215,28 @@ class ircd_connector : public Extensible
         * If the server has no version string an empty string is returned.
         */
         std::string GetVersionString();
+
+       /** Adds data to the connection's sendQ to be flushed later
+        * Fails if there is an error pending on the connection.
+        */
+       bool AddWriteBuf(std::string data);
+
+       /** Flushes as much of the data from the buffer as possible,
+        * and advances the queue pointer to what is left.
+        */
+       bool FlushWriteBuf();
+
+       /** Sets the error string for this connection
+        */
+       void SetWriteError(std::string error);
+
+       /** Gets the error string for this connection
+        */
+       std::string GetWriteError();
+
+       /** Returns true if there is data to be written that hasn't been sent yet
+        */
+       bool HasBufferedOutput();
 };
 
 
index e7d7f254760769a48dc953e271880ebda0504fdd..e9940704c161a00706573e6ea70d4e509f360edd 100644 (file)
@@ -116,6 +116,10 @@ class serverrec : public connection
          * (reserved for core use)
          */
         bool AddIncoming(int fd,char* targethost, int sourceport);     
+
+       /** Flushes all data waiting to be written for all of this server's connections
+        */
+       void FlushWriteBuffers();
 };
 
 #endif
index 222251bb446db23b244743679e01224fad1cfb72..7d5df66f95c4784be929c2d9faeef5732bad1891 100644 (file)
@@ -72,10 +72,18 @@ std::string CreateSum()
 
 connection::connection()
 {
-       fd = 0;
+       fd = -1;
 }
 
 
+ircd_connector::ircd_connector()
+{
+       fd = -1;
+       port = 0;
+        sendq = "";
+        WriteError = "";
+}
+
 char* ircd_connector::GetServerIP()
 {
        return this->host;
@@ -154,6 +162,59 @@ std::string ircd_connector::GetBuffer()
         return ret;
 }
 
+bool ircd_connector::AddWriteBuf(std::string data)
+{
+       log(DEBUG,"connector::AddWriteBuf(%s)",data.c_str());
+        if (this->GetWriteError() != "")
+                return false;
+        std::stringstream stream;
+        stream << sendq << data;
+        sendq = stream.str();
+       return true;
+}
+
+bool ircd_connector::HasBufferedOutput()
+{
+       return (sendq.length() > 0);
+}
+
+// send AS MUCH OF THE USERS SENDQ as we are able to (might not be all of it)
+bool ircd_connector::FlushWriteBuf()
+{
+       log(DEBUG,"connector::FlushWriteBuf()");
+        if (sendq.length())
+        {
+                char* tb = (char*)this->sendq.c_str();
+                int n_sent = write(this->fd,tb,this->sendq.length());
+                if (n_sent == -1)
+                {
+                        this->SetWriteError(strerror(errno));
+                       return false;
+                }
+                else
+                {
+                       log(DEBUG,"Wrote %d chars to socket",n_sent);
+                        // advance the queue
+                        tb += n_sent;
+                        this->sendq = tb;
+                       return true;
+                }
+        }
+       return true;
+}
+
+void ircd_connector::SetWriteError(std::string error)
+{
+        if (this->WriteError == "")
+                this->WriteError = error;
+}
+
+std::string ircd_connector::GetWriteError()
+{
+        return this->WriteError;
+}
+
+
 bool ircd_connector::MakeOutboundConnection(char* newhost, int newport)
 {
        log(DEBUG,"MakeOutboundConnection: Original param: %s",newhost);
@@ -255,11 +316,8 @@ void ircd_connector::SetState(int newstate)
 
 void ircd_connector::CloseConnection()
 {
-       int flags = fcntl(this->fd, F_GETFL, 0);
-       fcntl(this->fd, F_SETFL, flags ^ O_NONBLOCK);
+       shutdown(this->fd,2);
        close(this->fd);
-       flags = fcntl(this->fd, F_GETFL, 0);
-       fcntl(this->fd, F_SETFL, flags | O_NONBLOCK);
 }
 
 void ircd_connector::SetDescriptor(int newfd)
index ef9835fd44db4959727311e89de7f3761286f484..132832bc13e3cc439fa1d22a61761b7742fdec3b 100644 (file)
@@ -2881,6 +2881,8 @@ int InspIRCd(char** argv, int argc)
                std::deque<std::string> sums;
                for (int x = 0; x < SERVERportCount; x++)
                {
+                       if (me[x])
+                               me[x]->FlushWriteBuffers();
                        sums.clear();
                        msgs.clear();
                        while ((me[x]) && (me[x]->RecvPacket(msgs, tcp_host, sums))) // returns 0 or more lines (can be multiple lines!)
index fe29948bcc993deaef81a18e91aa4764e7de887b..d2ace18e875ef55f02f99a31f1fb544b72189471 100644 (file)
@@ -246,6 +246,23 @@ ircd_connector* serverrec::FindHost(std::string findhost)
         return NULL;
 }
 
+void serverrec::FlushWriteBuffers()
+{
+       for (int i = 0; i < this->connectors.size(); i++)
+       {
+                if (this->connectors[i].HasBufferedOutput())
+                {
+                       if (!this->connectors[i].FlushWriteBuf())
+                       {
+                               // if we're here the write() caused an error, we cannot proceed
+                               WriteOpers("*** Lost single connection to %s, link inactive and retrying: %s",this->connectors[i].GetServerName().c_str(),this->connectors[i].GetWriteError().c_str());
+                               this->connectors[i].CloseConnection();
+                               this->connectors[i].SetState(STATE_DISCONNECTED);
+                       }
+                }
+       }
+}
+
 bool serverrec::SendPacket(char *message, const char* sendhost)
 {
         if ((!message) || (!sendhost))
@@ -264,7 +281,6 @@ bool serverrec::SendPacket(char *message, const char* sendhost)
 
                 if (cn->GetState() == STATE_DISCONNECTED)
                 {
-                        log(DEBUG,"\n\n\n\nMain route to %s is down, seeking alternative\n\n\n\n",sendhost);
                         // fix: can only route one hop to avoid a loop
                         if (strncmp(message,"R ",2))
                         {
@@ -289,22 +305,35 @@ bool serverrec::SendPacket(char *message, const char* sendhost)
                         }
                         char buffer[MAXBUF];
                         snprintf(buffer,MAXBUF,"& %s",sendhost);
+                       WriteOpers("*** All connections to %s lost.",sendhost);
                         NetSendToAllExcept(sendhost,buffer);
-                        log(DEBUG,"\n\nThere are no routes to %s, we're gonna boot the server off!\n\n",sendhost);
                         DoSplit(sendhost);
                         return false;
                 }
 
                 // returns false if the packet could not be sent (e.g. target host down)
-                if (send(cn->GetDescriptor(),message,strlen(message),0)<0)
+                if (!cn->AddWriteBuf(message))
                 {
-                        log(DEBUG,"send() failed for serverrec::SendPacket(): %s",strerror(errno));
+                       // if we're here, there was an error pending, and the send cannot proceed
+                        log(DEBUG,"cn->AddWriteBuf() failed for serverrec::SendPacket(): %s",cn->GetWriteError().c_str());
                         log(DEBUG,"Disabling connector: %s",cn->GetServerName().c_str());
                         cn->CloseConnection();
                         cn->SetState(STATE_DISCONNECTED);
+                       WriteOpers("*** Lost single connection to %s, link inactive and retrying: %s",cn->GetServerName().c_str(),cn->GetWriteError().c_str());
                         // retry the packet along a new route so either arrival OR failure are gauranteed (bugfix)
                         return this->SendPacket(message,sendhost);
                 }
+               if (!cn->FlushWriteBuf())
+               {
+                       // if we're here the write() caused an error, we cannot proceed
+                       log(DEBUG,"cn->FlushWriteBuf() failed for serverrec::SendPacket(): %s",cn->GetWriteError().c_str());
+                       log(DEBUG,"Disabling connector: %s",cn->GetServerName().c_str());
+                       cn->CloseConnection();
+                       cn->SetState(STATE_DISCONNECTED);
+                       WriteOpers("*** Lost single connection to %s, link inactive and retrying: %s",cn->GetServerName().c_str(),cn->GetWriteError().c_str());
+                       // retry the packet along a new route so either arrival OR failure are gauranteed
+                       return this->SendPacket(message,sendhost);
+               }
                 return true;
         }
 }