]> git.netwichtig.de Git - user/henk/code/inspircd.git/blob - src/inspsocket.cpp
SendQ bugfixes
[user/henk/code/inspircd.git] / src / inspsocket.cpp
1 /*       +------------------------------------+
2  *       | Inspire Internet Relay Chat Daemon |
3  *       +------------------------------------+
4  *
5  *  InspIRCd: (C) 2002-2009 InspIRCd Development Team
6  * See: http://wiki.inspircd.org/Credits
7  *
8  * This program is free but copyrighted software; see
9  *            the file COPYING for details.
10  *
11  * ---------------------------------------------------
12  */
13
14 #include "inspircd.h"
15 #include "socket.h"
16 #include "inspstring.h"
17 #include "socketengine.h"
18
19 BufferedSocket::BufferedSocket()
20 {
21         Timeout = NULL;
22         state = I_ERROR;
23 }
24
25 BufferedSocket::BufferedSocket(int newfd)
26 {
27         Timeout = NULL;
28         this->fd = newfd;
29         this->state = I_CONNECTED;
30         if (fd > -1)
31                 ServerInstance->SE->AddFd(this, FD_WANT_FAST_READ | FD_WANT_EDGE_WRITE);
32 }
33
34 void BufferedSocket::DoConnect(const std::string &ipaddr, int aport, unsigned long maxtime, const std::string &connectbindip)
35 {
36         BufferedSocketError err = BeginConnect(ipaddr, aport, maxtime, connectbindip);
37         if (err != I_ERR_NONE)
38         {
39                 state = I_ERROR;
40                 SetError(strerror(errno));
41                 OnError(err);
42         }
43 }
44
45 BufferedSocketError BufferedSocket::BeginConnect(const std::string &ipaddr, int aport, unsigned long maxtime, const std::string &connectbindip)
46 {
47         irc::sockets::sockaddrs addr, bind;
48         if (!irc::sockets::aptosa(ipaddr.c_str(), aport, &addr))
49         {
50                 ServerInstance->Logs->Log("SOCKET", DEBUG, "BUG: Hostname passed to BufferedSocket, rather than an IP address!");
51                 return I_ERR_CONNECT;
52         }
53
54         bind.sa.sa_family = 0;
55         if (!connectbindip.empty())
56         {
57                 if (!irc::sockets::aptosa(connectbindip.c_str(), 0, &bind))
58                 {
59                         return I_ERR_BIND;
60                 }
61         }
62
63         return BeginConnect(addr, bind, maxtime);
64 }
65
/** Ask the OS for 32768-byte send and receive buffers on a socket.
 * The results of both setsockopt() calls are deliberately ignored: a
 * refusal is not an error worth reporting to the administrator.
 * @param fd Socket descriptor to adjust.
 */
static void IncreaseOSBuffers(int fd)
{
	const int wanted = 32768;
	const int options[] = { SO_SNDBUF, SO_RCVBUF };

	for (size_t idx = 0; idx < sizeof(options) / sizeof(options[0]); ++idx)
		setsockopt(fd, SOL_SOCKET, options[idx], (const char *)&wanted, sizeof(wanted));
}
75
76 BufferedSocketError BufferedSocket::BeginConnect(const irc::sockets::sockaddrs& dest, const irc::sockets::sockaddrs& bind, unsigned long timeout)
77 {
78         if (fd < 0)
79                 fd = socket(dest.sa.sa_family, SOCK_STREAM, 0);
80
81         if (fd < 0)
82                 return I_ERR_SOCKET;
83
84         if (bind.sa.sa_family != 0)
85         {
86                 if (ServerInstance->SE->Bind(fd, &bind.sa, sa_size(bind)) < 0)
87                         return I_ERR_BIND;
88         }
89
90         ServerInstance->SE->NonBlocking(fd);
91
92         if (ServerInstance->SE->Connect(this, &dest.sa, sa_size(dest)) == -1)
93         {
94                 if (errno != EINPROGRESS)
95                         return I_ERR_CONNECT;
96         }
97
98         this->state = I_CONNECTING;
99
100         if (!ServerInstance->SE->AddFd(this, FD_WANT_NO_READ | FD_WANT_SINGLE_WRITE))
101                 return I_ERR_NOMOREFDS;
102
103         this->Timeout = new SocketTimeout(this->GetFd(), this, timeout, ServerInstance->Time());
104         ServerInstance->Timers->AddTimer(this->Timeout);
105
106         IncreaseOSBuffers(fd);
107
108         ServerInstance->Logs->Log("SOCKET", DEBUG,"BufferedSocket::DoConnect success");
109         return I_ERR_NONE;
110 }
111
112 void StreamSocket::Close()
113 {
114         /* Save this, so we dont lose it,
115          * otherise on failure, error messages
116          * might be inaccurate.
117          */
118         int save = errno;
119         if (this->fd > -1)
120         {
121                 if (IOHook)
122                 {
123                         try
124                         {
125                                 IOHook->OnStreamSocketClose(this);
126                         }
127                         catch (CoreException& modexcept)
128                         {
129                                 ServerInstance->Logs->Log("SOCKET", DEFAULT,"%s threw an exception: %s",
130                                         modexcept.GetSource(), modexcept.GetReason());
131                         }
132                 }
133                 ServerInstance->SE->Shutdown(this, 2);
134                 ServerInstance->SE->DelFd(this);
135                 ServerInstance->SE->Close(this);
136                 fd = -1;
137         }
138         errno = save;
139 }
140
141 void StreamSocket::cull()
142 {
143         Close();
144 }
145
146 bool StreamSocket::GetNextLine(std::string& line, char delim)
147 {
148         std::string::size_type i = recvq.find(delim);
149         if (i == std::string::npos)
150                 return false;
151         line = recvq.substr(0, i - 1);
152         // TODO is this the most efficient way to split?
153         recvq = recvq.substr(i + 1);
154         return true;
155 }
156
157 void StreamSocket::DoRead()
158 {
159         if (IOHook)
160         {
161                 int rv = -1;
162                 try
163                 {
164                         rv = IOHook->OnStreamSocketRead(this, recvq);
165                 }
166                 catch (CoreException& modexcept)
167                 {
168                         ServerInstance->Logs->Log("SOCKET", DEFAULT, "%s threw an exception: %s",
169                                 modexcept.GetSource(), modexcept.GetReason());
170                         return;
171                 }
172                 if (rv > 0)
173                         OnDataReady();
174                 if (rv < 0)
175                         SetError("Read Error"); // will not overwrite a better error message
176         }
177         else
178         {
179                 char* ReadBuffer = ServerInstance->GetReadBuffer();
180                 int n = recv(fd, ReadBuffer, ServerInstance->Config->NetBufferSize, 0);
181                 if (n == ServerInstance->Config->NetBufferSize)
182                 {
183                         ServerInstance->SE->ChangeEventMask(this, FD_WANT_FAST_READ | FD_ADD_TRIAL_READ);
184                         recvq.append(ReadBuffer, n);
185                         OnDataReady();
186                 }
187                 else if (n > 0)
188                 {
189                         ServerInstance->SE->ChangeEventMask(this, FD_WANT_FAST_READ);
190                         recvq.append(ReadBuffer, n);
191                         OnDataReady();
192                 }
193                 else if (n == 0)
194                 {
195                         error = "Connection closed";
196                         ServerInstance->SE->ChangeEventMask(this, FD_WANT_NO_READ | FD_WANT_NO_WRITE);
197                 }
198                 else if (errno == EAGAIN)
199                 {
200                         ServerInstance->SE->ChangeEventMask(this, FD_WANT_FAST_READ | FD_READ_WILL_BLOCK);
201                 }
202                 else if (errno == EINTR)
203                 {
204                         ServerInstance->SE->ChangeEventMask(this, FD_WANT_FAST_READ | FD_ADD_TRIAL_READ);
205                 }
206                 else
207                 {
208                         error = strerror(errno);
209                         ServerInstance->SE->ChangeEventMask(this, FD_WANT_NO_READ | FD_WANT_NO_WRITE);
210                 }
211         }
212 }
213
214 void StreamSocket::DoWrite()
215 {
216         if (sendq.empty())
217                 return;
218         if (!error.empty() || fd < 0 || fd == INT_MAX)
219         {
220                 ServerInstance->Logs->Log("SOCKET", DEBUG, "DoWrite on errored or closed socket");
221                 return;
222         }
223
224         if (IOHook)
225         {
226                 int rv = -1;
227                 try
228                 {
229                         if (sendq.size() > 1 && sendq[0].length() < 1024)
230                         {
231                                 // Avoid multiple repeated SSL encryption invocations
232                                 // This adds a single copy of the queue, but avoids
233                                 // much more overhead in terms of system calls invoked
234                                 // by the IOHook.
235                                 //
236                                 // The length limit of 1024 is to prevent merging strings
237                                 // more than once when writes begin to block.
238                                 std::string tmp;
239                                 tmp.reserve(sendq_len);
240                                 for(unsigned int i=0; i < sendq.size(); i++)
241                                         tmp.append(sendq[i]);
242                                 sendq.clear();
243                                 sendq.push_back(tmp);
244                         }
245                         while (!sendq.empty())
246                         {
247                                 std::string& front = sendq.front();
248                                 int itemlen = front.length();
249                                 rv = IOHook->OnStreamSocketWrite(this, front);
250                                 if (rv > 0)
251                                 {
252                                         // consumed the entire string, and is ready for more
253                                         sendq_len -= itemlen;
254                                         sendq.pop_front();
255                                 }
256                                 else if (rv == 0)
257                                 {
258                                         // socket has blocked. Stop trying to send data.
259                                         // IOHook has requested unblock notification from the socketengine
260
261                                         // Since it is possible that a partial write took place, adjust sendq_len
262                                         sendq_len = sendq_len - itemlen + front.length();
263                                         return;
264                                 }
265                                 else
266                                 {
267                                         SetError("Write Error"); // will not overwrite a better error message
268                                         return;
269                                 }
270                         }
271                 }
272                 catch (CoreException& modexcept)
273                 {
274                         ServerInstance->Logs->Log("SOCKET", DEBUG,"%s threw an exception: %s",
275                                 modexcept.GetSource(), modexcept.GetReason());
276                 }
277         }
278         else
279         {
280                 // don't even try if we are known to be blocking
281                 if (GetEventMask() & FD_WRITE_WILL_BLOCK)
282                         return;
283                 // start out optimistic - we won't need to write any more
284                 int eventChange = FD_WANT_EDGE_WRITE;
285                 while (sendq_len && eventChange == FD_WANT_EDGE_WRITE)
286                 {
287                         // Prepare a writev() call to write all buffers efficiently
288                         int bufcount = sendq.size();
289                 
290                         // cap the number of buffers at IOV_MAX
291                         if (bufcount > IOV_MAX)
292                         {
293                                 bufcount = IOV_MAX;
294                         }
295
296                         int rv_max = 0;
297                         iovec* iovecs = new iovec[bufcount];
298                         for(int i=0; i < bufcount; i++)
299                         {
300                                 iovecs[i].iov_base = const_cast<char*>(sendq[i].data());
301                                 iovecs[i].iov_len = sendq[i].length();
302                                 rv_max += sendq[i].length();
303                         }
304                         int rv = writev(fd, iovecs, bufcount);
305                         delete[] iovecs;
306
307                         if (rv == (int)sendq_len)
308                         {
309                                 // it's our lucky day, everything got written out. Fast cleanup.
310                                 // This won't ever happen if the number of buffers got capped.
311                                 sendq_len = 0;
312                                 sendq.clear();
313                         }
314                         else if (rv > 0)
315                         {
316                                 // Partial write. Clean out strings from the sendq
317                                 sendq_len -= rv;
318                                 while (rv > 0 && !sendq.empty())
319                                 {
320                                         std::string& front = sendq.front();
321                                         if (front.length() <= (size_t)rv)
322                                         {
323                                                 // this string got fully written out
324                                                 rv -= front.length();
325                                                 sendq.pop_front();
326                                         }
327                                         else
328                                         {
329                                                 // stopped in the middle of this string
330                                                 front = front.substr(rv);
331                                                 rv = 0;
332                                         }
333                                 }
334                                 if (rv < rv_max)
335                                 {
336                                         // it's going to block now
337                                         eventChange = FD_WANT_FAST_WRITE | FD_WRITE_WILL_BLOCK;
338                                 }
339                         }
340                         else if (rv == 0)
341                         {
342                                 error = "Connection closed";
343                         }
344                         else if (errno == EAGAIN)
345                         {
346                                 eventChange = FD_WANT_FAST_WRITE | FD_WRITE_WILL_BLOCK;
347                         }
348                         else if (errno == EINTR)
349                         {
350                                 // restart interrupted syscall
351                         }
352                         else
353                         {
354                                 error = strerror(errno);
355                         }
356                 }
357                 if (!error.empty())
358                 {
359                         // error - kill all events
360                         ServerInstance->SE->ChangeEventMask(this, FD_WANT_NO_READ | FD_WANT_NO_WRITE);
361                 }
362                 else
363                 {
364                         ServerInstance->SE->ChangeEventMask(this, eventChange);
365                 }
366         }
367 }
368
369 void StreamSocket::WriteData(const std::string &data)
370 {
371         if (fd < 0)
372         {
373                 ServerInstance->Logs->Log("SOCKET", DEBUG, "Attempt to write data to dead socket: %s",
374                         data.c_str());
375                 return;
376         }
377
378         /* Append the data to the back of the queue ready for writing */
379         sendq.push_back(data);
380         sendq_len += data.length();
381
382         ServerInstance->SE->ChangeEventMask(this, FD_ADD_TRIAL_WRITE);
383 }
384
385 void SocketTimeout::Tick(time_t)
386 {
387         ServerInstance->Logs->Log("SOCKET", DEBUG,"SocketTimeout::Tick");
388
389         if (ServerInstance->SE->GetRef(this->sfd) != this->sock)
390                 return;
391
392         if (this->sock->state == I_CONNECTING)
393         {
394                 // for connecting sockets, the timeout can occur
395                 // which causes termination of the connection after
396                 // the given number of seconds without a successful
397                 // connection.
398                 this->sock->OnTimeout();
399                 this->sock->OnError(I_ERR_TIMEOUT);
400
401                 /* NOTE: We must set this AFTER DelFd, as we added
402                  * this socket whilst writeable. This means that we
403                  * must DELETE the socket whilst writeable too!
404                  */
405                 this->sock->state = I_ERROR;
406
407                 ServerInstance->GlobalCulls.AddItem(sock);
408         }
409
410         this->sock->Timeout = NULL;
411 }
412
413 void BufferedSocket::OnConnected() { }
414 void BufferedSocket::OnTimeout() { return; }
415
416 void BufferedSocket::DoWrite()
417 {
418         if (state == I_CONNECTING)
419         {
420                 state = I_CONNECTED;
421                 this->OnConnected();
422                 if (GetIOHook())
423                         GetIOHook()->OnStreamSocketConnect(this);
424                 else
425                         ServerInstance->SE->ChangeEventMask(this, FD_WANT_FAST_READ | FD_WANT_EDGE_WRITE);
426         }
427         this->StreamSocket::DoWrite();
428 }
429
430 BufferedSocket::~BufferedSocket()
431 {
432         this->Close();
433         if (Timeout)
434         {
435                 ServerInstance->Timers->DelTimer(Timeout);
436                 Timeout = NULL;
437         }
438 }
439
440 void StreamSocket::HandleEvent(EventType et, int errornum)
441 {
442         BufferedSocketError errcode = I_ERR_OTHER;
443         switch (et)
444         {
445                 case EVENT_ERROR:
446                 {
447                         if (errornum == 0)
448                                 SetError("Connection closed");
449                         else
450                                 SetError(strerror(errornum));
451                         switch (errornum)
452                         {
453                                 case ETIMEDOUT:
454                                         errcode = I_ERR_TIMEOUT;
455                                         break;
456                                 case ECONNREFUSED:
457                                 case 0:
458                                         errcode = I_ERR_CONNECT;
459                                         break;
460                                 case EADDRINUSE:
461                                         errcode = I_ERR_BIND;
462                                         break;
463                                 case EPIPE:
464                                 case EIO:
465                                         errcode = I_ERR_WRITE;
466                                         break;
467                         }
468                         break;
469                 }
470                 case EVENT_READ:
471                 {
472                         DoRead();
473                         break;
474                 }
475                 case EVENT_WRITE:
476                 {
477                         DoWrite();
478                         break;
479                 }
480         }
481         if (!error.empty())
482         {
483                 ServerInstance->Logs->Log("SOCKET", DEBUG, "Error on FD %d - '%s'", fd, error.c_str());
484                 OnError(errcode);
485                 ServerInstance->GlobalCulls.AddItem(this);
486         }
487 }
488