]> git.netwichtig.de Git - user/henk/code/inspircd.git/blob - src/inspsocket.cpp
ebbff448ed15bfd791aee08ccb49c3d13b1db6b9
[user/henk/code/inspircd.git] / src / inspsocket.cpp
1 /*
2  * InspIRCd -- Internet Relay Chat Daemon
3  *
4  *   Copyright (C) 2009 Daniel De Graaf <danieldg@inspircd.org>
5  *   Copyright (C) 2007-2009 Robin Burchell <robin+git@viroteck.net>
6  *   Copyright (C) 2008 Thomas Stagner <aquanight@inspircd.org>
7  *   Copyright (C) 2006-2007 Craig Edwards <craigedwards@brainbox.cc>
8  *   Copyright (C) 2007 Dennis Friis <peavey@inspircd.org>
9  *   Copyright (C) 2006 Oliver Lupton <oliverlupton@gmail.com>
10  *
11  * This file is part of InspIRCd.  InspIRCd is free software: you can
12  * redistribute it and/or modify it under the terms of the GNU General Public
13  * License as published by the Free Software Foundation, version 2.
14  *
15  * This program is distributed in the hope that it will be useful, but WITHOUT
16  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
17  * FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
18  * details.
19  *
20  * You should have received a copy of the GNU General Public License
21  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
22  */
23
24
25 #include "inspircd.h"
26 #include "iohook.h"
27
28 static IOHook* GetNextHook(IOHook* hook)
29 {
30         IOHookMiddle* const iohm = IOHookMiddle::ToMiddleHook(hook);
31         if (iohm)
32                 return iohm->GetNextHook();
33         return NULL;
34 }
35
36 BufferedSocket::BufferedSocket()
37 {
38         Timeout = NULL;
39         state = I_ERROR;
40 }
41
42 BufferedSocket::BufferedSocket(int newfd)
43 {
44         Timeout = NULL;
45         this->fd = newfd;
46         this->state = I_CONNECTED;
47         if (fd > -1)
48                 SocketEngine::AddFd(this, FD_WANT_FAST_READ | FD_WANT_EDGE_WRITE);
49 }
50
51 void BufferedSocket::DoConnect(const std::string& ipaddr, int aport, unsigned int maxtime, const std::string& connectbindip)
52 {
53         BufferedSocketError err = BeginConnect(ipaddr, aport, maxtime, connectbindip);
54         if (err != I_ERR_NONE)
55         {
56                 state = I_ERROR;
57                 SetError(SocketEngine::LastError());
58                 OnError(err);
59         }
60 }
61
62 BufferedSocketError BufferedSocket::BeginConnect(const std::string& ipaddr, int aport, unsigned int maxtime, const std::string& connectbindip)
63 {
64         irc::sockets::sockaddrs addr, bind;
65         if (!irc::sockets::aptosa(ipaddr, aport, addr))
66         {
67                 ServerInstance->Logs->Log("SOCKET", LOG_DEBUG, "BUG: Hostname passed to BufferedSocket, rather than an IP address!");
68                 return I_ERR_CONNECT;
69         }
70
71         bind.sa.sa_family = 0;
72         if (!connectbindip.empty())
73         {
74                 if (!irc::sockets::aptosa(connectbindip, 0, bind))
75                 {
76                         return I_ERR_BIND;
77                 }
78         }
79
80         return BeginConnect(addr, bind, maxtime);
81 }
82
83 BufferedSocketError BufferedSocket::BeginConnect(const irc::sockets::sockaddrs& dest, const irc::sockets::sockaddrs& bind, unsigned int timeout)
84 {
85         if (fd < 0)
86                 fd = socket(dest.family(), SOCK_STREAM, 0);
87
88         if (fd < 0)
89                 return I_ERR_SOCKET;
90
91         if (bind.family() != 0)
92         {
93                 if (SocketEngine::Bind(fd, bind) < 0)
94                         return I_ERR_BIND;
95         }
96
97         SocketEngine::NonBlocking(fd);
98
99         if (SocketEngine::Connect(this, dest) == -1)
100         {
101                 if (errno != EINPROGRESS)
102                         return I_ERR_CONNECT;
103         }
104
105         this->state = I_CONNECTING;
106
107         if (!SocketEngine::AddFd(this, FD_WANT_NO_READ | FD_WANT_SINGLE_WRITE | FD_WRITE_WILL_BLOCK))
108                 return I_ERR_NOMOREFDS;
109
110         this->Timeout = new SocketTimeout(this->GetFd(), this, timeout);
111         ServerInstance->Timers.AddTimer(this->Timeout);
112
113         ServerInstance->Logs->Log("SOCKET", LOG_DEBUG, "BufferedSocket::DoConnect success");
114         return I_ERR_NONE;
115 }
116
117 void StreamSocket::Close()
118 {
119         if (this->fd > -1)
120         {
121                 // final chance, dump as much of the sendq as we can
122                 DoWrite();
123
124                 IOHook* hook = GetIOHook();
125                 DelIOHook();
126                 while (hook)
127                 {
128                         hook->OnStreamSocketClose(this);
129                         IOHook* const nexthook = GetNextHook(hook);
130                         delete hook;
131                         hook = nexthook;
132                 }
133                 SocketEngine::Shutdown(this, 2);
134                 SocketEngine::Close(this);
135         }
136 }
137
138 CullResult StreamSocket::cull()
139 {
140         Close();
141         return EventHandler::cull();
142 }
143
144 bool StreamSocket::GetNextLine(std::string& line, char delim)
145 {
146         std::string::size_type i = recvq.find(delim);
147         if (i == std::string::npos)
148                 return false;
149         line.assign(recvq, 0, i);
150         recvq.erase(0, i + 1);
151         return true;
152 }
153
154 int StreamSocket::HookChainRead(IOHook* hook, std::string& rq)
155 {
156         if (!hook)
157                 return ReadToRecvQ(rq);
158
159         IOHookMiddle* const iohm = IOHookMiddle::ToMiddleHook(hook);
160         if (iohm)
161         {
162                 // Call the next hook to put data into the recvq of the current hook
163                 const int ret = HookChainRead(iohm->GetNextHook(), iohm->GetRecvQ());
164                 if (ret <= 0)
165                         return ret;
166         }
167         return hook->OnStreamSocketRead(this, rq);
168 }
169
170 void StreamSocket::DoRead()
171 {
172         const std::string::size_type prevrecvqsize = recvq.size();
173
174         const int result = HookChainRead(GetIOHook(), recvq);
175         if (result < 0)
176         {
177                 SetError("Read Error"); // will not overwrite a better error message
178                 return;
179         }
180
181         if (recvq.size() > prevrecvqsize)
182                 OnDataReady();
183 }
184
185 int StreamSocket::ReadToRecvQ(std::string& rq)
186 {
187                 char* ReadBuffer = ServerInstance->GetReadBuffer();
188                 int n = SocketEngine::Recv(this, ReadBuffer, ServerInstance->Config->NetBufferSize, 0);
189                 if (n == ServerInstance->Config->NetBufferSize)
190                 {
191                         SocketEngine::ChangeEventMask(this, FD_WANT_FAST_READ | FD_ADD_TRIAL_READ);
192                         rq.append(ReadBuffer, n);
193                 }
194                 else if (n > 0)
195                 {
196                         SocketEngine::ChangeEventMask(this, FD_WANT_FAST_READ);
197                         rq.append(ReadBuffer, n);
198                 }
199                 else if (n == 0)
200                 {
201                         error = "Connection closed";
202                         SocketEngine::ChangeEventMask(this, FD_WANT_NO_READ | FD_WANT_NO_WRITE);
203                         return -1;
204                 }
205                 else if (SocketEngine::IgnoreError())
206                 {
207                         SocketEngine::ChangeEventMask(this, FD_WANT_FAST_READ | FD_READ_WILL_BLOCK);
208                         return 0;
209                 }
210                 else if (errno == EINTR)
211                 {
212                         SocketEngine::ChangeEventMask(this, FD_WANT_FAST_READ | FD_ADD_TRIAL_READ);
213                         return 0;
214                 }
215                 else
216                 {
217                         error = SocketEngine::LastError();
218                         SocketEngine::ChangeEventMask(this, FD_WANT_NO_READ | FD_WANT_NO_WRITE);
219                         return -1;
220                 }
221         return n;
222 }
223
224 /* Don't try to prepare huge blobs of data to send to a blocked socket */
225 static const int MYIOV_MAX = IOV_MAX < 128 ? IOV_MAX : 128;
226
227 void StreamSocket::DoWrite()
228 {
229         if (getSendQSize() == 0)
230                 return;
231         if (!error.empty() || fd < 0)
232         {
233                 ServerInstance->Logs->Log("SOCKET", LOG_DEBUG, "DoWrite on errored or closed socket");
234                 return;
235         }
236
237         SendQueue* psendq = &sendq;
238         IOHook* hook = GetIOHook();
239         while (hook)
240         {
241                 int rv = hook->OnStreamSocketWrite(this, *psendq);
242                 psendq = NULL;
243
244                 // rv == 0 means the socket has blocked. Stop trying to send data.
245                 // IOHook has requested unblock notification from the socketengine.
246                 if (rv == 0)
247                         break;
248
249                 if (rv < 0)
250                 {
251                         SetError("Write Error"); // will not overwrite a better error message
252                         break;
253                 }
254
255                 IOHookMiddle* const iohm = IOHookMiddle::ToMiddleHook(hook);
256                 hook = NULL;
257                 if (iohm)
258                 {
259                         psendq = &iohm->GetSendQ();
260                         hook = iohm->GetNextHook();
261                 }
262         }
263
264         if (psendq)
265                 FlushSendQ(*psendq);
266 }
267
268 void StreamSocket::FlushSendQ(SendQueue& sq)
269 {
270                 // don't even try if we are known to be blocking
271                 if (GetEventMask() & FD_WRITE_WILL_BLOCK)
272                         return;
273                 // start out optimistic - we won't need to write any more
274                 int eventChange = FD_WANT_EDGE_WRITE;
275                 while (error.empty() && !sq.empty() && eventChange == FD_WANT_EDGE_WRITE)
276                 {
277                         // Prepare a writev() call to write all buffers efficiently
278                         int bufcount = sq.size();
279
280                         // cap the number of buffers at MYIOV_MAX
281                         if (bufcount > MYIOV_MAX)
282                         {
283                                 bufcount = MYIOV_MAX;
284                         }
285
286                         int rv_max = 0;
287                         int rv;
288                         {
289                                 SocketEngine::IOVector iovecs[MYIOV_MAX];
290                                 size_t j = 0;
291                                 for (SendQueue::const_iterator i = sq.begin(), end = i+bufcount; i != end; ++i, j++)
292                                 {
293                                         const SendQueue::Element& elem = *i;
294                                         iovecs[j].iov_base = const_cast<char*>(elem.data());
295                                         iovecs[j].iov_len = elem.length();
296                                         rv_max += iovecs[j].iov_len;
297                                 }
298                                 rv = SocketEngine::WriteV(this, iovecs, bufcount);
299                         }
300
301                         if (rv == (int)sq.bytes())
302                         {
303                                 // it's our lucky day, everything got written out. Fast cleanup.
304                                 // This won't ever happen if the number of buffers got capped.
305                                 sq.clear();
306                         }
307                         else if (rv > 0)
308                         {
309                                 // Partial write. Clean out strings from the sendq
310                                 if (rv < rv_max)
311                                 {
312                                         // it's going to block now
313                                         eventChange = FD_WANT_FAST_WRITE | FD_WRITE_WILL_BLOCK;
314                                 }
315                                 while (rv > 0 && !sq.empty())
316                                 {
317                                         const SendQueue::Element& front = sq.front();
318                                         if (front.length() <= (size_t)rv)
319                                         {
320                                                 // this string got fully written out
321                                                 rv -= front.length();
322                                                 sq.pop_front();
323                                         }
324                                         else
325                                         {
326                                                 // stopped in the middle of this string
327                                                 sq.erase_front(rv);
328                                                 rv = 0;
329                                         }
330                                 }
331                         }
332                         else if (rv == 0)
333                         {
334                                 error = "Connection closed";
335                         }
336                         else if (SocketEngine::IgnoreError())
337                         {
338                                 eventChange = FD_WANT_FAST_WRITE | FD_WRITE_WILL_BLOCK;
339                         }
340                         else if (errno == EINTR)
341                         {
342                                 // restart interrupted syscall
343                                 errno = 0;
344                         }
345                         else
346                         {
347                                 error = SocketEngine::LastError();
348                         }
349                 }
350                 if (!error.empty())
351                 {
352                         // error - kill all events
353                         SocketEngine::ChangeEventMask(this, FD_WANT_NO_READ | FD_WANT_NO_WRITE);
354                 }
355                 else
356                 {
357                         SocketEngine::ChangeEventMask(this, eventChange);
358                 }
359 }
360
361 bool StreamSocket::OnSetEndPoint(const irc::sockets::sockaddrs& local, const irc::sockets::sockaddrs& remote)
362 {
363         return false;
364 }
365
366 void StreamSocket::WriteData(const std::string &data)
367 {
368         if (fd < 0)
369         {
370                 ServerInstance->Logs->Log("SOCKET", LOG_DEBUG, "Attempt to write data to dead socket: %s",
371                         data.c_str());
372                 return;
373         }
374
375         /* Append the data to the back of the queue ready for writing */
376         sendq.push_back(data);
377
378         SocketEngine::ChangeEventMask(this, FD_ADD_TRIAL_WRITE);
379 }
380
381 bool SocketTimeout::Tick(time_t)
382 {
383         ServerInstance->Logs->Log("SOCKET", LOG_DEBUG, "SocketTimeout::Tick");
384
385         if (SocketEngine::GetRef(this->sfd) != this->sock)
386         {
387                 delete this;
388                 return false;
389         }
390
391         if (this->sock->state == I_CONNECTING)
392         {
393                 // for connecting sockets, the timeout can occur
394                 // which causes termination of the connection after
395                 // the given number of seconds without a successful
396                 // connection.
397                 this->sock->OnTimeout();
398                 this->sock->OnError(I_ERR_TIMEOUT);
399                 this->sock->state = I_ERROR;
400
401                 ServerInstance->GlobalCulls.AddItem(sock);
402         }
403
404         this->sock->Timeout = NULL;
405         delete this;
406         return false;
407 }
408
409 void BufferedSocket::OnConnected() { }
410 void BufferedSocket::OnTimeout() { return; }
411
412 void BufferedSocket::OnEventHandlerWrite()
413 {
414         if (state == I_CONNECTING)
415         {
416                 state = I_CONNECTED;
417                 this->OnConnected();
418                 if (!GetIOHook())
419                         SocketEngine::ChangeEventMask(this, FD_WANT_FAST_READ | FD_WANT_EDGE_WRITE);
420         }
421         this->StreamSocket::OnEventHandlerWrite();
422 }
423
424 BufferedSocket::~BufferedSocket()
425 {
426         this->Close();
427         // The timer is removed from the TimerManager in Timer::~Timer()
428         delete Timeout;
429 }
430
431 void StreamSocket::OnEventHandlerError(int errornum)
432 {
433         if (!error.empty())
434                 return;
435
436         if (errornum == 0)
437                 SetError("Connection closed");
438         else
439                 SetError(SocketEngine::GetError(errornum));
440
441         BufferedSocketError errcode = I_ERR_OTHER;
442         switch (errornum)
443         {
444                 case ETIMEDOUT:
445                         errcode = I_ERR_TIMEOUT;
446                         break;
447                 case ECONNREFUSED:
448                 case 0:
449                         errcode = I_ERR_CONNECT;
450                         break;
451                 case EADDRINUSE:
452                         errcode = I_ERR_BIND;
453                         break;
454                 case EPIPE:
455                 case EIO:
456                         errcode = I_ERR_WRITE;
457                         break;
458         }
459
460         // Log and call OnError()
461         CheckError(errcode);
462 }
463
464 void StreamSocket::OnEventHandlerRead()
465 {
466         if (!error.empty())
467                 return;
468
469         try
470         {
471                 DoRead();
472         }
473         catch (CoreException& ex)
474         {
475                 ServerInstance->Logs->Log("SOCKET", LOG_DEFAULT, "Caught exception in socket processing on FD %d - '%s'", fd, ex.GetReason().c_str());
476                 SetError(ex.GetReason());
477         }
478         CheckError(I_ERR_OTHER);
479 }
480
481 void StreamSocket::OnEventHandlerWrite()
482 {
483         if (!error.empty())
484                 return;
485
486         DoWrite();
487         CheckError(I_ERR_OTHER);
488 }
489
490 void StreamSocket::CheckError(BufferedSocketError errcode)
491 {
492         if (!error.empty())
493         {
494                 ServerInstance->Logs->Log("SOCKET", LOG_DEBUG, "Error on FD %d - '%s'", fd, error.c_str());
495                 OnError(errcode);
496         }
497 }
498
499 IOHook* StreamSocket::GetModHook(Module* mod) const
500 {
501         for (IOHook* curr = GetIOHook(); curr; curr = GetNextHook(curr))
502         {
503                 if (curr->prov->creator == mod)
504                         return curr;
505         }
506         return NULL;
507 }
508
509 void StreamSocket::AddIOHook(IOHook* newhook)
510 {
511         IOHook* curr = GetIOHook();
512         if (!curr)
513         {
514                 iohook = newhook;
515                 return;
516         }
517
518         IOHookMiddle* lasthook;
519         while (curr)
520         {
521                 lasthook = IOHookMiddle::ToMiddleHook(curr);
522                 if (!lasthook)
523                         return;
524                 curr = lasthook->GetNextHook();
525         }
526
527         lasthook->SetNextHook(newhook);
528 }
529
530 size_t StreamSocket::getSendQSize() const
531 {
532         size_t ret = sendq.bytes();
533         IOHook* curr = GetIOHook();
534         while (curr)
535         {
536                 const IOHookMiddle* const iohm = IOHookMiddle::ToMiddleHook(curr);
537                 if (!iohm)
538                         break;
539
540                 ret += iohm->GetSendQ().bytes();
541                 curr = iohm->GetNextHook();
542         }
543         return ret;
544 }