]> git.netwichtig.de Git - user/henk/code/inspircd.git/blob - src/inspsocket.cpp
73f469a0513dabe232884c0104f8e7055aa4d732
[user/henk/code/inspircd.git] / src / inspsocket.cpp
1 /*       +------------------------------------+
2  *       | Inspire Internet Relay Chat Daemon |
3  *       +------------------------------------+
4  *
5  *  InspIRCd: (C) 2002-2009 InspIRCd Development Team
6  * See: http://wiki.inspircd.org/Credits
7  *
8  * This program is free but copyrighted software; see
9  *            the file COPYING for details.
10  *
11  * ---------------------------------------------------
12  */
13
14 #include "inspircd.h"
15 #include "socket.h"
16 #include "inspstring.h"
17 #include "socketengine.h"
18 #include <sys/uio.h>
19
20 BufferedSocket::BufferedSocket()
21 {
22         Timeout = NULL;
23         state = I_ERROR;
24 }
25
26 BufferedSocket::BufferedSocket(int newfd)
27 {
28         Timeout = NULL;
29         this->fd = newfd;
30         this->state = I_CONNECTED;
31         if (fd > -1)
32                 ServerInstance->SE->AddFd(this, FD_WANT_FAST_READ | FD_WANT_EDGE_WRITE);
33 }
34
35 void BufferedSocket::DoConnect(const std::string &ipaddr, int aport, unsigned long maxtime, const std::string &connectbindip)
36 {
37         BufferedSocketError err = BeginConnect(ipaddr, aport, maxtime, connectbindip);
38         if (err != I_ERR_NONE)
39         {
40                 state = I_ERROR;
41                 SetError(strerror(errno));
42                 OnError(err);
43         }
44 }
45
46 BufferedSocketError BufferedSocket::BeginConnect(const std::string &ipaddr, int aport, unsigned long maxtime, const std::string &connectbindip)
47 {
48         irc::sockets::sockaddrs addr, bind;
49         if (!irc::sockets::aptosa(ipaddr, aport, &addr))
50         {
51                 ServerInstance->Logs->Log("SOCKET", DEBUG, "BUG: Hostname passed to BufferedSocket, rather than an IP address!");
52                 return I_ERR_CONNECT;
53         }
54
55         bind.sa.sa_family = 0;
56         if (!connectbindip.empty())
57         {
58                 if (!irc::sockets::aptosa(connectbindip, 0, &bind))
59                 {
60                         return I_ERR_BIND;
61                 }
62         }
63
64         return BeginConnect(addr, bind, maxtime);
65 }
66
67 static void IncreaseOSBuffers(int fd)
68 {
69         // attempt to increase socket sendq and recvq as high as its possible
70         int sendbuf = 32768;
71         int recvbuf = 32768;
72         setsockopt(fd,SOL_SOCKET,SO_SNDBUF,(const char *)&sendbuf,sizeof(sendbuf));
73         setsockopt(fd,SOL_SOCKET,SO_RCVBUF,(const char *)&recvbuf,sizeof(recvbuf));
74         // on failure, do nothing. I'm a little sick of people trying to interpret this message as a result of why their incorrect setups don't work.
75 }
76
77 BufferedSocketError BufferedSocket::BeginConnect(const irc::sockets::sockaddrs& dest, const irc::sockets::sockaddrs& bind, unsigned long timeout)
78 {
79         if (fd < 0)
80                 fd = socket(dest.sa.sa_family, SOCK_STREAM, 0);
81
82         if (fd < 0)
83                 return I_ERR_SOCKET;
84
85         if (bind.sa.sa_family != 0)
86         {
87                 if (ServerInstance->SE->Bind(fd, &bind.sa, sa_size(bind)) < 0)
88                         return I_ERR_BIND;
89         }
90
91         ServerInstance->SE->NonBlocking(fd);
92
93         if (ServerInstance->SE->Connect(this, &dest.sa, sa_size(dest)) == -1)
94         {
95                 if (errno != EINPROGRESS)
96                         return I_ERR_CONNECT;
97         }
98
99         this->state = I_CONNECTING;
100
101         if (!ServerInstance->SE->AddFd(this, FD_WANT_NO_READ | FD_WANT_SINGLE_WRITE))
102                 return I_ERR_NOMOREFDS;
103
104         this->Timeout = new SocketTimeout(this->GetFd(), this, timeout, ServerInstance->Time());
105         ServerInstance->Timers->AddTimer(this->Timeout);
106
107         IncreaseOSBuffers(fd);
108
109         ServerInstance->Logs->Log("SOCKET", DEBUG,"BufferedSocket::DoConnect success");
110         return I_ERR_NONE;
111 }
112
113 void StreamSocket::Close()
114 {
115         /* Save this, so we dont lose it,
116          * otherise on failure, error messages
117          * might be inaccurate.
118          */
119         int save = errno;
120         if (this->fd > -1)
121         {
122                 if (IOHook)
123                 {
124                         try
125                         {
126                                 IOHook->OnStreamSocketClose(this);
127                         }
128                         catch (CoreException& modexcept)
129                         {
130                                 ServerInstance->Logs->Log("SOCKET", DEFAULT,"%s threw an exception: %s",
131                                         modexcept.GetSource(), modexcept.GetReason());
132                         }
133                 }
134                 ServerInstance->SE->Shutdown(this, 2);
135                 ServerInstance->SE->DelFd(this);
136                 ServerInstance->SE->Close(this);
137                 fd = -1;
138         }
139         errno = save;
140 }
141
142 bool StreamSocket::cull()
143 {
144         Close();
145         return true;
146 }
147
148 bool StreamSocket::GetNextLine(std::string& line, char delim)
149 {
150         std::string::size_type i = recvq.find(delim);
151         if (i == std::string::npos)
152                 return false;
153         line = recvq.substr(0, i - 1);
154         // TODO is this the most efficient way to split?
155         recvq = recvq.substr(i + 1);
156         return true;
157 }
158
159 void StreamSocket::DoRead()
160 {
161         if (IOHook)
162         {
163                 int rv = -1;
164                 try
165                 {
166                         rv = IOHook->OnStreamSocketRead(this, recvq);
167                 }
168                 catch (CoreException& modexcept)
169                 {
170                         ServerInstance->Logs->Log("SOCKET", DEFAULT, "%s threw an exception: %s",
171                                 modexcept.GetSource(), modexcept.GetReason());
172                         return;
173                 }
174                 if (rv > 0)
175                         OnDataReady();
176                 if (rv < 0)
177                         SetError("Read Error"); // will not overwrite a better error message
178         }
179         else
180         {
181                 char* ReadBuffer = ServerInstance->GetReadBuffer();
182                 int n = recv(fd, ReadBuffer, ServerInstance->Config->NetBufferSize, 0);
183                 if (n == ServerInstance->Config->NetBufferSize)
184                 {
185                         ServerInstance->SE->ChangeEventMask(this, FD_WANT_FAST_READ | FD_ADD_TRIAL_READ);
186                         recvq.append(ReadBuffer, n);
187                         OnDataReady();
188                 }
189                 else if (n > 0)
190                 {
191                         ServerInstance->SE->ChangeEventMask(this, FD_WANT_FAST_READ);
192                         recvq.append(ReadBuffer, n);
193                         OnDataReady();
194                 }
195                 else if (n == 0)
196                 {
197                         error = "Connection closed";
198                         ServerInstance->SE->ChangeEventMask(this, FD_WANT_NO_READ | FD_WANT_NO_WRITE);
199                 }
200                 else if (errno == EAGAIN)
201                 {
202                         ServerInstance->SE->ChangeEventMask(this, FD_WANT_FAST_READ | FD_READ_WILL_BLOCK);
203                 }
204                 else if (errno == EINTR)
205                 {
206                         ServerInstance->SE->ChangeEventMask(this, FD_WANT_FAST_READ | FD_ADD_TRIAL_READ);
207                 }
208                 else
209                 {
210                         error = strerror(errno);
211                         ServerInstance->SE->ChangeEventMask(this, FD_WANT_NO_READ | FD_WANT_NO_WRITE);
212                 }
213         }
214 }
215
216 void StreamSocket::DoWrite()
217 {
218         if (sendq.empty())
219                 return;
220         if (!error.empty() || fd < 0 || fd == INT_MAX)
221         {
222                 ServerInstance->Logs->Log("SOCKET", DEBUG, "DoWrite on errored or closed socket");
223                 return;
224         }
225
226         if (IOHook)
227         {
228                 int rv = -1;
229                 try
230                 {
231                         if (sendq.size() > 1 && sendq[0].length() < 1024)
232                         {
233                                 // Avoid multiple repeated SSL encryption invocations
234                                 // This adds a single copy of the queue, but avoids
235                                 // much more overhead in terms of system calls invoked
236                                 // by the IOHook.
237                                 //
238                                 // The length limit of 1024 is to prevent merging strings
239                                 // more than once when writes begin to block.
240                                 std::string tmp;
241                                 tmp.reserve(sendq_len);
242                                 for(unsigned int i=0; i < sendq.size(); i++)
243                                         tmp.append(sendq[i]);
244                                 sendq.clear();
245                                 sendq.push_back(tmp);
246                         }
247                         while (!sendq.empty())
248                         {
249                                 std::string& front = sendq.front();
250                                 int itemlen = front.length();
251                                 rv = IOHook->OnStreamSocketWrite(this, front);
252                                 if (rv > 0)
253                                 {
254                                         // consumed the entire string, and is ready for more
255                                         sendq_len -= itemlen;
256                                         sendq.pop_front();
257                                 }
258                                 else if (rv == 0)
259                                 {
260                                         // socket has blocked. Stop trying to send data.
261                                         // IOHook has requested unblock notification from the socketengine
262
263                                         // Since it is possible that a partial write took place, adjust sendq_len
264                                         sendq_len = sendq_len - itemlen + front.length();
265                                         return;
266                                 }
267                                 else
268                                 {
269                                         SetError("Write Error"); // will not overwrite a better error message
270                                         return;
271                                 }
272                         }
273                 }
274                 catch (CoreException& modexcept)
275                 {
276                         ServerInstance->Logs->Log("SOCKET", DEBUG,"%s threw an exception: %s",
277                                 modexcept.GetSource(), modexcept.GetReason());
278                 }
279         }
280         else
281         {
282                 // don't even try if we are known to be blocking
283                 if (GetEventMask() & FD_WRITE_WILL_BLOCK)
284                         return;
285                 // start out optimistic - we won't need to write any more
286                 int eventChange = FD_WANT_EDGE_WRITE;
287                 while (sendq_len && eventChange == FD_WANT_EDGE_WRITE)
288                 {
289                         // Prepare a writev() call to write all buffers efficiently
290                         int bufcount = sendq.size();
291                 
292                         // cap the number of buffers at IOV_MAX
293                         if (bufcount > IOV_MAX)
294                         {
295                                 bufcount = IOV_MAX;
296                         }
297
298                         int rv_max = 0;
299                         iovec* iovecs = new iovec[bufcount];
300                         for(int i=0; i < bufcount; i++)
301                         {
302                                 iovecs[i].iov_base = const_cast<char*>(sendq[i].data());
303                                 iovecs[i].iov_len = sendq[i].length();
304                                 rv_max += sendq[i].length();
305                         }
306                         int rv = writev(fd, iovecs, bufcount);
307                         delete[] iovecs;
308
309                         if (rv == (int)sendq_len)
310                         {
311                                 // it's our lucky day, everything got written out. Fast cleanup.
312                                 // This won't ever happen if the number of buffers got capped.
313                                 sendq_len = 0;
314                                 sendq.clear();
315                         }
316                         else if (rv > 0)
317                         {
318                                 // Partial write. Clean out strings from the sendq
319                                 sendq_len -= rv;
320                                 while (rv > 0 && !sendq.empty())
321                                 {
322                                         std::string& front = sendq.front();
323                                         if (front.length() <= (size_t)rv)
324                                         {
325                                                 // this string got fully written out
326                                                 rv -= front.length();
327                                                 sendq.pop_front();
328                                         }
329                                         else
330                                         {
331                                                 // stopped in the middle of this string
332                                                 front = front.substr(rv);
333                                                 rv = 0;
334                                         }
335                                 }
336                                 if (rv < rv_max)
337                                 {
338                                         // it's going to block now
339                                         eventChange = FD_WANT_FAST_WRITE | FD_WRITE_WILL_BLOCK;
340                                 }
341                         }
342                         else if (rv == 0)
343                         {
344                                 error = "Connection closed";
345                         }
346                         else if (errno == EAGAIN)
347                         {
348                                 eventChange = FD_WANT_FAST_WRITE | FD_WRITE_WILL_BLOCK;
349                         }
350                         else if (errno == EINTR)
351                         {
352                                 // restart interrupted syscall
353                         }
354                         else
355                         {
356                                 error = strerror(errno);
357                         }
358                 }
359                 if (!error.empty())
360                 {
361                         // error - kill all events
362                         ServerInstance->SE->ChangeEventMask(this, FD_WANT_NO_READ | FD_WANT_NO_WRITE);
363                 }
364                 else
365                 {
366                         ServerInstance->SE->ChangeEventMask(this, eventChange);
367                 }
368         }
369 }
370
371 void StreamSocket::WriteData(const std::string &data)
372 {
373         if (fd < 0)
374         {
375                 ServerInstance->Logs->Log("SOCKET", DEBUG, "Attempt to write data to dead socket: %s",
376                         data.c_str());
377                 return;
378         }
379
380         /* Append the data to the back of the queue ready for writing */
381         sendq.push_back(data);
382         sendq_len += data.length();
383
384         ServerInstance->SE->ChangeEventMask(this, FD_ADD_TRIAL_WRITE);
385 }
386
387 void SocketTimeout::Tick(time_t)
388 {
389         ServerInstance->Logs->Log("SOCKET", DEBUG,"SocketTimeout::Tick");
390
391         if (ServerInstance->SE->GetRef(this->sfd) != this->sock)
392                 return;
393
394         if (this->sock->state == I_CONNECTING)
395         {
396                 // for connecting sockets, the timeout can occur
397                 // which causes termination of the connection after
398                 // the given number of seconds without a successful
399                 // connection.
400                 this->sock->OnTimeout();
401                 this->sock->OnError(I_ERR_TIMEOUT);
402
403                 /* NOTE: We must set this AFTER DelFd, as we added
404                  * this socket whilst writeable. This means that we
405                  * must DELETE the socket whilst writeable too!
406                  */
407                 this->sock->state = I_ERROR;
408
409                 ServerInstance->GlobalCulls.AddItem(sock);
410         }
411
412         this->sock->Timeout = NULL;
413 }
414
415 void BufferedSocket::OnConnected() { }
416 void BufferedSocket::OnTimeout() { return; }
417
418 void BufferedSocket::DoWrite()
419 {
420         if (state == I_CONNECTING)
421         {
422                 state = I_CONNECTED;
423                 this->OnConnected();
424                 if (GetIOHook())
425                         GetIOHook()->OnStreamSocketConnect(this);
426                 else
427                         ServerInstance->SE->ChangeEventMask(this, FD_WANT_FAST_READ | FD_WANT_EDGE_WRITE);
428         }
429         this->StreamSocket::DoWrite();
430 }
431
432 BufferedSocket::~BufferedSocket()
433 {
434         this->Close();
435         if (Timeout)
436         {
437                 ServerInstance->Timers->DelTimer(Timeout);
438                 Timeout = NULL;
439         }
440 }
441
442 void StreamSocket::HandleEvent(EventType et, int errornum)
443 {
444         if (!error.empty())
445                 return;
446         BufferedSocketError errcode = I_ERR_OTHER;
447         switch (et)
448         {
449                 case EVENT_ERROR:
450                 {
451                         if (errornum == 0)
452                                 SetError("Connection closed");
453                         else
454                                 SetError(strerror(errornum));
455                         switch (errornum)
456                         {
457                                 case ETIMEDOUT:
458                                         errcode = I_ERR_TIMEOUT;
459                                         break;
460                                 case ECONNREFUSED:
461                                 case 0:
462                                         errcode = I_ERR_CONNECT;
463                                         break;
464                                 case EADDRINUSE:
465                                         errcode = I_ERR_BIND;
466                                         break;
467                                 case EPIPE:
468                                 case EIO:
469                                         errcode = I_ERR_WRITE;
470                                         break;
471                         }
472                         break;
473                 }
474                 case EVENT_READ:
475                 {
476                         DoRead();
477                         break;
478                 }
479                 case EVENT_WRITE:
480                 {
481                         DoWrite();
482                         break;
483                 }
484         }
485         if (!error.empty())
486         {
487                 ServerInstance->Logs->Log("SOCKET", DEBUG, "Error on FD %d - '%s'", fd, error.c_str());
488                 OnError(errcode);
489                 ServerInstance->GlobalCulls.AddItem(this);
490         }
491 }
492