]> git.netwichtig.de Git - user/henk/code/ruby/rbot.git/blob - lib/rbot/ircsocket.rb
Socket IO filtering: rbot can now assume UTF-8 internally.
[user/henk/code/ruby/rbot.git] / lib / rbot / ircsocket.rb
1 class ::String
2   # Calculate the penalty which will be assigned to this message
3   # by the IRCd
4   def irc_send_penalty
5     # According to eggrdop, the initial penalty is
6     penalty = 1 + self.size/100
7     # on everything but UnderNET where it's
8     # penalty = 2 + self.size/120
9
10     cmd, pars = self.split($;,2)
11     debug "cmd: #{cmd}, pars: #{pars.inspect}"
12     case cmd.to_sym
13     when :KICK
14       chan, nick, msg = pars.split
15       chan = chan.split(',')
16       nick = nick.split(',')
17       penalty += nick.size
18       penalty *= chan.size
19     when :MODE
20       chan, modes, argument = pars.split
21       extra = 0
22       if modes
23         extra = 1
24         if argument
25           extra += modes.split(/\+|-/).size
26         else
27           extra += 3 * modes.split(/\+|-/).size
28         end
29       end
30       if argument
31         extra += 2 * argument.split.size
32       end
33       penalty += extra * chan.split.size
34     when :TOPIC
35       penalty += 1
36       penalty += 2 unless pars.split.size < 2
37     when :PRIVMSG, :NOTICE
38       dests = pars.split($;,2).first
39       penalty += dests.split(',').size
40     when :WHO
41       # I'm too lazy to implement this one correctly
42       penalty += 5
43     when :AWAY, :JOIN, :VERSION, :TIME, :TRACE, :WHOIS, :DNS
44       penalty += 2
45     when :INVITE, :NICK
46       penalty += 3
47     when :ISON
48       penalty += 1
49     else # Unknown messages
50       penalty += 1
51     end
52     if penalty > 99
53       debug "Wow, more than 99 secs of penalty!"
54       penalty = 99
55     end
56     if penalty < 2
57       debug "Wow, less than 2 secs of penalty!"
58       penalty = 2
59     end
60     debug "penalty: #{penalty}"
61     return penalty
62   end
63 end
64
65 module Irc
66
67   require 'socket'
68   require 'thread'
69   require 'rbot/timer'
70
71   class QueueRing
72     # A QueueRing is implemented as an array with elements in the form
73     # [chan, [message1, message2, ...]
74     # Note that the channel +chan+ has no actual bearing with the channels
75     # to which messages will be sent
76
77     def initialize
78       @storage = Array.new
79       @last_idx = -1
80     end
81
82     def clear
83       @storage.clear
84       @last_idx = -1
85     end
86
87     def length
88       len = 0
89       @storage.each {|c|
90         len += c[1].size
91       }
92       return len
93     end
94     alias :size :length
95
96     def empty?
97       @storage.empty?
98     end
99
100     def push(mess, chan)
101       cmess = @storage.assoc(chan)
102       if cmess
103         idx = @storage.index(cmess)
104         cmess[1] << mess
105         @storage[idx] = cmess
106       else
107         @storage << [chan, [mess]]
108       end
109     end
110
111     def next
112       if empty?
113         warning "trying to access empty ring"
114         return nil
115       end
116       save_idx = @last_idx
117       @last_idx = (@last_idx + 1) % @storage.size
118       mess = @storage[@last_idx][1].first
119       @last_idx = save_idx
120       return mess
121     end
122
123     def shift
124       if empty?
125         warning "trying to access empty ring"
126         return nil
127       end
128       @last_idx = (@last_idx + 1) % @storage.size
129       mess = @storage[@last_idx][1].shift
130       @storage.delete(@storage[@last_idx]) if @storage[@last_idx][1] == []
131       return mess
132     end
133
134   end
135
136   class MessageQueue
137     def initialize
138       # a MessageQueue is an array of QueueRings
139       # rings have decreasing priority, so messages in ring 0
140       # are more important than messages in ring 1, and so on
141       @rings = Array.new(3) { |i|
142         if i > 0
143           QueueRing.new
144         else
145           # ring 0 is special in that if it's not empty, it will
146           # be popped. IOW, ring 0 can starve the other rings
147           # ring 0 is strictly FIFO and is therefore implemented
148           # as an array
149           Array.new
150         end
151       }
152       # the other rings are satisfied round-robin
153       @last_ring = 0
154     end
155
156     def clear
157       @rings.each { |r|
158         r.clear
159       }
160       @last_ring = 0
161     end
162
163     def push(mess, chan=nil, cring=0)
164       ring = cring
165       if ring == 0
166         warning "message #{mess} at ring 0 has channel #{chan}: channel will be ignored" if !chan.nil?
167         @rings[0] << mess
168       else
169         error "message #{mess} at ring #{ring} must have a channel" if chan.nil?
170         @rings[ring].push mess, chan
171       end
172     end
173
174     def empty?
175       @rings.each { |r|
176         return false unless r.empty?
177       }
178       return true
179     end
180
181     def length
182       len = 0
183       @rings.each { |r|
184         len += r.size
185       }
186       len
187     end
188     alias :size :length
189
190     def next
191       if empty?
192         warning "trying to access empty ring"
193         return nil
194       end
195       mess = nil
196       if !@rings[0].empty?
197         mess = @rings[0].first
198       else
199         save_ring = @last_ring
200         (@rings.size - 1).times {
201           @last_ring = (@last_ring % (@rings.size - 1)) + 1
202           if !@rings[@last_ring].empty?
203             mess = @rings[@last_ring].next
204             break
205           end
206         }
207         @last_ring = save_ring
208       end
209       error "nil message" if mess.nil?
210       return mess
211     end
212
213     def shift
214       if empty?
215         warning "trying to access empty ring"
216         return nil
217       end
218       mess = nil
219       if !@rings[0].empty?
220         return @rings[0].shift
221       end
222       (@rings.size - 1).times {
223         @last_ring = (@last_ring % (@rings.size - 1)) + 1
224         if !@rings[@last_ring].empty?
225           return @rings[@last_ring].shift
226         end
227       }
228       error "nil message" if mess.nil?
229       return mess
230     end
231
232   end
233
234   # wrapped TCPSocket for communication with the server.
235   # emulates a subset of TCPSocket functionality
236   class IrcSocket
237
238     MAX_IRC_SEND_PENALTY = 10
239
240     # total number of lines sent to the irc server
241     attr_reader :lines_sent
242
243     # total number of lines received from the irc server
244     attr_reader :lines_received
245
246     # total number of bytes sent to the irc server
247     attr_reader :bytes_sent
248
249     # total number of bytes received from the irc server
250     attr_reader :bytes_received
251
252     # accumulator for the throttle
253     attr_reader :throttle_bytes
254
255     # delay between lines sent
256     attr_reader :sendq_delay
257
258     # max lines to burst
259     attr_reader :sendq_burst
260
261     # an optional filter object. we call @filter.in(data) for
262     # all incoming data and @filter.out(data) for all outgoing data
263     attr_reader :filter
264
265     # default trivial filter class
266     class IdentityFilter
267         def in(x)
268             x
269         end
270
271         def out(x)
272             x
273         end
274     end
275
276     # set filter to identity, not to nil
277     def filter=(f)
278         @filter = f || IdentityFilter.new
279     end
280
281     # server:: server to connect to
282     # port::   IRCd port
283     # host::   optional local host to bind to (ruby 1.7+ required)
284     # create a new IrcSocket
285     def initialize(server, port, host, sendq_delay=2, sendq_burst=4, opts={})
286       @timer = Timer::Timer.new
287       @timer.add(0.2) do
288         spool
289       end
290       @server = server.dup
291       @port = port.to_i
292       @host = host
293       @sock = nil
294       @filter = IdentityFilter.new
295       @spooler = false
296       @lines_sent = 0
297       @lines_received = 0
298       if opts.kind_of?(Hash) and opts.key?(:ssl)
299         @ssl = opts[:ssl]
300       else
301         @ssl = false
302       end
303
304       if sendq_delay
305         @sendq_delay = sendq_delay.to_f
306       else
307         @sendq_delay = 2
308       end
309       @last_send = Time.new - @sendq_delay
310       @flood_send = Time.new
311       @last_throttle = Time.new
312       @burst = 0
313       if sendq_burst
314         @sendq_burst = sendq_burst.to_i
315       else
316         @sendq_burst = 4
317       end
318     end
319
320     def connected?
321       !@sock.nil?
322     end
323
324     # open a TCP connection to the server
325     def connect
326       if connected?
327         warning "reconnecting while connected"
328         return
329       end
330       if(@host)
331         begin
332           @sock=TCPSocket.new(@server, @port, @host)
333         rescue ArgumentError => e
334           error "Your version of ruby does not support binding to a "
335           error "specific local address, please upgrade if you wish "
336           error "to use HOST = foo"
337           error "(this option has been disabled in order to continue)"
338           @sock=TCPSocket.new(@server, @port)
339         end
340       else
341         @sock=TCPSocket.new(@server, @port)
342       end
343       if(@ssl)
344         require 'openssl'
345         ssl_context = OpenSSL::SSL::SSLContext.new()
346         ssl_context.verify_mode = OpenSSL::SSL::VERIFY_NONE
347         @rawsock = @sock
348         @sock = OpenSSL::SSL::SSLSocket.new(@rawsock, ssl_context)
349         @sock.sync_close = true
350         @sock.connect
351       end
352       @qthread = false
353       @qmutex = Mutex.new
354       @sendq = MessageQueue.new
355     end
356
357     def sendq_delay=(newfreq)
358       debug "changing sendq frequency to #{newfreq}"
359       @qmutex.synchronize do
360         @sendq_delay = newfreq
361         if newfreq == 0
362           clearq
363           @timer.stop
364         else
365           @timer.start
366         end
367       end
368     end
369
370     def sendq_burst=(newburst)
371       @qmutex.synchronize do
372         @sendq_burst = newburst
373       end
374     end
375
376     # used to send lines to the remote IRCd by skipping the queue
377     # message: IRC message to send
378     # it should only be used for stuff that *must not* be queued,
379     # i.e. the initial PASS, NICK and USER command
380     # or the final QUIT message
381     def emergency_puts(message)
382       @qmutex.synchronize do
383         # debug "In puts - got mutex"
384         puts_critical(message)
385       end
386     end
387
388     def handle_socket_error(string, err)
389       error "#{string} failed: #{err.inspect}"
390       debug err.backtrace.join("\n")
391       # We assume that an error means that there are connection
392       # problems and that we should reconnect, so we
393       shutdown
394       raise SocketError.new(err.inspect)
395     end
396
397     # get the next line from the server (blocks)
398     def gets
399       if @sock.nil?
400         warning "socket get attempted while closed"
401         return nil
402       end
403       begin
404         reply = @filter.in(@sock.gets)
405         @lines_received += 1
406         reply.strip! if reply
407         debug "RECV: #{reply.inspect}"
408         return reply
409       rescue => e
410         handle_socket_error(:RECV, e)
411       end
412     end
413
414     def queue(msg, chan=nil, ring=0)
415       if @sendq_delay > 0
416         @qmutex.synchronize do
417           @sendq.push msg, chan, ring
418           @timer.start
419         end
420       else
421         # just send it if queueing is disabled
422         self.emergency_puts(msg)
423       end
424     end
425
426     # pop a message off the queue, send it
427     def spool
428       @qmutex.synchronize do
429         begin
430           debug "in spooler"
431           if @sendq.empty?
432             @timer.stop
433             return
434           end
435           now = Time.new
436           if (now >= (@last_send + @sendq_delay))
437             debug "resetting @burst"
438             @burst = 0
439           elsif (@burst > @sendq_burst)
440             # nope. can't send anything, come back to us next tick...
441             debug "can't send yet"
442             @timer.start
443             return
444           end
445           @flood_send = now if @flood_send < now
446           debug "can send #{@sendq_burst - @burst} lines, there are #{@sendq.size} to send"
447           while !@sendq.empty? and @burst < @sendq_burst and @flood_send - now < MAX_IRC_SEND_PENALTY
448             debug "sending message (#{@flood_send - now} < #{MAX_IRC_SEND_PENALTY})"
449             puts_critical(@sendq.shift, true)
450           end
451           if @sendq.empty?
452             @timer.stop
453           end
454         rescue => e
455           error "Spooling failed: #{e.inspect}"
456           error e.backtrace.join("\n")
457         end
458       end
459     end
460
461     def clearq
462       if @sock
463         @qmutex.synchronize do
464           unless @sendq.empty?
465             @sendq.clear
466           end
467         end
468       else
469         warning "Clearing socket while disconnected"
470       end
471     end
472
473     # flush the TCPSocket
474     def flush
475       @sock.flush
476     end
477
478     # Wraps Kernel.select on the socket
479     def select(timeout=nil)
480       Kernel.select([@sock], nil, nil, timeout)
481     end
482
483     # shutdown the connection to the server
484     def shutdown(how=2)
485       return unless connected?
486       begin
487         @sock.close
488       rescue => err
489         error "error while shutting down: #{err.inspect}"
490         debug err.backtrace.join("\n")
491       end
492       @rawsock = nil if @ssl
493       @sock = nil
494       @burst = 0
495     end
496
497     private
498
499     # same as puts, but expects to be called with a mutex held on @qmutex
500     def puts_critical(message, penalty=false)
501       # debug "in puts_critical"
502       begin
503         debug "SEND: #{message.inspect}"
504         if @sock.nil?
505           error "SEND attempted on closed socket"
506         else
507           @sock.puts(@filter.out(message))
508           @last_send = Time.new
509           @flood_send += message.irc_send_penalty if penalty
510           @lines_sent += 1
511           @burst += 1
512         end
513       rescue => e
514         handle_socket_error(:SEND, e)
515       end
516     end
517
518   end
519
520 end