]> git.netwichtig.de Git - user/henk/code/ruby/rbot.git/blob - lib/rbot/ircsocket.rb
ccc751f7cbccef7b096e782cbb7430a092d7a25b
[user/henk/code/ruby/rbot.git] / lib / rbot / ircsocket.rb
1 module Irc
2
3   require 'socket'
4   require 'thread'
5   require 'rbot/timer'
6
7   class QueueRing
8     # A QueueRing is implemented as an array with elements in the form
9     # [chan, [message1, message2, ...]
10     # Note that the channel +chan+ has no actual bearing with the channels
11     # to which messages will be sent
12
13     def initialize
14       @storage = Array.new
15       @last_idx = -1
16     end
17
18     def clear
19       @storage.clear
20       @last_idx = -1
21     end
22
23     def length
24       length = 0
25       @storage.each {|c|
26         length += c[1].length 
27       }
28       return length
29     end
30
31     def empty?
32       @storage.empty?
33     end
34
35     def push(mess, chan)
36       cmess = @storage.assoc(chan)
37       if cmess
38         idx = @storage.index(cmess)
39         cmess[1] << mess
40         @storage[idx] = cmess
41       else
42         @storage << [chan, [mess]]
43       end
44     end
45
46     def next
47       if empty?
48         warning "trying to access empty ring"
49         return nil
50       end
51       save_idx = @last_idx
52       @last_idx = (@last_idx + 1) % @storage.length
53       mess = @storage[@last_idx][1].first
54       @last_idx = save_idx
55       return mess
56     end
57
58     def shift
59       if empty?
60         warning "trying to access empty ring"
61         return nil
62       end
63       @last_idx = (@last_idx + 1) % @storage.length
64       mess = @storage[@last_idx][1].shift
65       @storage.delete(@storage[@last_idx]) if @storage[@last_idx][1] == []
66       return mess
67     end
68
69   end
70
71   class MessageQueue
72     def initialize
73       # a MessageQueue is an array of QueueRings
74       # rings have decreasing priority, so messages in ring 0
75       # are more important than messages in ring 1, and so on
76       @rings = Array.new(3) { |i|
77         if i > 0
78           QueueRing.new
79         else
80           # ring 0 is special in that if it's not empty, it will
81           # be popped. IOW, ring 0 can starve the other rings
82           # ring 0 is strictly FIFO and is therefore implemented
83           # as an array
84           Array.new
85         end
86       }
87       # the other rings are satisfied round-robin
88       @last_ring = 0
89     end
90
91     def clear
92       @rings.each { |r|
93         r.clear
94       }
95       @last_ring = 0
96     end
97
98     def push(mess, chan=nil, cring=0)
99       ring = cring
100       if ring == 0
101         warning "message #{mess} at ring 0 has channel #{chan}: channel will be ignored" if !chan.nil?
102         @rings[0] << mess
103       else
104         error "message #{mess} at ring #{ring} must have a channel" if chan.nil?
105         @rings[ring].push mess, chan
106       end
107     end
108
109     def empty?
110       @rings.each { |r|
111         return false unless r.empty?
112       }
113       return true
114     end
115
116     def length
117       len = 0
118       @rings.each { |r|
119         len += r.length
120       }
121       len
122     end
123
124     def next
125       if empty?
126         warning "trying to access empty ring"
127         return nil
128       end
129       mess = nil
130       if !@rings[0].empty?
131         mess = @rings[0].first
132       else
133         save_ring = @last_ring
134         (@rings.length - 1).times {
135           @last_ring = (@last_ring % (@rings.length - 1)) + 1
136           if !@rings[@last_ring].empty?
137             mess = @rings[@last_ring].next
138             break
139           end
140         }
141         @last_ring = save_ring
142       end
143       error "nil message" if mess.nil?
144       return mess
145     end
146
147     def shift
148       if empty?
149         warning "trying to access empty ring"
150         return nil
151       end
152       mess = nil
153       if !@rings[0].empty?
154         return @rings[0].shift
155       end
156       (@rings.length - 1).times {
157         @last_ring = (@last_ring % (@rings.length - 1)) + 1
158         if !@rings[@last_ring].empty?
159           return @rings[@last_ring].shift
160         end
161       }
162       error "nil message" if mess.nil?
163       return mess
164     end
165
166   end
167
168   # wrapped TCPSocket for communication with the server.
169   # emulates a subset of TCPSocket functionality
170   class IrcSocket
171     # total number of lines sent to the irc server
172     attr_reader :lines_sent
173
174     # total number of lines received from the irc server
175     attr_reader :lines_received
176
177     # total number of bytes sent to the irc server
178     attr_reader :bytes_sent
179
180     # total number of bytes received from the irc server
181     attr_reader :bytes_received
182
183     # accumulator for the throttle
184     attr_reader :throttle_bytes
185
186     # byterate components
187     attr_reader :bytes_per
188     attr_reader :seconds_per
189
190     # delay between lines sent
191     attr_reader :sendq_delay
192
193     # max lines to burst
194     attr_reader :sendq_burst
195
196     # server:: server to connect to
197     # port::   IRCd port
198     # host::   optional local host to bind to (ruby 1.7+ required)
199     # create a new IrcSocket
200     def initialize(server, port, host, sendq_delay=2, sendq_burst=4, brt="400/2")
201       @timer = Timer::Timer.new
202       @timer.add(0.2) do
203         spool
204       end
205       @server = server.dup
206       @port = port.to_i
207       @host = host
208       @sock = nil
209       @spooler = false
210       @lines_sent = 0
211       @lines_received = 0
212       if sendq_delay
213         @sendq_delay = sendq_delay.to_f
214       else
215         @sendq_delay = 2
216       end
217       @last_send = Time.new - @sendq_delay
218       @last_throttle = Time.new
219       @burst = 0
220       if sendq_burst
221         @sendq_burst = sendq_burst.to_i
222       else
223         @sendq_burst = 4
224       end
225       @bytes_per = 400
226       @seconds_per = 2
227       @throttle_bytes = 0
228       @hit_limit = 0 # how many times did we reach the limit?
229       setbyterate(brt)
230     end
231
232     def setbyterate(brt)
233       if brt.match(/(\d+)\/(\d)/)
234         @bytes_per = $1.to_i
235         @seconds_per = $2.to_i
236         debug "Byterate now #{byterate}"
237         return true
238       else
239         debug "Couldn't set byterate #{brt}"
240         return false
241       end
242     end
243
244     def connected?
245       !@sock.nil?
246     end
247
248     # open a TCP connection to the server
249     def connect
250       if connected?
251         warning "reconnecting while connected"
252         return
253       end
254       if(@host)
255         begin
256           @sock=TCPSocket.new(@server, @port, @host)
257         rescue ArgumentError => e
258           error "Your version of ruby does not support binding to a "
259           error "specific local address, please upgrade if you wish "
260           error "to use HOST = foo"
261           error "(this option has been disabled in order to continue)"
262           @sock=TCPSocket.new(@server, @port)
263         end
264       else
265         @sock=TCPSocket.new(@server, @port)
266       end
267       @qthread = false
268       @qmutex = Mutex.new
269       @sendq = MessageQueue.new
270     end
271
272     def sendq_delay=(newfreq)
273       debug "changing sendq frequency to #{newfreq}"
274       @qmutex.synchronize do
275         @sendq_delay = newfreq
276         if newfreq == 0
277           clearq
278           @timer.stop
279         else
280           @timer.start
281         end
282       end
283     end
284
285     def sendq_burst=(newburst)
286       @qmutex.synchronize do
287         @sendq_burst = newburst
288       end
289     end
290
291     def byterate
292       return "#{@bytes_per}/#{@seconds_per} (limit hit #{@hit_limit} times)"
293     end
294
295     def byterate=(newrate)
296       @qmutex.synchronize do
297         setbyterate(newrate)
298       end
299     end
300
301     def run_throttle(more=0)
302       now = Time.new
303       # Each time we reach the limit, we reduce the bitrate. We reset the bitrate only if the throttle
304       # manages to reset twice. This way we have better flood control, although the really perfect way
305       # would be to calculate our penalty the way it's done serverside.
306       if @throttle_bytes > 0
307         if @throttle_bytes >= @bytes_per
308           @hit_limit += 1
309           @hit_limit = 3 if @hit_limit > 3
310         end
311         delta = ((now - @last_throttle)*(0.5**@hit_limit.ceil)*@bytes_per/@seconds_per).floor
312         if delta > 0
313           @throttle_bytes -= delta
314           @throttle_bytes = 0 if @throttle_bytes < 0
315           @last_throttle = now
316         end
317       else
318         @hit_limit -= 0.5 if @hit_limit > 0
319       end
320       @throttle_bytes += more
321     end
322
323     # used to send lines to the remote IRCd by skipping the queue
324     # message: IRC message to send
325     # it should only be used for stuff that *must not* be queued,
326     # i.e. the initial PASS, NICK and USER command
327     # or the final QUIT message
328     def emergency_puts(message)
329       @qmutex.synchronize do
330         # debug "In puts - got mutex"
331         puts_critical(message)
332       end
333     end
334
335     # get the next line from the server (blocks)
336     def gets
337       if @sock.nil?
338         warning "socket get attempted while closed"
339         return nil
340       end
341       begin
342         reply = @sock.gets
343         @lines_received += 1
344         reply.strip! if reply
345         debug "RECV: #{reply.inspect}"
346         return reply
347       rescue => e
348         warning "socket get failed: #{e.inspect}"
349         debug e.backtrace.join("\n")
350         return nil
351       end
352     end
353
354     def queue(msg, chan=nil, ring=0)
355       if @sendq_delay > 0
356         @qmutex.synchronize do
357           @sendq.push msg, chan, ring
358           @timer.start
359         end
360       else
361         # just send it if queueing is disabled
362         self.emergency_puts(msg)
363       end
364     end
365
366     # pop a message off the queue, send it
367     def spool
368       @qmutex.synchronize do
369         begin
370           debug "in spooler"
371           if @sendq.empty?
372             @timer.stop
373             return
374           end
375           now = Time.new
376           if (now >= (@last_send + @sendq_delay))
377             # after @sendq_delay has passed, we allow more @burst
378             # instead of resetting it to 0, we reduce it by 1
379             debug "decreasing @burst"
380             @burst -= 1 if @burst > 0
381           elsif (@burst >= @sendq_burst)
382             # nope. can't send anything, come back to us next tick...
383             debug "can't send yet"
384             @timer.start
385             return
386           end
387           debug "can send #{@sendq_burst - @burst} lines, there are #{@sendq.length} to send"
388           (@sendq_burst - @burst).times do
389             break if @sendq.empty?
390             mess = @sendq.next
391             if @throttle_bytes == 0 or mess.length+@throttle_bytes < @bytes_per
392               debug "flood protection: sending message of length #{mess.length}"
393               debug "(byterate: #{byterate}, throttle bytes: #{@throttle_bytes})"
394               puts_critical(@sendq.shift)
395             else
396               debug "flood protection: throttling message of length #{mess.length}"
397               debug "(byterate: #{byterate}, throttle bytes: #{@throttle_bytes})"
398               run_throttle
399               break
400             end
401           end
402           if @sendq.empty?
403             @timer.stop
404           end
405         rescue => e
406           error "Spooling failed: #{e.inspect}"
407           error e.backtrace.join("\n")
408         end
409       end
410     end
411
412     def clearq
413       if @sock
414         @qmutex.synchronize do
415           unless @sendq.empty?
416             @sendq.clear
417           end
418         end
419       else
420         warning "Clearing socket while disconnected"
421       end
422     end
423
424     # flush the TCPSocket
425     def flush
426       @sock.flush
427     end
428
429     # Wraps Kernel.select on the socket
430     def select(timeout=nil)
431       Kernel.select([@sock], nil, nil, timeout)
432     end
433
434     # shutdown the connection to the server
435     def shutdown(how=2)
436       @sock.shutdown(how) unless @sock.nil?
437       @sock = nil
438       @burst = 0
439     end
440
441     private
442
443     # same as puts, but expects to be called with a mutex held on @qmutex
444     def puts_critical(message)
445       # debug "in puts_critical"
446       begin
447         debug "SEND: #{message.inspect}"
448         if @sock.nil?
449           error "SEND attempted on closed socket"
450         else
451           @sock.send(message + "\n",0)
452           @last_send = Time.new
453           @lines_sent += 1
454           @burst += 1
455           run_throttle(message.length + 1)
456         end
457       rescue => e
458         error "SEND failed: #{e.inspect}"
459         raise
460       end
461     end
462
463   end
464
465 end