git.netwichtig.de Git - user/henk/code/inspircd.git/blob - src/inspsocket.cpp
Ignore clients on ulined servers when reporting stats in LUSERS.
[user/henk/code/inspircd.git] / src / inspsocket.cpp
1 /*
2  * InspIRCd -- Internet Relay Chat Daemon
3  *
4  *   Copyright (C) 2009 Daniel De Graaf <danieldg@inspircd.org>
5  *   Copyright (C) 2007-2009 Robin Burchell <robin+git@viroteck.net>
6  *   Copyright (C) 2008 Thomas Stagner <aquanight@inspircd.org>
7  *   Copyright (C) 2006-2007 Craig Edwards <craigedwards@brainbox.cc>
8  *   Copyright (C) 2007 Dennis Friis <peavey@inspircd.org>
9  *   Copyright (C) 2006 Oliver Lupton <oliverlupton@gmail.com>
10  *
11  * This file is part of InspIRCd.  InspIRCd is free software: you can
12  * redistribute it and/or modify it under the terms of the GNU General Public
13  * License as published by the Free Software Foundation, version 2.
14  *
15  * This program is distributed in the hope that it will be useful, but WITHOUT
16  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
17  * FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
18  * details.
19  *
20  * You should have received a copy of the GNU General Public License
21  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
22  */
23
24
25 #include "inspircd.h"
26 #include "iohook.h"
27
28 static IOHook* GetNextHook(IOHook* hook)
29 {
30         IOHookMiddle* const iohm = IOHookMiddle::ToMiddleHook(hook);
31         if (iohm)
32                 return iohm->GetNextHook();
33         return NULL;
34 }
35
36 BufferedSocket::BufferedSocket()
37 {
38         Timeout = NULL;
39         state = I_ERROR;
40 }
41
42 BufferedSocket::BufferedSocket(int newfd)
43 {
44         Timeout = NULL;
45         this->fd = newfd;
46         this->state = I_CONNECTED;
47         if (fd > -1)
48                 SocketEngine::AddFd(this, FD_WANT_FAST_READ | FD_WANT_EDGE_WRITE);
49 }
50
51 void BufferedSocket::DoConnect(const irc::sockets::sockaddrs& dest, const irc::sockets::sockaddrs& bind, unsigned int maxtime)
52 {
53         BufferedSocketError err = BeginConnect(dest, bind, maxtime);
54         if (err != I_ERR_NONE)
55         {
56                 state = I_ERROR;
57                 SetError(SocketEngine::LastError());
58                 OnError(err);
59         }
60 }
61
62 BufferedSocketError BufferedSocket::BeginConnect(const irc::sockets::sockaddrs& dest, const irc::sockets::sockaddrs& bind, unsigned int timeout)
63 {
64         if (fd < 0)
65                 fd = socket(dest.family(), SOCK_STREAM, 0);
66
67         if (fd < 0)
68                 return I_ERR_SOCKET;
69
70         if (bind.family() != 0)
71         {
72                 if (SocketEngine::Bind(fd, bind) < 0)
73                         return I_ERR_BIND;
74         }
75
76         SocketEngine::NonBlocking(fd);
77
78         if (SocketEngine::Connect(this, dest) == -1)
79         {
80                 if (errno != EINPROGRESS)
81                         return I_ERR_CONNECT;
82         }
83
84         this->state = I_CONNECTING;
85
86         if (!SocketEngine::AddFd(this, FD_WANT_NO_READ | FD_WANT_SINGLE_WRITE | FD_WRITE_WILL_BLOCK))
87                 return I_ERR_NOMOREFDS;
88
89         this->Timeout = new SocketTimeout(this->GetFd(), this, timeout);
90         ServerInstance->Timers.AddTimer(this->Timeout);
91
92         ServerInstance->Logs->Log("SOCKET", LOG_DEBUG, "BufferedSocket::DoConnect success");
93         return I_ERR_NONE;
94 }
95
96 void StreamSocket::Close()
97 {
98         if (closing)
99                 return;
100
101         closing = true;
102         if (this->fd > -1)
103         {
104                 // final chance, dump as much of the sendq as we can
105                 DoWrite();
106
107                 IOHook* hook = GetIOHook();
108                 DelIOHook();
109                 while (hook)
110                 {
111                         hook->OnStreamSocketClose(this);
112                         IOHook* const nexthook = GetNextHook(hook);
113                         delete hook;
114                         hook = nexthook;
115                 }
116                 SocketEngine::Shutdown(this, 2);
117                 SocketEngine::Close(this);
118         }
119 }
120
121 void StreamSocket::Close(bool writeblock)
122 {
123         if (getSendQSize() != 0 && writeblock)
124                 closeonempty = true;
125         else
126                 Close();
127 }
128
129 CullResult StreamSocket::cull()
130 {
131         Close();
132         return EventHandler::cull();
133 }
134
135 bool StreamSocket::GetNextLine(std::string& line, char delim)
136 {
137         std::string::size_type i = recvq.find(delim);
138         if (i == std::string::npos)
139                 return false;
140         line.assign(recvq, 0, i);
141         recvq.erase(0, i + 1);
142         return true;
143 }
144
145 int StreamSocket::HookChainRead(IOHook* hook, std::string& rq)
146 {
147         if (!hook)
148                 return ReadToRecvQ(rq);
149
150         IOHookMiddle* const iohm = IOHookMiddle::ToMiddleHook(hook);
151         if (iohm)
152         {
153                 // Call the next hook to put data into the recvq of the current hook
154                 const int ret = HookChainRead(iohm->GetNextHook(), iohm->GetRecvQ());
155                 if (ret <= 0)
156                         return ret;
157         }
158         return hook->OnStreamSocketRead(this, rq);
159 }
160
161 void StreamSocket::DoRead()
162 {
163         const std::string::size_type prevrecvqsize = recvq.size();
164
165         const int result = HookChainRead(GetIOHook(), recvq);
166         if (result < 0)
167         {
168                 SetError("Read Error"); // will not overwrite a better error message
169                 return;
170         }
171
172         if (recvq.size() > prevrecvqsize)
173                 OnDataReady();
174 }
175
176 int StreamSocket::ReadToRecvQ(std::string& rq)
177 {
178                 char* ReadBuffer = ServerInstance->GetReadBuffer();
179                 int n = SocketEngine::Recv(this, ReadBuffer, ServerInstance->Config->NetBufferSize, 0);
180                 if (n == ServerInstance->Config->NetBufferSize)
181                 {
182                         SocketEngine::ChangeEventMask(this, FD_WANT_FAST_READ | FD_ADD_TRIAL_READ);
183                         rq.append(ReadBuffer, n);
184                 }
185                 else if (n > 0)
186                 {
187                         SocketEngine::ChangeEventMask(this, FD_WANT_FAST_READ);
188                         rq.append(ReadBuffer, n);
189                 }
190                 else if (n == 0)
191                 {
192                         error = "Connection closed";
193                         SocketEngine::ChangeEventMask(this, FD_WANT_NO_READ | FD_WANT_NO_WRITE);
194                         return -1;
195                 }
196                 else if (SocketEngine::IgnoreError())
197                 {
198                         SocketEngine::ChangeEventMask(this, FD_WANT_FAST_READ | FD_READ_WILL_BLOCK);
199                         return 0;
200                 }
201                 else if (errno == EINTR)
202                 {
203                         SocketEngine::ChangeEventMask(this, FD_WANT_FAST_READ | FD_ADD_TRIAL_READ);
204                         return 0;
205                 }
206                 else
207                 {
208                         error = SocketEngine::LastError();
209                         SocketEngine::ChangeEventMask(this, FD_WANT_NO_READ | FD_WANT_NO_WRITE);
210                         return -1;
211                 }
212         return n;
213 }
214
/* Upper bound on the number of buffers handed to a single vectored write:
 * the system's IOV_MAX, but never more than 128, so that we don't prepare
 * huge blobs of data to send to a blocked socket.
 */
static const int MYIOV_MAX = (IOV_MAX > 128) ? 128 : IOV_MAX;
217
218 void StreamSocket::DoWrite()
219 {
220         if (getSendQSize() == 0)
221         {
222                 if (closeonempty)
223                         Close();
224
225                 return;
226         }
227         if (!error.empty() || fd < 0)
228         {
229                 ServerInstance->Logs->Log("SOCKET", LOG_DEBUG, "DoWrite on errored or closed socket");
230                 return;
231         }
232
233         SendQueue* psendq = &sendq;
234         IOHook* hook = GetIOHook();
235         while (hook)
236         {
237                 int rv = hook->OnStreamSocketWrite(this, *psendq);
238                 psendq = NULL;
239
240                 // rv == 0 means the socket has blocked. Stop trying to send data.
241                 // IOHook has requested unblock notification from the socketengine.
242                 if (rv == 0)
243                         break;
244
245                 if (rv < 0)
246                 {
247                         SetError("Write Error"); // will not overwrite a better error message
248                         break;
249                 }
250
251                 IOHookMiddle* const iohm = IOHookMiddle::ToMiddleHook(hook);
252                 hook = NULL;
253                 if (iohm)
254                 {
255                         psendq = &iohm->GetSendQ();
256                         hook = iohm->GetNextHook();
257                 }
258         }
259
260         if (psendq)
261                 FlushSendQ(*psendq);
262
263         if (getSendQSize() == 0 && closeonempty)
264                 Close();
265 }
266
267 void StreamSocket::FlushSendQ(SendQueue& sq)
268 {
269                 // don't even try if we are known to be blocking
270                 if (GetEventMask() & FD_WRITE_WILL_BLOCK)
271                         return;
272                 // start out optimistic - we won't need to write any more
273                 int eventChange = FD_WANT_EDGE_WRITE;
274                 while (error.empty() && !sq.empty() && eventChange == FD_WANT_EDGE_WRITE)
275                 {
276                         // Prepare a writev() call to write all buffers efficiently
277                         int bufcount = sq.size();
278
279                         // cap the number of buffers at MYIOV_MAX
280                         if (bufcount > MYIOV_MAX)
281                         {
282                                 bufcount = MYIOV_MAX;
283                         }
284
285                         int rv_max = 0;
286                         int rv;
287                         {
288                                 SocketEngine::IOVector iovecs[MYIOV_MAX];
289                                 size_t j = 0;
290                                 for (SendQueue::const_iterator i = sq.begin(), end = i+bufcount; i != end; ++i, j++)
291                                 {
292                                         const SendQueue::Element& elem = *i;
293                                         iovecs[j].iov_base = const_cast<char*>(elem.data());
294                                         iovecs[j].iov_len = elem.length();
295                                         rv_max += iovecs[j].iov_len;
296                                 }
297                                 rv = SocketEngine::WriteV(this, iovecs, bufcount);
298                         }
299
300                         if (rv == (int)sq.bytes())
301                         {
302                                 // it's our lucky day, everything got written out. Fast cleanup.
303                                 // This won't ever happen if the number of buffers got capped.
304                                 sq.clear();
305                         }
306                         else if (rv > 0)
307                         {
308                                 // Partial write. Clean out strings from the sendq
309                                 if (rv < rv_max)
310                                 {
311                                         // it's going to block now
312                                         eventChange = FD_WANT_FAST_WRITE | FD_WRITE_WILL_BLOCK;
313                                 }
314                                 while (rv > 0 && !sq.empty())
315                                 {
316                                         const SendQueue::Element& front = sq.front();
317                                         if (front.length() <= (size_t)rv)
318                                         {
319                                                 // this string got fully written out
320                                                 rv -= front.length();
321                                                 sq.pop_front();
322                                         }
323                                         else
324                                         {
325                                                 // stopped in the middle of this string
326                                                 sq.erase_front(rv);
327                                                 rv = 0;
328                                         }
329                                 }
330                         }
331                         else if (rv == 0)
332                         {
333                                 error = "Connection closed";
334                         }
335                         else if (SocketEngine::IgnoreError())
336                         {
337                                 eventChange = FD_WANT_FAST_WRITE | FD_WRITE_WILL_BLOCK;
338                         }
339                         else if (errno == EINTR)
340                         {
341                                 // restart interrupted syscall
342                                 errno = 0;
343                         }
344                         else
345                         {
346                                 error = SocketEngine::LastError();
347                         }
348                 }
349                 if (!error.empty())
350                 {
351                         // error - kill all events
352                         SocketEngine::ChangeEventMask(this, FD_WANT_NO_READ | FD_WANT_NO_WRITE);
353                 }
354                 else
355                 {
356                         SocketEngine::ChangeEventMask(this, eventChange);
357                 }
358 }
359
360 bool StreamSocket::OnSetEndPoint(const irc::sockets::sockaddrs& local, const irc::sockets::sockaddrs& remote)
361 {
362         return false;
363 }
364
365 void StreamSocket::WriteData(const std::string &data)
366 {
367         if (fd < 0)
368         {
369                 ServerInstance->Logs->Log("SOCKET", LOG_DEBUG, "Attempt to write data to dead socket: %s",
370                         data.c_str());
371                 return;
372         }
373
374         /* Append the data to the back of the queue ready for writing */
375         sendq.push_back(data);
376
377         SocketEngine::ChangeEventMask(this, FD_ADD_TRIAL_WRITE);
378 }
379
380 bool SocketTimeout::Tick(time_t)
381 {
382         ServerInstance->Logs->Log("SOCKET", LOG_DEBUG, "SocketTimeout::Tick");
383
384         if (SocketEngine::GetRef(this->sfd) != this->sock)
385         {
386                 delete this;
387                 return false;
388         }
389
390         if (this->sock->state == I_CONNECTING)
391         {
392                 // for connecting sockets, the timeout can occur
393                 // which causes termination of the connection after
394                 // the given number of seconds without a successful
395                 // connection.
396                 this->sock->OnTimeout();
397                 this->sock->OnError(I_ERR_TIMEOUT);
398                 this->sock->state = I_ERROR;
399
400                 ServerInstance->GlobalCulls.AddItem(sock);
401         }
402
403         this->sock->Timeout = NULL;
404         delete this;
405         return false;
406 }
407
408 void BufferedSocket::OnConnected() { }
409 void BufferedSocket::OnTimeout() { return; }
410
411 void BufferedSocket::OnEventHandlerWrite()
412 {
413         if (state == I_CONNECTING)
414         {
415                 state = I_CONNECTED;
416                 this->OnConnected();
417                 if (!GetIOHook())
418                         SocketEngine::ChangeEventMask(this, FD_WANT_FAST_READ | FD_WANT_EDGE_WRITE);
419         }
420         this->StreamSocket::OnEventHandlerWrite();
421 }
422
423 BufferedSocket::~BufferedSocket()
424 {
425         this->Close();
426         // The timer is removed from the TimerManager in Timer::~Timer()
427         delete Timeout;
428 }
429
430 void StreamSocket::OnEventHandlerError(int errornum)
431 {
432         if (!error.empty())
433                 return;
434
435         if (errornum == 0)
436                 SetError("Connection closed");
437         else
438                 SetError(SocketEngine::GetError(errornum));
439
440         BufferedSocketError errcode = I_ERR_OTHER;
441         switch (errornum)
442         {
443                 case ETIMEDOUT:
444                         errcode = I_ERR_TIMEOUT;
445                         break;
446                 case ECONNREFUSED:
447                 case 0:
448                         errcode = I_ERR_CONNECT;
449                         break;
450                 case EADDRINUSE:
451                         errcode = I_ERR_BIND;
452                         break;
453                 case EPIPE:
454                 case EIO:
455                         errcode = I_ERR_WRITE;
456                         break;
457         }
458
459         // Log and call OnError()
460         CheckError(errcode);
461 }
462
463 void StreamSocket::OnEventHandlerRead()
464 {
465         if (!error.empty())
466                 return;
467
468         try
469         {
470                 DoRead();
471         }
472         catch (CoreException& ex)
473         {
474                 ServerInstance->Logs->Log("SOCKET", LOG_DEFAULT, "Caught exception in socket processing on FD %d - '%s'", fd, ex.GetReason().c_str());
475                 SetError(ex.GetReason());
476         }
477         CheckError(I_ERR_OTHER);
478 }
479
480 void StreamSocket::OnEventHandlerWrite()
481 {
482         if (!error.empty())
483                 return;
484
485         DoWrite();
486         CheckError(I_ERR_OTHER);
487 }
488
489 void StreamSocket::CheckError(BufferedSocketError errcode)
490 {
491         if (!error.empty())
492         {
493                 ServerInstance->Logs->Log("SOCKET", LOG_DEBUG, "Error on FD %d - '%s'", fd, error.c_str());
494                 OnError(errcode);
495         }
496 }
497
498 IOHook* StreamSocket::GetModHook(Module* mod) const
499 {
500         for (IOHook* curr = GetIOHook(); curr; curr = GetNextHook(curr))
501         {
502                 if (curr->prov->creator == mod)
503                         return curr;
504         }
505         return NULL;
506 }
507
508 void StreamSocket::AddIOHook(IOHook* newhook)
509 {
510         IOHook* curr = GetIOHook();
511         if (!curr)
512         {
513                 iohook = newhook;
514                 return;
515         }
516
517         IOHookMiddle* lasthook;
518         while (curr)
519         {
520                 lasthook = IOHookMiddle::ToMiddleHook(curr);
521                 if (!lasthook)
522                         return;
523                 curr = lasthook->GetNextHook();
524         }
525
526         lasthook->SetNextHook(newhook);
527 }
528
529 size_t StreamSocket::getSendQSize() const
530 {
531         size_t ret = sendq.bytes();
532         IOHook* curr = GetIOHook();
533         while (curr)
534         {
535                 const IOHookMiddle* const iohm = IOHookMiddle::ToMiddleHook(curr);
536                 if (!iohm)
537                         break;
538
539                 ret += iohm->GetSendQ().bytes();
540                 curr = iohm->GetNextHook();
541         }
542         return ret;
543 }
544
545 void StreamSocket::SwapInternals(StreamSocket& other)
546 {
547         if (type != other.type)
548                 return;
549
550         EventHandler::SwapInternals(other);
551         std::swap(closeonempty, other.closeonempty);
552         std::swap(closing, other.closing);
553         std::swap(error, other.error);
554         std::swap(iohook, other.iohook);
555         std::swap(recvq, other.recvq);
556         std::swap(sendq, other.sendq);
557 }