]> git.netwichtig.de Git - user/henk/code/inspircd.git/blob - src/inspsocket.cpp
c63748eab1a16e790f1983ad91509c9f95f40d03
[user/henk/code/inspircd.git] / src / inspsocket.cpp
1 /*       +------------------------------------+
2  *       | Inspire Internet Relay Chat Daemon |
3  *       +------------------------------------+
4  *
5  *  InspIRCd: (C) 2002-2009 InspIRCd Development Team
6  * See: http://wiki.inspircd.org/Credits
7  *
8  * This program is free but copyrighted software; see
9  *            the file COPYING for details.
10  *
11  * ---------------------------------------------------
12  */
13
14 #include "inspircd.h"
15 #include "socket.h"
16 #include "inspstring.h"
17 #include "socketengine.h"
18 #ifndef WINDOWS
19 #include <sys/uio.h>
20 #endif
21
22 BufferedSocket::BufferedSocket()
23 {
24         Timeout = NULL;
25         state = I_ERROR;
26 }
27
28 BufferedSocket::BufferedSocket(int newfd)
29 {
30         Timeout = NULL;
31         this->fd = newfd;
32         this->state = I_CONNECTED;
33         if (fd > -1)
34                 ServerInstance->SE->AddFd(this, FD_WANT_FAST_READ | FD_WANT_EDGE_WRITE);
35 }
36
37 void BufferedSocket::DoConnect(const std::string &ipaddr, int aport, unsigned long maxtime, const std::string &connectbindip)
38 {
39         BufferedSocketError err = BeginConnect(ipaddr, aport, maxtime, connectbindip);
40         if (err != I_ERR_NONE)
41         {
42                 state = I_ERROR;
43                 SetError(strerror(errno));
44                 OnError(err);
45         }
46 }
47
48 BufferedSocketError BufferedSocket::BeginConnect(const std::string &ipaddr, int aport, unsigned long maxtime, const std::string &connectbindip)
49 {
50         irc::sockets::sockaddrs addr, bind;
51         if (!irc::sockets::aptosa(ipaddr, aport, &addr))
52         {
53                 ServerInstance->Logs->Log("SOCKET", DEBUG, "BUG: Hostname passed to BufferedSocket, rather than an IP address!");
54                 return I_ERR_CONNECT;
55         }
56
57         bind.sa.sa_family = 0;
58         if (!connectbindip.empty())
59         {
60                 if (!irc::sockets::aptosa(connectbindip, 0, &bind))
61                 {
62                         return I_ERR_BIND;
63                 }
64         }
65
66         return BeginConnect(addr, bind, maxtime);
67 }
68
/** Request 32768-byte kernel send and receive buffers for a descriptor.
 * Failures are deliberately ignored: this is a best-effort tweak and a
 * refusal by the OS is not worth reporting.
 * @param fd Socket descriptor to adjust
 */
static void IncreaseOSBuffers(int fd)
{
        const int wanted = 32768;
        const char* const optval = reinterpret_cast<const char*>(&wanted);

        setsockopt(fd, SOL_SOCKET, SO_SNDBUF, optval, sizeof(wanted));
        setsockopt(fd, SOL_SOCKET, SO_RCVBUF, optval, sizeof(wanted));
}
78
79 BufferedSocketError BufferedSocket::BeginConnect(const irc::sockets::sockaddrs& dest, const irc::sockets::sockaddrs& bind, unsigned long timeout)
80 {
81         if (fd < 0)
82                 fd = socket(dest.sa.sa_family, SOCK_STREAM, 0);
83
84         if (fd < 0)
85                 return I_ERR_SOCKET;
86
87         if (bind.sa.sa_family != 0)
88         {
89                 if (ServerInstance->SE->Bind(fd, &bind.sa, sa_size(bind)) < 0)
90                         return I_ERR_BIND;
91         }
92
93         ServerInstance->SE->NonBlocking(fd);
94
95         if (ServerInstance->SE->Connect(this, &dest.sa, sa_size(dest)) == -1)
96         {
97                 if (errno != EINPROGRESS)
98                         return I_ERR_CONNECT;
99         }
100
101         this->state = I_CONNECTING;
102
103         if (!ServerInstance->SE->AddFd(this, FD_WANT_NO_READ | FD_WANT_SINGLE_WRITE))
104                 return I_ERR_NOMOREFDS;
105
106         this->Timeout = new SocketTimeout(this->GetFd(), this, timeout, ServerInstance->Time());
107         ServerInstance->Timers->AddTimer(this->Timeout);
108
109         IncreaseOSBuffers(fd);
110
111         ServerInstance->Logs->Log("SOCKET", DEBUG,"BufferedSocket::DoConnect success");
112         return I_ERR_NONE;
113 }
114
115 void StreamSocket::Close()
116 {
117         /* Save this, so we dont lose it,
118          * otherise on failure, error messages
119          * might be inaccurate.
120          */
121         int save = errno;
122         if (this->fd > -1)
123         {
124                 if (IOHook)
125                 {
126                         try
127                         {
128                                 IOHook->OnStreamSocketClose(this);
129                         }
130                         catch (CoreException& modexcept)
131                         {
132                                 ServerInstance->Logs->Log("SOCKET", DEFAULT,"%s threw an exception: %s",
133                                         modexcept.GetSource(), modexcept.GetReason());
134                         }
135                 }
136                 ServerInstance->SE->Shutdown(this, 2);
137                 ServerInstance->SE->DelFd(this);
138                 ServerInstance->SE->Close(this);
139                 fd = -1;
140         }
141         errno = save;
142 }
143
144 bool StreamSocket::cull()
145 {
146         Close();
147         return true;
148 }
149
150 bool StreamSocket::GetNextLine(std::string& line, char delim)
151 {
152         std::string::size_type i = recvq.find(delim);
153         if (i == std::string::npos)
154                 return false;
155         line = recvq.substr(0, i - 1);
156         // TODO is this the most efficient way to split?
157         recvq = recvq.substr(i + 1);
158         return true;
159 }
160
161 void StreamSocket::DoRead()
162 {
163         if (IOHook)
164         {
165                 int rv = -1;
166                 try
167                 {
168                         rv = IOHook->OnStreamSocketRead(this, recvq);
169                 }
170                 catch (CoreException& modexcept)
171                 {
172                         ServerInstance->Logs->Log("SOCKET", DEFAULT, "%s threw an exception: %s",
173                                 modexcept.GetSource(), modexcept.GetReason());
174                         return;
175                 }
176                 if (rv > 0)
177                         OnDataReady();
178                 if (rv < 0)
179                         SetError("Read Error"); // will not overwrite a better error message
180         }
181         else
182         {
183                 char* ReadBuffer = ServerInstance->GetReadBuffer();
184                 int n = recv(fd, ReadBuffer, ServerInstance->Config->NetBufferSize, 0);
185                 if (n == ServerInstance->Config->NetBufferSize)
186                 {
187                         ServerInstance->SE->ChangeEventMask(this, FD_WANT_FAST_READ | FD_ADD_TRIAL_READ);
188                         recvq.append(ReadBuffer, n);
189                         OnDataReady();
190                 }
191                 else if (n > 0)
192                 {
193                         ServerInstance->SE->ChangeEventMask(this, FD_WANT_FAST_READ);
194                         recvq.append(ReadBuffer, n);
195                         OnDataReady();
196                 }
197                 else if (n == 0)
198                 {
199                         error = "Connection closed";
200                         ServerInstance->SE->ChangeEventMask(this, FD_WANT_NO_READ | FD_WANT_NO_WRITE);
201                 }
202                 else if (errno == EAGAIN)
203                 {
204                         ServerInstance->SE->ChangeEventMask(this, FD_WANT_FAST_READ | FD_READ_WILL_BLOCK);
205                 }
206                 else if (errno == EINTR)
207                 {
208                         ServerInstance->SE->ChangeEventMask(this, FD_WANT_FAST_READ | FD_ADD_TRIAL_READ);
209                 }
210                 else
211                 {
212                         error = strerror(errno);
213                         ServerInstance->SE->ChangeEventMask(this, FD_WANT_NO_READ | FD_WANT_NO_WRITE);
214                 }
215         }
216 }
217
218 void StreamSocket::DoWrite()
219 {
220         if (sendq.empty())
221                 return;
222         if (!error.empty() || fd < 0 || fd == INT_MAX)
223         {
224                 ServerInstance->Logs->Log("SOCKET", DEBUG, "DoWrite on errored or closed socket");
225                 return;
226         }
227
228         if (IOHook)
229         {
230                 int rv = -1;
231                 try
232                 {
233                         if (sendq.size() > 1 && sendq[0].length() < 1024)
234                         {
235                                 // Avoid multiple repeated SSL encryption invocations
236                                 // This adds a single copy of the queue, but avoids
237                                 // much more overhead in terms of system calls invoked
238                                 // by the IOHook.
239                                 //
240                                 // The length limit of 1024 is to prevent merging strings
241                                 // more than once when writes begin to block.
242                                 std::string tmp;
243                                 tmp.reserve(sendq_len);
244                                 for(unsigned int i=0; i < sendq.size(); i++)
245                                         tmp.append(sendq[i]);
246                                 sendq.clear();
247                                 sendq.push_back(tmp);
248                         }
249                         while (!sendq.empty())
250                         {
251                                 std::string& front = sendq.front();
252                                 int itemlen = front.length();
253                                 rv = IOHook->OnStreamSocketWrite(this, front);
254                                 if (rv > 0)
255                                 {
256                                         // consumed the entire string, and is ready for more
257                                         sendq_len -= itemlen;
258                                         sendq.pop_front();
259                                 }
260                                 else if (rv == 0)
261                                 {
262                                         // socket has blocked. Stop trying to send data.
263                                         // IOHook has requested unblock notification from the socketengine
264
265                                         // Since it is possible that a partial write took place, adjust sendq_len
266                                         sendq_len = sendq_len - itemlen + front.length();
267                                         return;
268                                 }
269                                 else
270                                 {
271                                         SetError("Write Error"); // will not overwrite a better error message
272                                         return;
273                                 }
274                         }
275                 }
276                 catch (CoreException& modexcept)
277                 {
278                         ServerInstance->Logs->Log("SOCKET", DEBUG,"%s threw an exception: %s",
279                                 modexcept.GetSource(), modexcept.GetReason());
280                 }
281         }
282         else
283         {
284                 // don't even try if we are known to be blocking
285                 if (GetEventMask() & FD_WRITE_WILL_BLOCK)
286                         return;
287                 // start out optimistic - we won't need to write any more
288                 int eventChange = FD_WANT_EDGE_WRITE;
289                 while (sendq_len && eventChange == FD_WANT_EDGE_WRITE)
290                 {
291                         // Prepare a writev() call to write all buffers efficiently
292                         int bufcount = sendq.size();
293                 
294                         // cap the number of buffers at IOV_MAX
295                         if (bufcount > IOV_MAX)
296                         {
297                                 bufcount = IOV_MAX;
298                         }
299
300                         int rv_max = 0;
301                         iovec* iovecs = new iovec[bufcount];
302                         for(int i=0; i < bufcount; i++)
303                         {
304                                 iovecs[i].iov_base = const_cast<char*>(sendq[i].data());
305                                 iovecs[i].iov_len = sendq[i].length();
306                                 rv_max += sendq[i].length();
307                         }
308                         int rv = writev(fd, iovecs, bufcount);
309                         delete[] iovecs;
310
311                         if (rv == (int)sendq_len)
312                         {
313                                 // it's our lucky day, everything got written out. Fast cleanup.
314                                 // This won't ever happen if the number of buffers got capped.
315                                 sendq_len = 0;
316                                 sendq.clear();
317                         }
318                         else if (rv > 0)
319                         {
320                                 // Partial write. Clean out strings from the sendq
321                                 sendq_len -= rv;
322                                 while (rv > 0 && !sendq.empty())
323                                 {
324                                         std::string& front = sendq.front();
325                                         if (front.length() <= (size_t)rv)
326                                         {
327                                                 // this string got fully written out
328                                                 rv -= front.length();
329                                                 sendq.pop_front();
330                                         }
331                                         else
332                                         {
333                                                 // stopped in the middle of this string
334                                                 front = front.substr(rv);
335                                                 rv = 0;
336                                         }
337                                 }
338                                 if (rv < rv_max)
339                                 {
340                                         // it's going to block now
341                                         eventChange = FD_WANT_FAST_WRITE | FD_WRITE_WILL_BLOCK;
342                                 }
343                         }
344                         else if (rv == 0)
345                         {
346                                 error = "Connection closed";
347                         }
348                         else if (errno == EAGAIN)
349                         {
350                                 eventChange = FD_WANT_FAST_WRITE | FD_WRITE_WILL_BLOCK;
351                         }
352                         else if (errno == EINTR)
353                         {
354                                 // restart interrupted syscall
355                         }
356                         else
357                         {
358                                 error = strerror(errno);
359                         }
360                 }
361                 if (!error.empty())
362                 {
363                         // error - kill all events
364                         ServerInstance->SE->ChangeEventMask(this, FD_WANT_NO_READ | FD_WANT_NO_WRITE);
365                 }
366                 else
367                 {
368                         ServerInstance->SE->ChangeEventMask(this, eventChange);
369                 }
370         }
371 }
372
373 void StreamSocket::WriteData(const std::string &data)
374 {
375         if (fd < 0)
376         {
377                 ServerInstance->Logs->Log("SOCKET", DEBUG, "Attempt to write data to dead socket: %s",
378                         data.c_str());
379                 return;
380         }
381
382         /* Append the data to the back of the queue ready for writing */
383         sendq.push_back(data);
384         sendq_len += data.length();
385
386         ServerInstance->SE->ChangeEventMask(this, FD_ADD_TRIAL_WRITE);
387 }
388
389 void SocketTimeout::Tick(time_t)
390 {
391         ServerInstance->Logs->Log("SOCKET", DEBUG,"SocketTimeout::Tick");
392
393         if (ServerInstance->SE->GetRef(this->sfd) != this->sock)
394                 return;
395
396         if (this->sock->state == I_CONNECTING)
397         {
398                 // for connecting sockets, the timeout can occur
399                 // which causes termination of the connection after
400                 // the given number of seconds without a successful
401                 // connection.
402                 this->sock->OnTimeout();
403                 this->sock->OnError(I_ERR_TIMEOUT);
404
405                 /* NOTE: We must set this AFTER DelFd, as we added
406                  * this socket whilst writeable. This means that we
407                  * must DELETE the socket whilst writeable too!
408                  */
409                 this->sock->state = I_ERROR;
410
411                 ServerInstance->GlobalCulls.AddItem(sock);
412         }
413
414         this->sock->Timeout = NULL;
415 }
416
417 void BufferedSocket::OnConnected() { }
418 void BufferedSocket::OnTimeout() { return; }
419
420 void BufferedSocket::DoWrite()
421 {
422         if (state == I_CONNECTING)
423         {
424                 state = I_CONNECTED;
425                 this->OnConnected();
426                 if (GetIOHook())
427                         GetIOHook()->OnStreamSocketConnect(this);
428                 else
429                         ServerInstance->SE->ChangeEventMask(this, FD_WANT_FAST_READ | FD_WANT_EDGE_WRITE);
430         }
431         this->StreamSocket::DoWrite();
432 }
433
434 BufferedSocket::~BufferedSocket()
435 {
436         this->Close();
437         if (Timeout)
438         {
439                 ServerInstance->Timers->DelTimer(Timeout);
440                 Timeout = NULL;
441         }
442 }
443
444 void StreamSocket::HandleEvent(EventType et, int errornum)
445 {
446         if (!error.empty())
447                 return;
448         BufferedSocketError errcode = I_ERR_OTHER;
449         switch (et)
450         {
451                 case EVENT_ERROR:
452                 {
453                         if (errornum == 0)
454                                 SetError("Connection closed");
455                         else
456                                 SetError(strerror(errornum));
457                         switch (errornum)
458                         {
459                                 case ETIMEDOUT:
460                                         errcode = I_ERR_TIMEOUT;
461                                         break;
462                                 case ECONNREFUSED:
463                                 case 0:
464                                         errcode = I_ERR_CONNECT;
465                                         break;
466                                 case EADDRINUSE:
467                                         errcode = I_ERR_BIND;
468                                         break;
469                                 case EPIPE:
470                                 case EIO:
471                                         errcode = I_ERR_WRITE;
472                                         break;
473                         }
474                         break;
475                 }
476                 case EVENT_READ:
477                 {
478                         DoRead();
479                         break;
480                 }
481                 case EVENT_WRITE:
482                 {
483                         DoWrite();
484                         break;
485                 }
486         }
487         if (!error.empty())
488         {
489                 ServerInstance->Logs->Log("SOCKET", DEBUG, "Error on FD %d - '%s'", fd, error.c_str());
490                 OnError(errcode);
491                 ServerInstance->GlobalCulls.AddItem(this);
492         }
493 }
494