this->fd = newfd;
this->state = I_CONNECTED;
if (fd > -1)
- ServerInstance->SE->AddFd(this);
+ ServerInstance->SE->AddFd(this, FD_WANT_FAST_READ | FD_WANT_EDGE_WRITE);
}
void BufferedSocket::DoConnect(const std::string &ipaddr, int aport, unsigned long maxtime, const std::string &connectbindip)
this->state = I_CONNECTING;
- if (!ServerInstance->SE->AddFd(this, true))
+ if (!ServerInstance->SE->AddFd(this, FD_WANT_NO_READ | FD_WANT_SINGLE_WRITE))
return I_ERR_NOMOREFDS;
this->Timeout = new SocketTimeout(this->GetFd(), this, timeout, ServerInstance->Time());
{
char* ReadBuffer = ServerInstance->GetReadBuffer();
int n = recv(fd, ReadBuffer, ServerInstance->Config->NetBufferSize, 0);
- if (n > 0)
+ if (n == ServerInstance->Config->NetBufferSize)
{
+ ServerInstance->SE->ChangeEventMask(this, FD_WANT_FAST_READ | FD_ADD_TRIAL_READ);
+ recvq.append(ReadBuffer, n);
+ OnDataReady();
+ }
+ else if (n > 0)
+ {
+ ServerInstance->SE->ChangeEventMask(this, FD_WANT_FAST_READ);
recvq.append(ReadBuffer, n);
OnDataReady();
}
else if (n == 0)
{
error = "Connection closed";
+ ServerInstance->SE->ChangeEventMask(this, FD_WANT_NO_READ | FD_WANT_NO_WRITE);
+ }
+ else if (errno == EAGAIN)
+ {
+ ServerInstance->SE->ChangeEventMask(this, FD_WANT_FAST_READ | FD_READ_WILL_BLOCK);
}
- else if (errno != EAGAIN && errno != EINTR)
+ else if (errno == EINTR)
+ {
+ ServerInstance->SE->ChangeEventMask(this, FD_WANT_FAST_READ | FD_ADD_TRIAL_READ);
+ }
+ else
{
error = strerror(errno);
+ ServerInstance->SE->ChangeEventMask(this, FD_WANT_NO_READ | FD_WANT_NO_WRITE);
}
}
}
{
if (sendq.empty())
return;
+ if (!error.empty() || fd < 0 || fd == INT_MAX)
+ {
+ ServerInstance->Logs->Log("SOCKET", DEBUG, "DoWrite on errored or closed socket");
+ return;
+ }
if (IOHook)
{
}
else
{
- // Prepare a writev() call to write all buffers efficiently
- int bufcount = sendq.size();
+ // don't even try if we are known to be blocking
+ if (GetEventMask() & FD_WRITE_WILL_BLOCK)
+ return;
+ // start out optimistic - we won't need to write any more
+ int eventChange = FD_WANT_EDGE_WRITE;
+ while (sendq_len && eventChange == FD_WANT_EDGE_WRITE)
+ {
+ // Prepare a writev() call to write all buffers efficiently
+ int bufcount = sendq.size();
- // cap the number of buffers at IOV_MAX
- if (bufcount > IOV_MAX)
- bufcount = IOV_MAX;
+ // cap the number of buffers at IOV_MAX
+ if (bufcount > IOV_MAX)
+ {
+ bufcount = IOV_MAX;
+ }
- iovec* iovecs = new iovec[bufcount];
- for(int i=0; i < bufcount; i++)
- {
- iovecs[i].iov_base = const_cast<char*>(sendq[i].data());
- iovecs[i].iov_len = sendq[i].length();
- }
- int rv = writev(fd, iovecs, bufcount);
- delete[] iovecs;
- if (rv == (int)sendq_len)
- {
- // it's our lucky day, everything got written out. Fast cleanup.
- sendq_len = 0;
- sendq.clear();
- }
- else if (rv > 0)
- {
- // Partial write. Clean out strings from the sendq
- sendq_len -= rv;
- while (rv > 0 && !sendq.empty())
+ int rv_max = 0;
+ iovec* iovecs = new iovec[bufcount];
+ for(int i=0; i < bufcount; i++)
{
- std::string& front = sendq.front();
- if (front.length() < (size_t)rv)
+ iovecs[i].iov_base = const_cast<char*>(sendq[i].data());
+ iovecs[i].iov_len = sendq[i].length();
+ rv_max += sendq[i].length();
+ }
+ int rv = writev(fd, iovecs, bufcount);
+ delete[] iovecs;
+
+ if (rv == (int)sendq_len)
+ {
+ // it's our lucky day, everything got written out. Fast cleanup.
+ // This won't ever happen if the number of buffers got capped.
+ sendq_len = 0;
+ sendq.clear();
+ }
+ else if (rv > 0)
+ {
+ // Partial write. Clean out strings from the sendq
+ sendq_len -= rv;
+ while (rv > 0 && !sendq.empty())
{
- // this string got fully written out
- rv -= front.length();
- sendq.pop_front();
+ std::string& front = sendq.front();
+ if (front.length() <= (size_t)rv)
+ {
+ // this string got fully written out
+ rv -= front.length();
+ sendq.pop_front();
+ }
+ else
+ {
+ // stopped in the middle of this string
+ front = front.substr(rv);
+ rv = 0;
+ }
}
- else
+ if (rv < rv_max)
{
- // stopped in the middle of this string
- front = front.substr(rv);
- rv = 0;
+ // it's going to block now
+ eventChange = FD_WANT_FAST_WRITE | FD_WRITE_WILL_BLOCK;
}
}
+ else if (rv == 0)
+ {
+ error = "Connection closed";
+ }
+ else if (errno == EAGAIN)
+ {
+ eventChange = FD_WANT_FAST_WRITE | FD_WRITE_WILL_BLOCK;
+ }
+ else if (errno == EINTR)
+ {
+ // restart interrupted syscall
+ }
+ else
+ {
+ error = strerror(errno);
+ }
}
- else if (rv == 0)
+ if (!error.empty())
{
- error = "Connection closed";
+ // error - kill all events
+ ServerInstance->SE->ChangeEventMask(this, FD_WANT_NO_READ | FD_WANT_NO_WRITE);
}
- else if (errno != EAGAIN && errno != EINTR)
+ else
{
- error = strerror(errno);
+ ServerInstance->SE->ChangeEventMask(this, eventChange);
}
- if (sendq_len && error.empty())
- ServerInstance->SE->WantWrite(this);
}
}
data.c_str());
return;
}
- bool newWrite = sendq.empty() && !data.empty();
/* Append the data to the back of the queue ready for writing */
sendq.push_back(data);
sendq_len += data.length();
- if (newWrite)
- {
- // TODO perhaps we should try writing first, before asking SE about writes?
- // DoWrite();
- ServerInstance->SE->WantWrite(this);
- }
+ ServerInstance->SE->ChangeEventMask(this, FD_ADD_TRIAL_WRITE);
}
void SocketTimeout::Tick(time_t)
this->OnConnected();
if (GetIOHook())
GetIOHook()->OnStreamSocketConnect(this);
+ else
+ ServerInstance->SE->ChangeEventMask(this, FD_WANT_FAST_READ | FD_WANT_EDGE_WRITE);
}
this->StreamSocket::DoWrite();
}
{
case EVENT_ERROR:
{
- SetError(strerror(errornum));
+ if (errornum == 0)
+ SetError("Connection closed");
+ else
+ SetError(strerror(errornum));
switch (errornum)
{
case ETIMEDOUT: