]> git.netwichtig.de Git - user/henk/code/inspircd.git/blob - src/inspsocket.cpp
Merge branch 'insp20' into insp3.
[user/henk/code/inspircd.git] / src / inspsocket.cpp
1 /*
2  * InspIRCd -- Internet Relay Chat Daemon
3  *
4  *   Copyright (C) 2009 Daniel De Graaf <danieldg@inspircd.org>
5  *   Copyright (C) 2007-2009 Robin Burchell <robin+git@viroteck.net>
6  *   Copyright (C) 2008 Thomas Stagner <aquanight@inspircd.org>
7  *   Copyright (C) 2006-2007 Craig Edwards <craigedwards@brainbox.cc>
8  *   Copyright (C) 2007 Dennis Friis <peavey@inspircd.org>
9  *   Copyright (C) 2006 Oliver Lupton <oliverlupton@gmail.com>
10  *
11  * This file is part of InspIRCd.  InspIRCd is free software: you can
12  * redistribute it and/or modify it under the terms of the GNU General Public
13  * License as published by the Free Software Foundation, version 2.
14  *
15  * This program is distributed in the hope that it will be useful, but WITHOUT
16  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
17  * FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
18  * details.
19  *
20  * You should have received a copy of the GNU General Public License
21  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
22  */
23
24
25 #include "inspircd.h"
26 #include "iohook.h"
27
28 static IOHook* GetNextHook(IOHook* hook)
29 {
30         IOHookMiddle* const iohm = IOHookMiddle::ToMiddleHook(hook);
31         if (iohm)
32                 return iohm->GetNextHook();
33         return NULL;
34 }
35
36 BufferedSocket::BufferedSocket()
37 {
38         Timeout = NULL;
39         state = I_ERROR;
40 }
41
42 BufferedSocket::BufferedSocket(int newfd)
43 {
44         Timeout = NULL;
45         this->fd = newfd;
46         this->state = I_CONNECTED;
47         if (fd > -1)
48                 SocketEngine::AddFd(this, FD_WANT_FAST_READ | FD_WANT_EDGE_WRITE);
49 }
50
51 void BufferedSocket::DoConnect(const irc::sockets::sockaddrs& dest, const irc::sockets::sockaddrs& bind, unsigned int maxtime)
52 {
53         BufferedSocketError err = BeginConnect(dest, bind, maxtime);
54         if (err != I_ERR_NONE)
55         {
56                 state = I_ERROR;
57                 SetError(SocketEngine::LastError());
58                 OnError(err);
59         }
60 }
61
62 BufferedSocketError BufferedSocket::BeginConnect(const irc::sockets::sockaddrs& dest, const irc::sockets::sockaddrs& bind, unsigned int timeout)
63 {
64         if (fd < 0)
65                 fd = socket(dest.family(), SOCK_STREAM, 0);
66
67         if (fd < 0)
68                 return I_ERR_SOCKET;
69
70         if (bind.family() != 0)
71         {
72                 if (SocketEngine::Bind(fd, bind) < 0)
73                         return I_ERR_BIND;
74         }
75
76         SocketEngine::NonBlocking(fd);
77
78         if (SocketEngine::Connect(this, dest) == -1)
79         {
80                 if (errno != EINPROGRESS)
81                         return I_ERR_CONNECT;
82         }
83
84         this->state = I_CONNECTING;
85
86         if (!SocketEngine::AddFd(this, FD_WANT_NO_READ | FD_WANT_SINGLE_WRITE | FD_WRITE_WILL_BLOCK))
87                 return I_ERR_NOMOREFDS;
88
89         this->Timeout = new SocketTimeout(this->GetFd(), this, timeout);
90         ServerInstance->Timers.AddTimer(this->Timeout);
91
92         ServerInstance->Logs->Log("SOCKET", LOG_DEBUG, "BufferedSocket::DoConnect success");
93         return I_ERR_NONE;
94 }
95
96 void StreamSocket::Close()
97 {
98         if (this->fd > -1)
99         {
100                 // final chance, dump as much of the sendq as we can
101                 DoWrite();
102
103                 IOHook* hook = GetIOHook();
104                 DelIOHook();
105                 while (hook)
106                 {
107                         hook->OnStreamSocketClose(this);
108                         IOHook* const nexthook = GetNextHook(hook);
109                         delete hook;
110                         hook = nexthook;
111                 }
112                 SocketEngine::Shutdown(this, 2);
113                 SocketEngine::Close(this);
114         }
115 }
116
117 CullResult StreamSocket::cull()
118 {
119         Close();
120         return EventHandler::cull();
121 }
122
123 bool StreamSocket::GetNextLine(std::string& line, char delim)
124 {
125         std::string::size_type i = recvq.find(delim);
126         if (i == std::string::npos)
127                 return false;
128         line.assign(recvq, 0, i);
129         recvq.erase(0, i + 1);
130         return true;
131 }
132
133 int StreamSocket::HookChainRead(IOHook* hook, std::string& rq)
134 {
135         if (!hook)
136                 return ReadToRecvQ(rq);
137
138         IOHookMiddle* const iohm = IOHookMiddle::ToMiddleHook(hook);
139         if (iohm)
140         {
141                 // Call the next hook to put data into the recvq of the current hook
142                 const int ret = HookChainRead(iohm->GetNextHook(), iohm->GetRecvQ());
143                 if (ret <= 0)
144                         return ret;
145         }
146         return hook->OnStreamSocketRead(this, rq);
147 }
148
149 void StreamSocket::DoRead()
150 {
151         const std::string::size_type prevrecvqsize = recvq.size();
152
153         const int result = HookChainRead(GetIOHook(), recvq);
154         if (result < 0)
155         {
156                 SetError("Read Error"); // will not overwrite a better error message
157                 return;
158         }
159
160         if (recvq.size() > prevrecvqsize)
161                 OnDataReady();
162 }
163
164 int StreamSocket::ReadToRecvQ(std::string& rq)
165 {
166                 char* ReadBuffer = ServerInstance->GetReadBuffer();
167                 int n = SocketEngine::Recv(this, ReadBuffer, ServerInstance->Config->NetBufferSize, 0);
168                 if (n == ServerInstance->Config->NetBufferSize)
169                 {
170                         SocketEngine::ChangeEventMask(this, FD_WANT_FAST_READ | FD_ADD_TRIAL_READ);
171                         rq.append(ReadBuffer, n);
172                 }
173                 else if (n > 0)
174                 {
175                         SocketEngine::ChangeEventMask(this, FD_WANT_FAST_READ);
176                         rq.append(ReadBuffer, n);
177                 }
178                 else if (n == 0)
179                 {
180                         error = "Connection closed";
181                         SocketEngine::ChangeEventMask(this, FD_WANT_NO_READ | FD_WANT_NO_WRITE);
182                         return -1;
183                 }
184                 else if (SocketEngine::IgnoreError())
185                 {
186                         SocketEngine::ChangeEventMask(this, FD_WANT_FAST_READ | FD_READ_WILL_BLOCK);
187                         return 0;
188                 }
189                 else if (errno == EINTR)
190                 {
191                         SocketEngine::ChangeEventMask(this, FD_WANT_FAST_READ | FD_ADD_TRIAL_READ);
192                         return 0;
193                 }
194                 else
195                 {
196                         error = SocketEngine::LastError();
197                         SocketEngine::ChangeEventMask(this, FD_WANT_NO_READ | FD_WANT_NO_WRITE);
198                         return -1;
199                 }
200         return n;
201 }
202
/* Upper bound on the number of buffers handed to one write call: IOV_MAX,
 * capped at 128 so that no huge batch is prepared for a socket that is blocked */
static const int MYIOV_MAX = (IOV_MAX > 128) ? 128 : IOV_MAX;
205
206 void StreamSocket::DoWrite()
207 {
208         if (getSendQSize() == 0)
209                 return;
210         if (!error.empty() || fd < 0)
211         {
212                 ServerInstance->Logs->Log("SOCKET", LOG_DEBUG, "DoWrite on errored or closed socket");
213                 return;
214         }
215
216         SendQueue* psendq = &sendq;
217         IOHook* hook = GetIOHook();
218         while (hook)
219         {
220                 int rv = hook->OnStreamSocketWrite(this, *psendq);
221                 psendq = NULL;
222
223                 // rv == 0 means the socket has blocked. Stop trying to send data.
224                 // IOHook has requested unblock notification from the socketengine.
225                 if (rv == 0)
226                         break;
227
228                 if (rv < 0)
229                 {
230                         SetError("Write Error"); // will not overwrite a better error message
231                         break;
232                 }
233
234                 IOHookMiddle* const iohm = IOHookMiddle::ToMiddleHook(hook);
235                 hook = NULL;
236                 if (iohm)
237                 {
238                         psendq = &iohm->GetSendQ();
239                         hook = iohm->GetNextHook();
240                 }
241         }
242
243         if (psendq)
244                 FlushSendQ(*psendq);
245 }
246
247 void StreamSocket::FlushSendQ(SendQueue& sq)
248 {
249                 // don't even try if we are known to be blocking
250                 if (GetEventMask() & FD_WRITE_WILL_BLOCK)
251                         return;
252                 // start out optimistic - we won't need to write any more
253                 int eventChange = FD_WANT_EDGE_WRITE;
254                 while (error.empty() && !sq.empty() && eventChange == FD_WANT_EDGE_WRITE)
255                 {
256                         // Prepare a writev() call to write all buffers efficiently
257                         int bufcount = sq.size();
258
259                         // cap the number of buffers at MYIOV_MAX
260                         if (bufcount > MYIOV_MAX)
261                         {
262                                 bufcount = MYIOV_MAX;
263                         }
264
265                         int rv_max = 0;
266                         int rv;
267                         {
268                                 SocketEngine::IOVector iovecs[MYIOV_MAX];
269                                 size_t j = 0;
270                                 for (SendQueue::const_iterator i = sq.begin(), end = i+bufcount; i != end; ++i, j++)
271                                 {
272                                         const SendQueue::Element& elem = *i;
273                                         iovecs[j].iov_base = const_cast<char*>(elem.data());
274                                         iovecs[j].iov_len = elem.length();
275                                         rv_max += iovecs[j].iov_len;
276                                 }
277                                 rv = SocketEngine::WriteV(this, iovecs, bufcount);
278                         }
279
280                         if (rv == (int)sq.bytes())
281                         {
282                                 // it's our lucky day, everything got written out. Fast cleanup.
283                                 // This won't ever happen if the number of buffers got capped.
284                                 sq.clear();
285                         }
286                         else if (rv > 0)
287                         {
288                                 // Partial write. Clean out strings from the sendq
289                                 if (rv < rv_max)
290                                 {
291                                         // it's going to block now
292                                         eventChange = FD_WANT_FAST_WRITE | FD_WRITE_WILL_BLOCK;
293                                 }
294                                 while (rv > 0 && !sq.empty())
295                                 {
296                                         const SendQueue::Element& front = sq.front();
297                                         if (front.length() <= (size_t)rv)
298                                         {
299                                                 // this string got fully written out
300                                                 rv -= front.length();
301                                                 sq.pop_front();
302                                         }
303                                         else
304                                         {
305                                                 // stopped in the middle of this string
306                                                 sq.erase_front(rv);
307                                                 rv = 0;
308                                         }
309                                 }
310                         }
311                         else if (rv == 0)
312                         {
313                                 error = "Connection closed";
314                         }
315                         else if (SocketEngine::IgnoreError())
316                         {
317                                 eventChange = FD_WANT_FAST_WRITE | FD_WRITE_WILL_BLOCK;
318                         }
319                         else if (errno == EINTR)
320                         {
321                                 // restart interrupted syscall
322                                 errno = 0;
323                         }
324                         else
325                         {
326                                 error = SocketEngine::LastError();
327                         }
328                 }
329                 if (!error.empty())
330                 {
331                         // error - kill all events
332                         SocketEngine::ChangeEventMask(this, FD_WANT_NO_READ | FD_WANT_NO_WRITE);
333                 }
334                 else
335                 {
336                         SocketEngine::ChangeEventMask(this, eventChange);
337                 }
338 }
339
340 bool StreamSocket::OnSetEndPoint(const irc::sockets::sockaddrs& local, const irc::sockets::sockaddrs& remote)
341 {
342         return false;
343 }
344
345 void StreamSocket::WriteData(const std::string &data)
346 {
347         if (fd < 0)
348         {
349                 ServerInstance->Logs->Log("SOCKET", LOG_DEBUG, "Attempt to write data to dead socket: %s",
350                         data.c_str());
351                 return;
352         }
353
354         /* Append the data to the back of the queue ready for writing */
355         sendq.push_back(data);
356
357         SocketEngine::ChangeEventMask(this, FD_ADD_TRIAL_WRITE);
358 }
359
360 bool SocketTimeout::Tick(time_t)
361 {
362         ServerInstance->Logs->Log("SOCKET", LOG_DEBUG, "SocketTimeout::Tick");
363
364         if (SocketEngine::GetRef(this->sfd) != this->sock)
365         {
366                 delete this;
367                 return false;
368         }
369
370         if (this->sock->state == I_CONNECTING)
371         {
372                 // for connecting sockets, the timeout can occur
373                 // which causes termination of the connection after
374                 // the given number of seconds without a successful
375                 // connection.
376                 this->sock->OnTimeout();
377                 this->sock->OnError(I_ERR_TIMEOUT);
378                 this->sock->state = I_ERROR;
379
380                 ServerInstance->GlobalCulls.AddItem(sock);
381         }
382
383         this->sock->Timeout = NULL;
384         delete this;
385         return false;
386 }
387
388 void BufferedSocket::OnConnected() { }
389 void BufferedSocket::OnTimeout() { return; }
390
391 void BufferedSocket::OnEventHandlerWrite()
392 {
393         if (state == I_CONNECTING)
394         {
395                 state = I_CONNECTED;
396                 this->OnConnected();
397                 if (!GetIOHook())
398                         SocketEngine::ChangeEventMask(this, FD_WANT_FAST_READ | FD_WANT_EDGE_WRITE);
399         }
400         this->StreamSocket::OnEventHandlerWrite();
401 }
402
403 BufferedSocket::~BufferedSocket()
404 {
405         this->Close();
406         // The timer is removed from the TimerManager in Timer::~Timer()
407         delete Timeout;
408 }
409
410 void StreamSocket::OnEventHandlerError(int errornum)
411 {
412         if (!error.empty())
413                 return;
414
415         if (errornum == 0)
416                 SetError("Connection closed");
417         else
418                 SetError(SocketEngine::GetError(errornum));
419
420         BufferedSocketError errcode = I_ERR_OTHER;
421         switch (errornum)
422         {
423                 case ETIMEDOUT:
424                         errcode = I_ERR_TIMEOUT;
425                         break;
426                 case ECONNREFUSED:
427                 case 0:
428                         errcode = I_ERR_CONNECT;
429                         break;
430                 case EADDRINUSE:
431                         errcode = I_ERR_BIND;
432                         break;
433                 case EPIPE:
434                 case EIO:
435                         errcode = I_ERR_WRITE;
436                         break;
437         }
438
439         // Log and call OnError()
440         CheckError(errcode);
441 }
442
443 void StreamSocket::OnEventHandlerRead()
444 {
445         if (!error.empty())
446                 return;
447
448         try
449         {
450                 DoRead();
451         }
452         catch (CoreException& ex)
453         {
454                 ServerInstance->Logs->Log("SOCKET", LOG_DEFAULT, "Caught exception in socket processing on FD %d - '%s'", fd, ex.GetReason().c_str());
455                 SetError(ex.GetReason());
456         }
457         CheckError(I_ERR_OTHER);
458 }
459
460 void StreamSocket::OnEventHandlerWrite()
461 {
462         if (!error.empty())
463                 return;
464
465         DoWrite();
466         CheckError(I_ERR_OTHER);
467 }
468
469 void StreamSocket::CheckError(BufferedSocketError errcode)
470 {
471         if (!error.empty())
472         {
473                 ServerInstance->Logs->Log("SOCKET", LOG_DEBUG, "Error on FD %d - '%s'", fd, error.c_str());
474                 OnError(errcode);
475         }
476 }
477
478 IOHook* StreamSocket::GetModHook(Module* mod) const
479 {
480         for (IOHook* curr = GetIOHook(); curr; curr = GetNextHook(curr))
481         {
482                 if (curr->prov->creator == mod)
483                         return curr;
484         }
485         return NULL;
486 }
487
488 void StreamSocket::AddIOHook(IOHook* newhook)
489 {
490         IOHook* curr = GetIOHook();
491         if (!curr)
492         {
493                 iohook = newhook;
494                 return;
495         }
496
497         IOHookMiddle* lasthook;
498         while (curr)
499         {
500                 lasthook = IOHookMiddle::ToMiddleHook(curr);
501                 if (!lasthook)
502                         return;
503                 curr = lasthook->GetNextHook();
504         }
505
506         lasthook->SetNextHook(newhook);
507 }
508
509 size_t StreamSocket::getSendQSize() const
510 {
511         size_t ret = sendq.bytes();
512         IOHook* curr = GetIOHook();
513         while (curr)
514         {
515                 const IOHookMiddle* const iohm = IOHookMiddle::ToMiddleHook(curr);
516                 if (!iohm)
517                         break;
518
519                 ret += iohm->GetSendQ().bytes();
520                 curr = iohm->GetNextHook();
521         }
522         return ret;
523 }