git.netwichtig.de Git - user/henk/code/inspircd.git/blob - src/inspsocket.cpp
Merge branch 'insp20' into master.
[user/henk/code/inspircd.git] / src / inspsocket.cpp
1 /*
2  * InspIRCd -- Internet Relay Chat Daemon
3  *
4  *   Copyright (C) 2009 Daniel De Graaf <danieldg@inspircd.org>
5  *   Copyright (C) 2007-2009 Robin Burchell <robin+git@viroteck.net>
6  *   Copyright (C) 2008 Thomas Stagner <aquanight@inspircd.org>
7  *   Copyright (C) 2006-2007 Craig Edwards <craigedwards@brainbox.cc>
8  *   Copyright (C) 2007 Dennis Friis <peavey@inspircd.org>
9  *   Copyright (C) 2006 Oliver Lupton <oliverlupton@gmail.com>
10  *
11  * This file is part of InspIRCd.  InspIRCd is free software: you can
12  * redistribute it and/or modify it under the terms of the GNU General Public
13  * License as published by the Free Software Foundation, version 2.
14  *
15  * This program is distributed in the hope that it will be useful, but WITHOUT
16  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
17  * FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
18  * details.
19  *
20  * You should have received a copy of the GNU General Public License
21  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
22  */
23
24
25 #include "inspircd.h"
26 #include "iohook.h"
27
28 static IOHook* GetNextHook(IOHook* hook)
29 {
30         IOHookMiddle* const iohm = IOHookMiddle::ToMiddleHook(hook);
31         if (iohm)
32                 return iohm->GetNextHook();
33         return NULL;
34 }
35
36 BufferedSocket::BufferedSocket()
37 {
38         Timeout = NULL;
39         state = I_ERROR;
40 }
41
42 BufferedSocket::BufferedSocket(int newfd)
43 {
44         Timeout = NULL;
45         this->fd = newfd;
46         this->state = I_CONNECTED;
47         if (fd > -1)
48                 SocketEngine::AddFd(this, FD_WANT_FAST_READ | FD_WANT_EDGE_WRITE);
49 }
50
51 void BufferedSocket::DoConnect(const std::string& ipaddr, int aport, unsigned int maxtime, const std::string& connectbindip)
52 {
53         BufferedSocketError err = BeginConnect(ipaddr, aport, maxtime, connectbindip);
54         if (err != I_ERR_NONE)
55         {
56                 state = I_ERROR;
57                 SetError(SocketEngine::LastError());
58                 OnError(err);
59         }
60 }
61
62 BufferedSocketError BufferedSocket::BeginConnect(const std::string& ipaddr, int aport, unsigned int maxtime, const std::string& connectbindip)
63 {
64         irc::sockets::sockaddrs addr, bind;
65         if (!irc::sockets::aptosa(ipaddr, aport, addr))
66         {
67                 ServerInstance->Logs->Log("SOCKET", LOG_DEBUG, "BUG: Hostname passed to BufferedSocket, rather than an IP address!");
68                 return I_ERR_CONNECT;
69         }
70
71         bind.sa.sa_family = 0;
72         if (!connectbindip.empty())
73         {
74                 if (!irc::sockets::aptosa(connectbindip, 0, bind))
75                 {
76                         return I_ERR_BIND;
77                 }
78         }
79
80         return BeginConnect(addr, bind, maxtime);
81 }
82
83 BufferedSocketError BufferedSocket::BeginConnect(const irc::sockets::sockaddrs& dest, const irc::sockets::sockaddrs& bind, unsigned int timeout)
84 {
85         if (fd < 0)
86                 fd = socket(dest.family(), SOCK_STREAM, 0);
87
88         if (fd < 0)
89                 return I_ERR_SOCKET;
90
91         if (bind.family() != 0)
92         {
93                 if (SocketEngine::Bind(fd, bind) < 0)
94                         return I_ERR_BIND;
95         }
96
97         SocketEngine::NonBlocking(fd);
98
99         if (SocketEngine::Connect(this, dest) == -1)
100         {
101                 if (errno != EINPROGRESS)
102                         return I_ERR_CONNECT;
103         }
104
105         this->state = I_CONNECTING;
106
107         if (!SocketEngine::AddFd(this, FD_WANT_NO_READ | FD_WANT_SINGLE_WRITE | FD_WRITE_WILL_BLOCK))
108                 return I_ERR_NOMOREFDS;
109
110         this->Timeout = new SocketTimeout(this->GetFd(), this, timeout);
111         ServerInstance->Timers.AddTimer(this->Timeout);
112
113         ServerInstance->Logs->Log("SOCKET", LOG_DEBUG, "BufferedSocket::DoConnect success");
114         return I_ERR_NONE;
115 }
116
117 void StreamSocket::Close()
118 {
119         if (this->fd > -1)
120         {
121                 // final chance, dump as much of the sendq as we can
122                 DoWrite();
123
124                 IOHook* hook = GetIOHook();
125                 DelIOHook();
126                 while (hook)
127                 {
128                         hook->OnStreamSocketClose(this);
129                         IOHook* const nexthook = GetNextHook(hook);
130                         delete hook;
131                         hook = nexthook;
132                 }
133                 SocketEngine::Shutdown(this, 2);
134                 SocketEngine::Close(this);
135         }
136 }
137
138 CullResult StreamSocket::cull()
139 {
140         Close();
141         return EventHandler::cull();
142 }
143
144 bool StreamSocket::GetNextLine(std::string& line, char delim)
145 {
146         std::string::size_type i = recvq.find(delim);
147         if (i == std::string::npos)
148                 return false;
149         line.assign(recvq, 0, i);
150         recvq.erase(0, i + 1);
151         return true;
152 }
153
154 int StreamSocket::HookChainRead(IOHook* hook, std::string& rq)
155 {
156         if (!hook)
157                 return ReadToRecvQ(rq);
158
159         IOHookMiddle* const iohm = IOHookMiddle::ToMiddleHook(hook);
160         if (iohm)
161         {
162                 // Call the next hook to put data into the recvq of the current hook
163                 const int ret = HookChainRead(iohm->GetNextHook(), iohm->GetRecvQ());
164                 if (ret <= 0)
165                         return ret;
166         }
167         return hook->OnStreamSocketRead(this, rq);
168 }
169
170 void StreamSocket::DoRead()
171 {
172         const std::string::size_type prevrecvqsize = recvq.size();
173
174         const int result = HookChainRead(GetIOHook(), recvq);
175         if (result < 0)
176         {
177                 SetError("Read Error"); // will not overwrite a better error message
178                 return;
179         }
180
181         if (recvq.size() > prevrecvqsize)
182                 OnDataReady();
183 }
184
185 int StreamSocket::ReadToRecvQ(std::string& rq)
186 {
187                 char* ReadBuffer = ServerInstance->GetReadBuffer();
188                 int n = SocketEngine::Recv(this, ReadBuffer, ServerInstance->Config->NetBufferSize, 0);
189                 if (n == ServerInstance->Config->NetBufferSize)
190                 {
191                         SocketEngine::ChangeEventMask(this, FD_WANT_FAST_READ | FD_ADD_TRIAL_READ);
192                         rq.append(ReadBuffer, n);
193                 }
194                 else if (n > 0)
195                 {
196                         SocketEngine::ChangeEventMask(this, FD_WANT_FAST_READ);
197                         rq.append(ReadBuffer, n);
198                 }
199                 else if (n == 0)
200                 {
201                         error = "Connection closed";
202                         SocketEngine::ChangeEventMask(this, FD_WANT_NO_READ | FD_WANT_NO_WRITE);
203                         return -1;
204                 }
205                 else if (SocketEngine::IgnoreError())
206                 {
207                         SocketEngine::ChangeEventMask(this, FD_WANT_FAST_READ | FD_READ_WILL_BLOCK);
208                         return 0;
209                 }
210                 else if (errno == EINTR)
211                 {
212                         SocketEngine::ChangeEventMask(this, FD_WANT_FAST_READ | FD_ADD_TRIAL_READ);
213                         return 0;
214                 }
215                 else
216                 {
217                         error = SocketEngine::LastError();
218                         SocketEngine::ChangeEventMask(this, FD_WANT_NO_READ | FD_WANT_NO_WRITE);
219                         return -1;
220                 }
221         return n;
222 }
223
/* Upper bound on the number of buffers passed to a single writev() call:
 * IOV_MAX, capped at 128, so that huge blobs of data are not prepared for a
 * blocked socket. */
static const int MYIOV_MAX = (IOV_MAX > 128) ? 128 : IOV_MAX;
226
227 void StreamSocket::DoWrite()
228 {
229         if (getSendQSize() == 0)
230                 return;
231         if (!error.empty() || fd < 0)
232         {
233                 ServerInstance->Logs->Log("SOCKET", LOG_DEBUG, "DoWrite on errored or closed socket");
234                 return;
235         }
236
237         SendQueue* psendq = &sendq;
238         IOHook* hook = GetIOHook();
239         while (hook)
240         {
241                 int rv = hook->OnStreamSocketWrite(this, *psendq);
242                 psendq = NULL;
243
244                 // rv == 0 means the socket has blocked. Stop trying to send data.
245                 // IOHook has requested unblock notification from the socketengine.
246                 if (rv == 0)
247                         break;
248
249                 if (rv < 0)
250                 {
251                         SetError("Write Error"); // will not overwrite a better error message
252                         break;
253                 }
254
255                 IOHookMiddle* const iohm = IOHookMiddle::ToMiddleHook(hook);
256                 hook = NULL;
257                 if (iohm)
258                 {
259                         psendq = &iohm->GetSendQ();
260                         hook = iohm->GetNextHook();
261                 }
262         }
263
264         if (psendq)
265                 FlushSendQ(*psendq);
266 }
267
268 void StreamSocket::FlushSendQ(SendQueue& sq)
269 {
270                 // don't even try if we are known to be blocking
271                 if (GetEventMask() & FD_WRITE_WILL_BLOCK)
272                         return;
273                 // start out optimistic - we won't need to write any more
274                 int eventChange = FD_WANT_EDGE_WRITE;
275                 while (error.empty() && !sq.empty() && eventChange == FD_WANT_EDGE_WRITE)
276                 {
277                         // Prepare a writev() call to write all buffers efficiently
278                         int bufcount = sq.size();
279
280                         // cap the number of buffers at MYIOV_MAX
281                         if (bufcount > MYIOV_MAX)
282                         {
283                                 bufcount = MYIOV_MAX;
284                         }
285
286                         int rv_max = 0;
287                         int rv;
288                         {
289                                 SocketEngine::IOVector iovecs[MYIOV_MAX];
290                                 size_t j = 0;
291                                 for (SendQueue::const_iterator i = sq.begin(), end = i+bufcount; i != end; ++i, j++)
292                                 {
293                                         const SendQueue::Element& elem = *i;
294                                         iovecs[j].iov_base = const_cast<char*>(elem.data());
295                                         iovecs[j].iov_len = elem.length();
296                                         rv_max += iovecs[j].iov_len;
297                                 }
298                                 rv = SocketEngine::WriteV(this, iovecs, bufcount);
299                         }
300
301                         if (rv == (int)sq.bytes())
302                         {
303                                 // it's our lucky day, everything got written out. Fast cleanup.
304                                 // This won't ever happen if the number of buffers got capped.
305                                 sq.clear();
306                         }
307                         else if (rv > 0)
308                         {
309                                 // Partial write. Clean out strings from the sendq
310                                 if (rv < rv_max)
311                                 {
312                                         // it's going to block now
313                                         eventChange = FD_WANT_FAST_WRITE | FD_WRITE_WILL_BLOCK;
314                                 }
315                                 while (rv > 0 && !sq.empty())
316                                 {
317                                         const SendQueue::Element& front = sq.front();
318                                         if (front.length() <= (size_t)rv)
319                                         {
320                                                 // this string got fully written out
321                                                 rv -= front.length();
322                                                 sq.pop_front();
323                                         }
324                                         else
325                                         {
326                                                 // stopped in the middle of this string
327                                                 sq.erase_front(rv);
328                                                 rv = 0;
329                                         }
330                                 }
331                         }
332                         else if (rv == 0)
333                         {
334                                 error = "Connection closed";
335                         }
336                         else if (SocketEngine::IgnoreError())
337                         {
338                                 eventChange = FD_WANT_FAST_WRITE | FD_WRITE_WILL_BLOCK;
339                         }
340                         else if (errno == EINTR)
341                         {
342                                 // restart interrupted syscall
343                                 errno = 0;
344                         }
345                         else
346                         {
347                                 error = SocketEngine::LastError();
348                         }
349                 }
350                 if (!error.empty())
351                 {
352                         // error - kill all events
353                         SocketEngine::ChangeEventMask(this, FD_WANT_NO_READ | FD_WANT_NO_WRITE);
354                 }
355                 else
356                 {
357                         SocketEngine::ChangeEventMask(this, eventChange);
358                 }
359 }
360
361 void StreamSocket::WriteData(const std::string &data)
362 {
363         if (fd < 0)
364         {
365                 ServerInstance->Logs->Log("SOCKET", LOG_DEBUG, "Attempt to write data to dead socket: %s",
366                         data.c_str());
367                 return;
368         }
369
370         /* Append the data to the back of the queue ready for writing */
371         sendq.push_back(data);
372
373         SocketEngine::ChangeEventMask(this, FD_ADD_TRIAL_WRITE);
374 }
375
376 bool SocketTimeout::Tick(time_t)
377 {
378         ServerInstance->Logs->Log("SOCKET", LOG_DEBUG, "SocketTimeout::Tick");
379
380         if (SocketEngine::GetRef(this->sfd) != this->sock)
381         {
382                 delete this;
383                 return false;
384         }
385
386         if (this->sock->state == I_CONNECTING)
387         {
388                 // for connecting sockets, the timeout can occur
389                 // which causes termination of the connection after
390                 // the given number of seconds without a successful
391                 // connection.
392                 this->sock->OnTimeout();
393                 this->sock->OnError(I_ERR_TIMEOUT);
394                 this->sock->state = I_ERROR;
395
396                 ServerInstance->GlobalCulls.AddItem(sock);
397         }
398
399         this->sock->Timeout = NULL;
400         delete this;
401         return false;
402 }
403
404 void BufferedSocket::OnConnected() { }
405 void BufferedSocket::OnTimeout() { return; }
406
407 void BufferedSocket::OnEventHandlerWrite()
408 {
409         if (state == I_CONNECTING)
410         {
411                 state = I_CONNECTED;
412                 this->OnConnected();
413                 if (!GetIOHook())
414                         SocketEngine::ChangeEventMask(this, FD_WANT_FAST_READ | FD_WANT_EDGE_WRITE);
415         }
416         this->StreamSocket::OnEventHandlerWrite();
417 }
418
419 BufferedSocket::~BufferedSocket()
420 {
421         this->Close();
422         // The timer is removed from the TimerManager in Timer::~Timer()
423         delete Timeout;
424 }
425
/** Handles an error event from the socket engine.
 * Does nothing when an error message is already set. Otherwise stores a
 * message for the error number, maps the number to a BufferedSocketError
 * and passes that to CheckError().
 * @param errornum Error number reported for the socket; 0 is reported as
 * "Connection closed"
 */
void StreamSocket::OnEventHandlerError(int errornum)
{
        if (!error.empty())
                return;

        if (errornum == 0)
                SetError("Connection closed");
        else
                SetError(SocketEngine::GetError(errornum));

        // Translate the error number; anything not listed is I_ERR_OTHER
        BufferedSocketError errcode;
        if (errornum == ETIMEDOUT)
                errcode = I_ERR_TIMEOUT;
        else if (errornum == ECONNREFUSED || errornum == 0)
                errcode = I_ERR_CONNECT;
        else if (errornum == EADDRINUSE)
                errcode = I_ERR_BIND;
        else if (errornum == EPIPE || errornum == EIO)
                errcode = I_ERR_WRITE;
        else
                errcode = I_ERR_OTHER;

        // Log and call OnError()
        CheckError(errcode);
}
458
459 void StreamSocket::OnEventHandlerRead()
460 {
461         if (!error.empty())
462                 return;
463
464         try
465         {
466                 DoRead();
467         }
468         catch (CoreException& ex)
469         {
470                 ServerInstance->Logs->Log("SOCKET", LOG_DEFAULT, "Caught exception in socket processing on FD %d - '%s'", fd, ex.GetReason().c_str());
471                 SetError(ex.GetReason());
472         }
473         CheckError(I_ERR_OTHER);
474 }
475
476 void StreamSocket::OnEventHandlerWrite()
477 {
478         if (!error.empty())
479                 return;
480
481         DoWrite();
482         CheckError(I_ERR_OTHER);
483 }
484
485 void StreamSocket::CheckError(BufferedSocketError errcode)
486 {
487         if (!error.empty())
488         {
489                 ServerInstance->Logs->Log("SOCKET", LOG_DEBUG, "Error on FD %d - '%s'", fd, error.c_str());
490                 OnError(errcode);
491         }
492 }
493
494 IOHook* StreamSocket::GetModHook(Module* mod) const
495 {
496         for (IOHook* curr = GetIOHook(); curr; curr = GetNextHook(curr))
497         {
498                 if (curr->prov->creator == mod)
499                         return curr;
500         }
501         return NULL;
502 }
503
504 void StreamSocket::AddIOHook(IOHook* newhook)
505 {
506         IOHook* curr = GetIOHook();
507         if (!curr)
508         {
509                 iohook = newhook;
510                 return;
511         }
512
513         IOHookMiddle* lasthook;
514         while (curr)
515         {
516                 lasthook = IOHookMiddle::ToMiddleHook(curr);
517                 if (!lasthook)
518                         return;
519                 curr = lasthook->GetNextHook();
520         }
521
522         lasthook->SetNextHook(newhook);
523 }
524
525 size_t StreamSocket::getSendQSize() const
526 {
527         size_t ret = sendq.bytes();
528         IOHook* curr = GetIOHook();
529         while (curr)
530         {
531                 const IOHookMiddle* const iohm = IOHookMiddle::ToMiddleHook(curr);
532                 if (!iohm)
533                         break;
534
535                 ret += iohm->GetSendQ().bytes();
536                 curr = iohm->GetNextHook();
537         }
538         return ret;
539 }