]> git.netwichtig.de Git - user/henk/code/ruby/rbot.git/blob - lib/rbot/ircsocket.rb
* (timer) stop the bot timer for shutdown / rescan
[user/henk/code/ruby/rbot.git] / lib / rbot / ircsocket.rb
1 class ::String
2   # Calculate the penalty which will be assigned to this message
3   # by the IRCd
4   def irc_send_penalty
5     # According to eggrdop, the initial penalty is
6     penalty = 1 + self.size/100
7     # on everything but UnderNET where it's
8     # penalty = 2 + self.size/120
9
10     cmd, pars = self.split($;,2)
11     debug "cmd: #{cmd}, pars: #{pars.inspect}"
12     case cmd.to_sym
13     when :KICK
14       chan, nick, msg = pars.split
15       chan = chan.split(',')
16       nick = nick.split(',')
17       penalty += nick.size
18       penalty *= chan.size
19     when :MODE
20       chan, modes, argument = pars.split
21       extra = 0
22       if modes
23         extra = 1
24         if argument
25           extra += modes.split(/\+|-/).size
26         else
27           extra += 3 * modes.split(/\+|-/).size
28         end
29       end
30       if argument
31         extra += 2 * argument.split.size
32       end
33       penalty += extra * chan.split.size
34     when :TOPIC
35       penalty += 1
36       penalty += 2 unless pars.split.size < 2
37     when :PRIVMSG, :NOTICE
38       dests = pars.split($;,2).first
39       penalty += dests.split(',').size
40     when :WHO
41       # I'm too lazy to implement this one correctly
42       penalty += 5
43     when :AWAY, :JOIN, :VERSION, :TIME, :TRACE, :WHOIS, :DNS
44       penalty += 2
45     when :INVITE, :NICK
46       penalty += 3
47     when :ISON
48       penalty += 1
49     else # Unknown messages
50       penalty += 1
51     end
52     if penalty > 99
53       debug "Wow, more than 99 secs of penalty!"
54       penalty = 99
55     end
56     if penalty < 2
57       debug "Wow, less than 2 secs of penalty!"
58       penalty = 2
59     end
60     debug "penalty: #{penalty}"
61     return penalty
62   end
63 end
64
65 module Irc
66
67   require 'socket'
68   require 'thread'
69   require 'rbot/timer'
70
71   class QueueRing
72     # A QueueRing is implemented as an array with elements in the form
73     # [chan, [message1, message2, ...]
74     # Note that the channel +chan+ has no actual bearing with the channels
75     # to which messages will be sent
76
77     def initialize
78       @storage = Array.new
79       @last_idx = -1
80     end
81
82     def clear
83       @storage.clear
84       @last_idx = -1
85     end
86
87     def length
88       len = 0
89       @storage.each {|c|
90         len += c[1].size
91       }
92       return len
93     end
94     alias :size :length
95
96     def empty?
97       @storage.empty?
98     end
99
100     def push(mess, chan)
101       cmess = @storage.assoc(chan)
102       if cmess
103         idx = @storage.index(cmess)
104         cmess[1] << mess
105         @storage[idx] = cmess
106       else
107         @storage << [chan, [mess]]
108       end
109     end
110
111     def next
112       if empty?
113         warning "trying to access empty ring"
114         return nil
115       end
116       save_idx = @last_idx
117       @last_idx = (@last_idx + 1) % @storage.size
118       mess = @storage[@last_idx][1].first
119       @last_idx = save_idx
120       return mess
121     end
122
123     def shift
124       if empty?
125         warning "trying to access empty ring"
126         return nil
127       end
128       @last_idx = (@last_idx + 1) % @storage.size
129       mess = @storage[@last_idx][1].shift
130       @storage.delete(@storage[@last_idx]) if @storage[@last_idx][1] == []
131       return mess
132     end
133
134   end
135
136   class MessageQueue
137     def initialize
138       # a MessageQueue is an array of QueueRings
139       # rings have decreasing priority, so messages in ring 0
140       # are more important than messages in ring 1, and so on
141       @rings = Array.new(3) { |i|
142         if i > 0
143           QueueRing.new
144         else
145           # ring 0 is special in that if it's not empty, it will
146           # be popped. IOW, ring 0 can starve the other rings
147           # ring 0 is strictly FIFO and is therefore implemented
148           # as an array
149           Array.new
150         end
151       }
152       # the other rings are satisfied round-robin
153       @last_ring = 0
154     end
155
156     def clear
157       @rings.each { |r|
158         r.clear
159       }
160       @last_ring = 0
161     end
162
163     def push(mess, chan=nil, cring=0)
164       ring = cring
165       if ring == 0
166         warning "message #{mess} at ring 0 has channel #{chan}: channel will be ignored" if !chan.nil?
167         @rings[0] << mess
168       else
169         error "message #{mess} at ring #{ring} must have a channel" if chan.nil?
170         @rings[ring].push mess, chan
171       end
172     end
173
174     def empty?
175       @rings.each { |r|
176         return false unless r.empty?
177       }
178       return true
179     end
180
181     def length
182       len = 0
183       @rings.each { |r|
184         len += r.size
185       }
186       len
187     end
188     alias :size :length
189
190     def next
191       if empty?
192         warning "trying to access empty ring"
193         return nil
194       end
195       mess = nil
196       if !@rings[0].empty?
197         mess = @rings[0].first
198       else
199         save_ring = @last_ring
200         (@rings.size - 1).times {
201           @last_ring = (@last_ring % (@rings.size - 1)) + 1
202           if !@rings[@last_ring].empty?
203             mess = @rings[@last_ring].next
204             break
205           end
206         }
207         @last_ring = save_ring
208       end
209       error "nil message" if mess.nil?
210       return mess
211     end
212
213     def shift
214       if empty?
215         warning "trying to access empty ring"
216         return nil
217       end
218       mess = nil
219       if !@rings[0].empty?
220         return @rings[0].shift
221       end
222       (@rings.size - 1).times {
223         @last_ring = (@last_ring % (@rings.size - 1)) + 1
224         if !@rings[@last_ring].empty?
225           return @rings[@last_ring].shift
226         end
227       }
228       error "nil message" if mess.nil?
229       return mess
230     end
231
232   end
233
234   # wrapped TCPSocket for communication with the server.
235   # emulates a subset of TCPSocket functionality
236   class IrcSocket
237
238     MAX_IRC_SEND_PENALTY = 10
239
240     # total number of lines sent to the irc server
241     attr_reader :lines_sent
242
243     # total number of lines received from the irc server
244     attr_reader :lines_received
245
246     # total number of bytes sent to the irc server
247     attr_reader :bytes_sent
248
249     # total number of bytes received from the irc server
250     attr_reader :bytes_received
251
252     # accumulator for the throttle
253     attr_reader :throttle_bytes
254
255     # delay between lines sent
256     attr_reader :sendq_delay
257
258     # max lines to burst
259     attr_reader :sendq_burst
260
261     # an optional filter object. we call @filter.in(data) for
262     # all incoming data and @filter.out(data) for all outgoing data
263     attr_reader :filter
264
265     # normalized uri of the current server
266     attr_reader :server_uri
267
268     # default trivial filter class
269     class IdentityFilter
270         def in(x)
271             x
272         end
273
274         def out(x)
275             x
276         end
277     end
278
279     # set filter to identity, not to nil
280     def filter=(f)
281         @filter = f || IdentityFilter.new
282     end
283
284     # server_list:: list of servers to connect to
285     # host::   optional local host to bind to (ruby 1.7+ required)
286     # create a new IrcSocket
287     def initialize(server_list, host, sendq_delay=2, sendq_burst=4, opts={})
288       @timer = Timer.new
289       @act_id = @timer.add(0.2, :blocked => true) { spool }
290       @server_list = server_list.dup
291       @server_uri = nil
292       @conn_count = 0
293       @host = host
294       @sock = nil
295       @filter = IdentityFilter.new
296       @spooler = false
297       @lines_sent = 0
298       @lines_received = 0
299       if opts.kind_of?(Hash) and opts.key?(:ssl)
300         @ssl = opts[:ssl]
301       else
302         @ssl = false
303       end
304
305       if sendq_delay
306         @sendq_delay = sendq_delay.to_f
307       else
308         @sendq_delay = 2
309       end
310       @last_send = Time.new - @sendq_delay
311       @flood_send = Time.new
312       @last_throttle = Time.new
313       @burst = 0
314       if sendq_burst
315         @sendq_burst = sendq_burst.to_i
316       else
317         @sendq_burst = 4
318       end
319     end
320
321     def connected?
322       !@sock.nil?
323     end
324
325     # open a TCP connection to the server
326     def connect
327       if connected?
328         warning "reconnecting while connected"
329         return
330       end
331       srv_uri = @server_list[@conn_count % @server_list.size].dup
332       srv_uri = 'irc://' + srv_uri if !(srv_uri =~ /:\/\//)
333       @conn_count += 1
334       @server_uri = URI.parse(srv_uri)
335       @server_uri.port = 6667 if !@server_uri.port
336       debug "connection attempt \##{@conn_count} (#{@server_uri.host}:#{@server_uri.port})"
337
338       if(@host)
339         begin
340           @sock=TCPSocket.new(@server_uri.host, @server_uri.port, @host)
341         rescue ArgumentError => e
342           error "Your version of ruby does not support binding to a "
343           error "specific local address, please upgrade if you wish "
344           error "to use HOST = foo"
345           error "(this option has been disabled in order to continue)"
346           @sock=TCPSocket.new(@server_uri.host, @server_uri.port)
347         end
348       else
349         @sock=TCPSocket.new(@server_uri.host, @server_uri.port)
350       end
351       if(@ssl)
352         require 'openssl'
353         ssl_context = OpenSSL::SSL::SSLContext.new()
354         ssl_context.verify_mode = OpenSSL::SSL::VERIFY_NONE
355         @rawsock = @sock
356         @sock = OpenSSL::SSL::SSLSocket.new(@rawsock, ssl_context)
357         @sock.sync_close = true
358         @sock.connect
359       end
360       @qthread = false
361       @qmutex = Mutex.new
362       @sendq = MessageQueue.new
363     end
364
365     def sendq_delay=(newfreq)
366       debug "changing sendq frequency to #{newfreq}"
367       @qmutex.synchronize do
368         @sendq_delay = newfreq
369         if newfreq == 0
370           clearq
371           @timer.block(@act_id)
372         else
373           @timer.unblock(@act_id)
374         end
375       end
376     end
377
378     def sendq_burst=(newburst)
379       @qmutex.synchronize do
380         @sendq_burst = newburst
381       end
382     end
383
384     # used to send lines to the remote IRCd by skipping the queue
385     # message: IRC message to send
386     # it should only be used for stuff that *must not* be queued,
387     # i.e. the initial PASS, NICK and USER command
388     # or the final QUIT message
389     def emergency_puts(message)
390       @qmutex.synchronize do
391         # debug "In puts - got mutex"
392         puts_critical(message)
393       end
394     end
395
396     def handle_socket_error(string, e)
397       error "#{string} failed: #{e.pretty_inspect}"
398       # We assume that an error means that there are connection
399       # problems and that we should reconnect, so we
400       shutdown
401       raise SocketError.new(e.inspect)
402     end
403
404     # get the next line from the server (blocks)
405     def gets
406       if @sock.nil?
407         warning "socket get attempted while closed"
408         return nil
409       end
410       begin
411         reply = @filter.in(@sock.gets)
412         @lines_received += 1
413         reply.strip! if reply
414         debug "RECV: #{reply.inspect}"
415         return reply
416       rescue Exception => e
417         handle_socket_error(:RECV, e)
418       end
419     end
420
421     def queue(msg, chan=nil, ring=0)
422       if @sendq_delay > 0
423         @qmutex.synchronize do
424           @sendq.push msg, chan, ring
425           @timer.unblock(@act_id)
426         end
427       else
428         # just send it if queueing is disabled
429         self.emergency_puts(msg)
430       end
431     end
432
433     # pop a message off the queue, send it
434     def spool
435       @qmutex.synchronize do
436         begin
437           debug "in spooler"
438           if @sendq.empty?
439             @timer.block(@act_id)
440             return
441           end
442           now = Time.new
443           if (now >= (@last_send + @sendq_delay))
444             debug "resetting @burst"
445             @burst = 0
446           elsif (@burst > @sendq_burst)
447             # nope. can't send anything, come back to us next tick...
448             debug "can't send yet"
449             @timer.unblock(@act_id)
450             return
451           end
452           @flood_send = now if @flood_send < now
453           debug "can send #{@sendq_burst - @burst} lines, there are #{@sendq.size} to send"
454           while !@sendq.empty? and @burst < @sendq_burst and @flood_send - now < MAX_IRC_SEND_PENALTY
455             debug "sending message (#{@flood_send - now} < #{MAX_IRC_SEND_PENALTY})"
456             puts_critical(@sendq.shift, true)
457           end
458           if @sendq.empty?
459             @timer.block(@act_id)
460           end
461         rescue Exception => e
462           error "Spooling failed: #{e.pretty_inspect}"
463           raise e
464         end
465       end
466     end
467
468     def clearq
469       if @sock
470         @qmutex.synchronize do
471           unless @sendq.empty?
472             @sendq.clear
473           end
474         end
475       else
476         warning "Clearing socket while disconnected"
477       end
478     end
479
480     # flush the TCPSocket
481     def flush
482       @sock.flush
483     end
484
485     # Wraps Kernel.select on the socket
486     def select(timeout=nil)
487       Kernel.select([@sock], nil, nil, timeout)
488     end
489
490     # shutdown the connection to the server
491     def shutdown(how=2)
492       return unless connected?
493       begin
494         @sock.close
495       rescue Exception => e
496         error "error while shutting down: #{e.pretty_inspect}"
497       end
498       @rawsock = nil if @ssl
499       @sock = nil
500       @burst = 0
501       @sendq.clear
502     end
503
504     private
505
506     # same as puts, but expects to be called with a mutex held on @qmutex
507     def puts_critical(message, penalty=false)
508       # debug "in puts_critical"
509       begin
510         debug "SEND: #{message.inspect}"
511         if @sock.nil?
512           error "SEND attempted on closed socket"
513         else
514           @sock.puts(@filter.out(message))
515           @last_send = Time.new
516           @flood_send += message.irc_send_penalty if penalty
517           @lines_sent += 1
518           @burst += 1
519         end
520       rescue Exception => e
521         handle_socket_error(:SEND, e)
522       end
523     end
524
525   end
526
527 end