#include "inspstring.h"
#include "socketengine.h"
+#ifndef DISABLE_WRITEV
+#include <sys/uio.h>
+#ifndef IOV_MAX
+#define IOV_MAX 1024
+#endif
+#endif
+
BufferedSocket::BufferedSocket()
{
Timeout = NULL;
BufferedSocketError BufferedSocket::BeginConnect(const std::string &ipaddr, int aport, unsigned long maxtime, const std::string &connectbindip)
{
irc::sockets::sockaddrs addr, bind;
- if (!irc::sockets::aptosa(ipaddr.c_str(), aport, &addr))
+ if (!irc::sockets::aptosa(ipaddr, aport, &addr))
{
ServerInstance->Logs->Log("SOCKET", DEBUG, "BUG: Hostname passed to BufferedSocket, rather than an IP address!");
return I_ERR_CONNECT;
bind.sa.sa_family = 0;
if (!connectbindip.empty())
{
- if (!irc::sockets::aptosa(connectbindip.c_str(), 0, &bind))
+ if (!irc::sockets::aptosa(connectbindip, 0, &bind))
{
return I_ERR_BIND;
}
errno = save;
}
-void StreamSocket::cull()
+CullResult StreamSocket::cull()
{
Close();
+ return EventHandler::cull();
}
bool StreamSocket::GetNextLine(std::string& line, char delim)
{
if (sendq.empty())
return;
+ if (!error.empty() || fd < 0 || fd == INT_MAX)
+ {
+ ServerInstance->Logs->Log("SOCKET", DEBUG, "DoWrite on errored or closed socket");
+ return;
+ }
+#ifndef DISABLE_WRITEV
if (IOHook)
+#endif
{
int rv = -1;
try
{
- if (sendq.size() > 1 && sendq[0].length() < 1024)
- {
- // Avoid multiple repeated SSL encryption invocations
- // This adds a single copy of the queue, but avoids
- // much more overhead in terms of system calls invoked
- // by the IOHook.
- //
- // The length limit of 1024 is to prevent merging strings
- // more than once when writes begin to block.
- std::string tmp;
- tmp.reserve(sendq_len);
- for(unsigned int i=0; i < sendq.size(); i++)
- tmp.append(sendq[i]);
- sendq.clear();
- sendq.push_back(tmp);
- }
while (!sendq.empty())
{
- std::string& front = sendq.front();
- int itemlen = front.length();
- rv = IOHook->OnStreamSocketWrite(this, front);
- if (rv > 0)
+ if (sendq.size() > 1 && sendq[0].length() < 1024)
{
- // consumed the entire string, and is ready for more
- sendq_len -= itemlen;
- sendq.pop_front();
+ // Avoid multiple repeated SSL encryption invocations
+ // This adds a single copy of the queue, but avoids
+ // much more overhead in terms of system calls invoked
+ // by the IOHook.
+ //
+ // The length limit of 1024 is to prevent merging strings
+ // more than once when writes begin to block.
+ std::string tmp;
+ tmp.reserve(sendq_len);
+ for(unsigned int i=0; i < sendq.size(); i++)
+ tmp.append(sendq[i]);
+ sendq.clear();
+ sendq.push_back(tmp);
}
- else if (rv == 0)
+ std::string& front = sendq.front();
+ int itemlen = front.length();
+ if (IOHook)
{
- // socket has blocked. Stop trying to send data.
- // IOHook has requested unblock notification from the socketengine
+ rv = IOHook->OnStreamSocketWrite(this, front);
+ if (rv > 0)
+ {
+ // consumed the entire string, and is ready for more
+ sendq_len -= itemlen;
+ sendq.pop_front();
+ }
+ else if (rv == 0)
+ {
+ // socket has blocked. Stop trying to send data.
+ // IOHook has requested unblock notification from the socketengine
- // Since it is possible that a partial write took place, adjust sendq_len
- sendq_len = sendq_len - itemlen + front.length();
- return;
+ // Since it is possible that a partial write took place, adjust sendq_len
+ sendq_len = sendq_len - itemlen + front.length();
+ return;
+ }
+ else
+ {
+ SetError("Write Error"); // will not overwrite a better error message
+ return;
+ }
}
+#ifdef DISABLE_WRITEV
else
{
- SetError("Write Error"); // will not overwrite a better error message
- return;
+ rv = ServerInstance->SE->Send(this, front.data(), itemlen, 0);
+ if (rv == 0)
+ {
+ SetError("Connection closed");
+ return;
+ }
+ else if (rv < 0)
+ {
+ if (errno == EAGAIN || errno == EINTR)
+ ServerInstance->SE->ChangeEventMask(this, FD_WANT_FAST_WRITE | FD_WRITE_WILL_BLOCK);
+ else
+ SetError(strerror(errno));
+ return;
+ }
+ else if (rv < itemlen)
+ {
+ ServerInstance->SE->ChangeEventMask(this, FD_WANT_FAST_WRITE | FD_WRITE_WILL_BLOCK);
+ front = front.substr(itemlen - rv);
+ sendq_len -= rv;
+ return;
+ }
+ else
+ {
+ sendq_len -= itemlen;
+ sendq.pop_front();
+ if (sendq.empty())
+ ServerInstance->SE->ChangeEventMask(this, FD_WANT_EDGE_WRITE);
+ }
}
+#endif
}
}
catch (CoreException& modexcept)
modexcept.GetSource(), modexcept.GetReason());
}
}
+#ifndef DISABLE_WRITEV
else
{
- bool again = true;
- while (again)
+ // don't even try if we are known to be blocking
+ if (GetEventMask() & FD_WRITE_WILL_BLOCK)
+ return;
+ // start out optimistic - we won't need to write any more
+ int eventChange = FD_WANT_EDGE_WRITE;
+ while (sendq_len && eventChange == FD_WANT_EDGE_WRITE)
{
- again = false;
-
// Prepare a writev() call to write all buffers efficiently
int bufcount = sendq.size();
if (bufcount > IOV_MAX)
{
bufcount = IOV_MAX;
- again = true;
}
+ int rv_max = 0;
iovec* iovecs = new iovec[bufcount];
for(int i=0; i < bufcount; i++)
{
iovecs[i].iov_base = const_cast<char*>(sendq[i].data());
iovecs[i].iov_len = sendq[i].length();
+ rv_max += sendq[i].length();
}
int rv = writev(fd, iovecs, bufcount);
delete[] iovecs;
else if (rv > 0)
{
// Partial write. Clean out strings from the sendq
+ if (rv < rv_max)
+ {
+ // it's going to block now
+ eventChange = FD_WANT_FAST_WRITE | FD_WRITE_WILL_BLOCK;
+ }
sendq_len -= rv;
while (rv > 0 && !sendq.empty())
{
std::string& front = sendq.front();
- if (front.length() < (size_t)rv)
+ if (front.length() <= (size_t)rv)
{
// this string got fully written out
rv -= front.length();
}
else if (errno == EAGAIN)
{
- again = false;
+ eventChange = FD_WANT_FAST_WRITE | FD_WRITE_WILL_BLOCK;
}
else if (errno == EINTR)
{
- again = true;
+ // restart interrupted syscall
}
else
{
// error - kill all events
ServerInstance->SE->ChangeEventMask(this, FD_WANT_NO_READ | FD_WANT_NO_WRITE);
}
- else if (sendq_len)
- {
- // writes have blocked, we can use FAST_WRITE to find when they unblock
- ServerInstance->SE->ChangeEventMask(this, FD_WANT_FAST_WRITE | FD_WRITE_WILL_BLOCK);
- }
else
{
- // writes are done, we can use EDGE_WRITE to stop asking for write
- ServerInstance->SE->ChangeEventMask(this, FD_WANT_EDGE_WRITE);
+ ServerInstance->SE->ChangeEventMask(this, eventChange);
}
}
+#endif
}
void StreamSocket::WriteData(const std::string &data)
void StreamSocket::HandleEvent(EventType et, int errornum)
{
+ if (!error.empty())
+ return;
BufferedSocketError errcode = I_ERR_OTHER;
switch (et)
{
case EVENT_ERROR:
{
- SetError(strerror(errornum));
+ if (errornum == 0)
+ SetError("Connection closed");
+ else
+ SetError(strerror(errornum));
switch (errornum)
{
case ETIMEDOUT: