]> git.netwichtig.de Git - user/henk/code/ruby/rbot.git/blob - lib/rbot/ircsocket.rb
sendmsg improvements: plugins can now choose what to do with overlong messages withou...
[user/henk/code/ruby/rbot.git] / lib / rbot / ircsocket.rb
1 class ::String
2   # Calculate the penalty which will be assigned to this message
3   # by the IRCd
4   def irc_send_penalty
5     # According to eggrdop, the initial penalty is
6     penalty = 1 + self.length/100
7     # on everything but UnderNET where it's
8     # penalty = 2 + self.length/120
9
10     cmd, pars = self.split($;,2)
11     debug "cmd: #{cmd}, pars: #{pars.inspect}"
12     case cmd.to_sym
13     when :KICK
14       chan, nick, msg = pars.split
15       chan = chan.split(',')
16       nick = nick.split(',')
17       penalty += nick.length
18       penalty *= chan.length
19     when :MODE
20       chan, modes, argument = pars.split
21       extra = 0
22       if modes
23         extra = 1
24         if argument
25           extra += modes.split(/\+|-/).length
26         else
27           extra += 3 * modes.split(/\+|-/).length
28         end
29       end
30       if argument
31         extra += 2 * argument.split.length
32       end
33       penalty += extra * chan.split.length
34     when :TOPIC
35       penalty += 1
36       penalty += 2 unless pars.split.length < 2
37     when :PRIVMSG, :NOTICE
38       dests = pars.split($;,2).first
39       penalty += dests.split(',').length
40     when :WHO
41       # I'm too lazy to implement this one correctly
42       penalty += 5
43     when :AWAY, :JOIN, :VERSION, :TIME, :TRACE, :WHOIS, :DNS
44       penalty += 2
45     when :INVITE, :NICK
46       penalty += 3
47     when :ISON
48       penalty += 1
49     else # Unknown messages
50       penalty += 1
51     end
52     if penalty > 99
53       debug "Wow, more than 99 secs of penalty!"
54       penalty = 99
55     end
56     if penalty < 2
57       debug "Wow, less than 2 secs of penalty!"
58       penalty = 2
59     end
60     debug "penalty: #{penalty}"
61     return penalty
62   end
63 end
64
65 module Irc
66
67   require 'socket'
68   require 'thread'
69   require 'rbot/timer'
70
71   class QueueRing
72     # A QueueRing is implemented as an array with elements in the form
73     # [chan, [message1, message2, ...]
74     # Note that the channel +chan+ has no actual bearing with the channels
75     # to which messages will be sent
76
77     def initialize
78       @storage = Array.new
79       @last_idx = -1
80     end
81
82     def clear
83       @storage.clear
84       @last_idx = -1
85     end
86
87     def length
88       length = 0
89       @storage.each {|c|
90         length += c[1].length 
91       }
92       return length
93     end
94
95     def empty?
96       @storage.empty?
97     end
98
99     def push(mess, chan)
100       cmess = @storage.assoc(chan)
101       if cmess
102         idx = @storage.index(cmess)
103         cmess[1] << mess
104         @storage[idx] = cmess
105       else
106         @storage << [chan, [mess]]
107       end
108     end
109
110     def next
111       if empty?
112         warning "trying to access empty ring"
113         return nil
114       end
115       save_idx = @last_idx
116       @last_idx = (@last_idx + 1) % @storage.length
117       mess = @storage[@last_idx][1].first
118       @last_idx = save_idx
119       return mess
120     end
121
122     def shift
123       if empty?
124         warning "trying to access empty ring"
125         return nil
126       end
127       @last_idx = (@last_idx + 1) % @storage.length
128       mess = @storage[@last_idx][1].shift
129       @storage.delete(@storage[@last_idx]) if @storage[@last_idx][1] == []
130       return mess
131     end
132
133   end
134
135   class MessageQueue
136     def initialize
137       # a MessageQueue is an array of QueueRings
138       # rings have decreasing priority, so messages in ring 0
139       # are more important than messages in ring 1, and so on
140       @rings = Array.new(3) { |i|
141         if i > 0
142           QueueRing.new
143         else
144           # ring 0 is special in that if it's not empty, it will
145           # be popped. IOW, ring 0 can starve the other rings
146           # ring 0 is strictly FIFO and is therefore implemented
147           # as an array
148           Array.new
149         end
150       }
151       # the other rings are satisfied round-robin
152       @last_ring = 0
153     end
154
155     def clear
156       @rings.each { |r|
157         r.clear
158       }
159       @last_ring = 0
160     end
161
162     def push(mess, chan=nil, cring=0)
163       ring = cring
164       if ring == 0
165         warning "message #{mess} at ring 0 has channel #{chan}: channel will be ignored" if !chan.nil?
166         @rings[0] << mess
167       else
168         error "message #{mess} at ring #{ring} must have a channel" if chan.nil?
169         @rings[ring].push mess, chan
170       end
171     end
172
173     def empty?
174       @rings.each { |r|
175         return false unless r.empty?
176       }
177       return true
178     end
179
180     def length
181       len = 0
182       @rings.each { |r|
183         len += r.length
184       }
185       len
186     end
187
188     def next
189       if empty?
190         warning "trying to access empty ring"
191         return nil
192       end
193       mess = nil
194       if !@rings[0].empty?
195         mess = @rings[0].first
196       else
197         save_ring = @last_ring
198         (@rings.length - 1).times {
199           @last_ring = (@last_ring % (@rings.length - 1)) + 1
200           if !@rings[@last_ring].empty?
201             mess = @rings[@last_ring].next
202             break
203           end
204         }
205         @last_ring = save_ring
206       end
207       error "nil message" if mess.nil?
208       return mess
209     end
210
211     def shift
212       if empty?
213         warning "trying to access empty ring"
214         return nil
215       end
216       mess = nil
217       if !@rings[0].empty?
218         return @rings[0].shift
219       end
220       (@rings.length - 1).times {
221         @last_ring = (@last_ring % (@rings.length - 1)) + 1
222         if !@rings[@last_ring].empty?
223           return @rings[@last_ring].shift
224         end
225       }
226       error "nil message" if mess.nil?
227       return mess
228     end
229
230   end
231
232   # wrapped TCPSocket for communication with the server.
233   # emulates a subset of TCPSocket functionality
234   class IrcSocket
235
236     MAX_IRC_SEND_PENALTY = 10
237
238     # total number of lines sent to the irc server
239     attr_reader :lines_sent
240
241     # total number of lines received from the irc server
242     attr_reader :lines_received
243
244     # total number of bytes sent to the irc server
245     attr_reader :bytes_sent
246
247     # total number of bytes received from the irc server
248     attr_reader :bytes_received
249
250     # accumulator for the throttle
251     attr_reader :throttle_bytes
252
253     # delay between lines sent
254     attr_reader :sendq_delay
255
256     # max lines to burst
257     attr_reader :sendq_burst
258
259     # server:: server to connect to
260     # port::   IRCd port
261     # host::   optional local host to bind to (ruby 1.7+ required)
262     # create a new IrcSocket
263     def initialize(server, port, host, sendq_delay=2, sendq_burst=4, opts={})
264       @timer = Timer::Timer.new
265       @timer.add(0.2) do
266         spool
267       end
268       @server = server.dup
269       @port = port.to_i
270       @host = host
271       @sock = nil
272       @spooler = false
273       @lines_sent = 0
274       @lines_received = 0
275       if opts.kind_of?(Hash) and opts.key?(:ssl)
276         @ssl = opts[:ssl]
277       else
278         @ssl = false
279       end
280
281       if sendq_delay
282         @sendq_delay = sendq_delay.to_f
283       else
284         @sendq_delay = 2
285       end
286       @last_send = Time.new - @sendq_delay
287       @flood_send = Time.new
288       @last_throttle = Time.new
289       @burst = 0
290       if sendq_burst
291         @sendq_burst = sendq_burst.to_i
292       else
293         @sendq_burst = 4
294       end
295     end
296
297     def connected?
298       !@sock.nil?
299     end
300
301     # open a TCP connection to the server
302     def connect
303       if connected?
304         warning "reconnecting while connected"
305         return
306       end
307       if(@host)
308         begin
309           @sock=TCPSocket.new(@server, @port, @host)
310         rescue ArgumentError => e
311           error "Your version of ruby does not support binding to a "
312           error "specific local address, please upgrade if you wish "
313           error "to use HOST = foo"
314           error "(this option has been disabled in order to continue)"
315           @sock=TCPSocket.new(@server, @port)
316         end
317       else
318         @sock=TCPSocket.new(@server, @port)
319       end
320       if(@ssl)
321         require 'openssl'
322         ssl_context = OpenSSL::SSL::SSLContext.new()
323         ssl_context.verify_mode = OpenSSL::SSL::VERIFY_NONE
324         @rawsock = @sock
325         @sock = OpenSSL::SSL::SSLSocket.new(@rawsock, ssl_context)
326         @sock.sync_close = true
327         @sock.connect
328       end
329       @qthread = false
330       @qmutex = Mutex.new
331       @sendq = MessageQueue.new
332     end
333
334     def sendq_delay=(newfreq)
335       debug "changing sendq frequency to #{newfreq}"
336       @qmutex.synchronize do
337         @sendq_delay = newfreq
338         if newfreq == 0
339           clearq
340           @timer.stop
341         else
342           @timer.start
343         end
344       end
345     end
346
347     def sendq_burst=(newburst)
348       @qmutex.synchronize do
349         @sendq_burst = newburst
350       end
351     end
352
353     # used to send lines to the remote IRCd by skipping the queue
354     # message: IRC message to send
355     # it should only be used for stuff that *must not* be queued,
356     # i.e. the initial PASS, NICK and USER command
357     # or the final QUIT message
358     def emergency_puts(message)
359       @qmutex.synchronize do
360         # debug "In puts - got mutex"
361         puts_critical(message)
362       end
363     end
364
365     def handle_socket_error(string, err)
366       error "#{string} failed: #{err.inspect}"
367       debug err.backtrace.join("\n")
368       # We assume that an error means that there are connection
369       # problems and that we should reconnect, so we
370       shutdown
371       raise SocketError.new(err.inspect)
372     end
373
374     # get the next line from the server (blocks)
375     def gets
376       if @sock.nil?
377         warning "socket get attempted while closed"
378         return nil
379       end
380       begin
381         reply = @sock.gets
382         @lines_received += 1
383         reply.strip! if reply
384         debug "RECV: #{reply.inspect}"
385         return reply
386       rescue => e
387         handle_socket_error(:RECV, e)
388       end
389     end
390
391     def queue(msg, chan=nil, ring=0)
392       if @sendq_delay > 0
393         @qmutex.synchronize do
394           @sendq.push msg, chan, ring
395           @timer.start
396         end
397       else
398         # just send it if queueing is disabled
399         self.emergency_puts(msg)
400       end
401     end
402
403     # pop a message off the queue, send it
404     def spool
405       @qmutex.synchronize do
406         begin
407           debug "in spooler"
408           if @sendq.empty?
409             @timer.stop
410             return
411           end
412           now = Time.new
413           if (now >= (@last_send + @sendq_delay))
414             debug "resetting @burst"
415             @burst = 0
416           elsif (@burst > @sendq_burst)
417             # nope. can't send anything, come back to us next tick...
418             debug "can't send yet"
419             @timer.start
420             return
421           end
422           @flood_send = now if @flood_send < now
423           debug "can send #{@sendq_burst - @burst} lines, there are #{@sendq.length} to send"
424           while !@sendq.empty? and @burst < @sendq_burst and @flood_send - now < MAX_IRC_SEND_PENALTY
425             debug "sending message (#{@flood_send - now} < #{MAX_IRC_SEND_PENALTY})"
426             puts_critical(@sendq.shift, true)
427           end
428           if @sendq.empty?
429             @timer.stop
430           end
431         rescue => e
432           error "Spooling failed: #{e.inspect}"
433           error e.backtrace.join("\n")
434         end
435       end
436     end
437
438     def clearq
439       if @sock
440         @qmutex.synchronize do
441           unless @sendq.empty?
442             @sendq.clear
443           end
444         end
445       else
446         warning "Clearing socket while disconnected"
447       end
448     end
449
450     # flush the TCPSocket
451     def flush
452       @sock.flush
453     end
454
455     # Wraps Kernel.select on the socket
456     def select(timeout=nil)
457       Kernel.select([@sock], nil, nil, timeout)
458     end
459
460     # shutdown the connection to the server
461     def shutdown(how=2)
462       return unless connected?
463       begin
464         @sock.close
465       rescue => err
466         error "error while shutting down: #{err.inspect}"
467         debug err.backtrace.join("\n")
468       end
469       @rawsock = nil if @ssl
470       @sock = nil
471       @burst = 0
472     end
473
474     private
475
476     # same as puts, but expects to be called with a mutex held on @qmutex
477     def puts_critical(message, penalty=false)
478       # debug "in puts_critical"
479       begin
480         debug "SEND: #{message.inspect}"
481         if @sock.nil?
482           error "SEND attempted on closed socket"
483         else
484           @sock.puts message
485           @last_send = Time.new
486           @flood_send += message.irc_send_penalty if penalty
487           @lines_sent += 1
488           @burst += 1
489         end
490       rescue => e
491         handle_socket_error(:SEND, e)
492       end
493     end
494
495   end
496
497 end