]> git.netwichtig.de Git - user/henk/code/ruby/rbot.git/blob - lib/rbot/ircsocket.rb
Penalty-based flood protection
[user/henk/code/ruby/rbot.git] / lib / rbot / ircsocket.rb
1 module Irc
2
3   require 'socket'
4   require 'thread'
5   require 'rbot/timer'
6
7   class QueueRing
8     # A QueueRing is implemented as an array with elements in the form
9     # [chan, [message1, message2, ...]
10     # Note that the channel +chan+ has no actual bearing with the channels
11     # to which messages will be sent
12
13     def initialize
14       @storage = Array.new
15       @last_idx = -1
16     end
17
18     def clear
19       @storage.clear
20       @last_idx = -1
21     end
22
23     def length
24       length = 0
25       @storage.each {|c|
26         length += c[1].length 
27       }
28       return length
29     end
30
31     def empty?
32       @storage.empty?
33     end
34
35     def push(mess, chan)
36       cmess = @storage.assoc(chan)
37       if cmess
38         idx = @storage.index(cmess)
39         cmess[1] << mess
40         @storage[idx] = cmess
41       else
42         @storage << [chan, [mess]]
43       end
44     end
45
46     def next
47       if empty?
48         warning "trying to access empty ring"
49         return nil
50       end
51       save_idx = @last_idx
52       @last_idx = (@last_idx + 1) % @storage.length
53       mess = @storage[@last_idx][1].first
54       @last_idx = save_idx
55       return mess
56     end
57
58     def shift
59       if empty?
60         warning "trying to access empty ring"
61         return nil
62       end
63       @last_idx = (@last_idx + 1) % @storage.length
64       mess = @storage[@last_idx][1].shift
65       @storage.delete(@storage[@last_idx]) if @storage[@last_idx][1] == []
66       return mess
67     end
68
69   end
70
71   class MessageQueue
72     def initialize
73       # a MessageQueue is an array of QueueRings
74       # rings have decreasing priority, so messages in ring 0
75       # are more important than messages in ring 1, and so on
76       @rings = Array.new(3) { |i|
77         if i > 0
78           QueueRing.new
79         else
80           # ring 0 is special in that if it's not empty, it will
81           # be popped. IOW, ring 0 can starve the other rings
82           # ring 0 is strictly FIFO and is therefore implemented
83           # as an array
84           Array.new
85         end
86       }
87       # the other rings are satisfied round-robin
88       @last_ring = 0
89     end
90
91     def clear
92       @rings.each { |r|
93         r.clear
94       }
95       @last_ring = 0
96     end
97
98     def push(mess, chan=nil, cring=0)
99       ring = cring
100       if ring == 0
101         warning "message #{mess} at ring 0 has channel #{chan}: channel will be ignored" if !chan.nil?
102         @rings[0] << mess
103       else
104         error "message #{mess} at ring #{ring} must have a channel" if chan.nil?
105         @rings[ring].push mess, chan
106       end
107     end
108
109     def empty?
110       @rings.each { |r|
111         return false unless r.empty?
112       }
113       return true
114     end
115
116     def length
117       len = 0
118       @rings.each { |r|
119         len += r.length
120       }
121       len
122     end
123
124     def next
125       if empty?
126         warning "trying to access empty ring"
127         return nil
128       end
129       mess = nil
130       if !@rings[0].empty?
131         mess = @rings[0].first
132       else
133         save_ring = @last_ring
134         (@rings.length - 1).times {
135           @last_ring = (@last_ring % (@rings.length - 1)) + 1
136           if !@rings[@last_ring].empty?
137             mess = @rings[@last_ring].next
138             break
139           end
140         }
141         @last_ring = save_ring
142       end
143       error "nil message" if mess.nil?
144       return mess
145     end
146
147     def shift
148       if empty?
149         warning "trying to access empty ring"
150         return nil
151       end
152       mess = nil
153       if !@rings[0].empty?
154         return @rings[0].shift
155       end
156       (@rings.length - 1).times {
157         @last_ring = (@last_ring % (@rings.length - 1)) + 1
158         if !@rings[@last_ring].empty?
159           return @rings[@last_ring].shift
160         end
161       }
162       error "nil message" if mess.nil?
163       return mess
164     end
165
166   end
167
168   # wrapped TCPSocket for communication with the server.
169   # emulates a subset of TCPSocket functionality
170   class IrcSocket
171
172     MAX_IRC_SEND_PENALTY = 10
173
174     # total number of lines sent to the irc server
175     attr_reader :lines_sent
176
177     # total number of lines received from the irc server
178     attr_reader :lines_received
179
180     # total number of bytes sent to the irc server
181     attr_reader :bytes_sent
182
183     # total number of bytes received from the irc server
184     attr_reader :bytes_received
185
186     # accumulator for the throttle
187     attr_reader :throttle_bytes
188
189     # delay between lines sent
190     attr_reader :sendq_delay
191
192     # max lines to burst
193     attr_reader :sendq_burst
194
195     # server:: server to connect to
196     # port::   IRCd port
197     # host::   optional local host to bind to (ruby 1.7+ required)
198     # create a new IrcSocket
199     def initialize(server, port, host, sendq_delay=2, sendq_burst=4)
200       @timer = Timer::Timer.new
201       @timer.add(0.2) do
202         spool
203       end
204       @server = server.dup
205       @port = port.to_i
206       @host = host
207       @sock = nil
208       @spooler = false
209       @lines_sent = 0
210       @lines_received = 0
211       if sendq_delay
212         @sendq_delay = sendq_delay.to_f
213       else
214         @sendq_delay = 2
215       end
216       @last_send = Time.new - @sendq_delay
217       @last_throttle = Time.new
218       @burst = 0
219       if sendq_burst
220         @sendq_burst = sendq_burst.to_i
221       else
222         @sendq_burst = 4
223       end
224     end
225
226     def connected?
227       !@sock.nil?
228     end
229
230     # open a TCP connection to the server
231     def connect
232       if connected?
233         warning "reconnecting while connected"
234         return
235       end
236       if(@host)
237         begin
238           @sock=TCPSocket.new(@server, @port, @host)
239         rescue ArgumentError => e
240           error "Your version of ruby does not support binding to a "
241           error "specific local address, please upgrade if you wish "
242           error "to use HOST = foo"
243           error "(this option has been disabled in order to continue)"
244           @sock=TCPSocket.new(@server, @port)
245         end
246       else
247         @sock=TCPSocket.new(@server, @port)
248       end
249       @qthread = false
250       @qmutex = Mutex.new
251       @sendq = MessageQueue.new
252     end
253
254     def sendq_delay=(newfreq)
255       debug "changing sendq frequency to #{newfreq}"
256       @qmutex.synchronize do
257         @sendq_delay = newfreq
258         if newfreq == 0
259           clearq
260           @timer.stop
261         else
262           @timer.start
263         end
264       end
265     end
266
267     def sendq_burst=(newburst)
268       @qmutex.synchronize do
269         @sendq_burst = newburst
270       end
271     end
272
273     # used to send lines to the remote IRCd by skipping the queue
274     # message: IRC message to send
275     # it should only be used for stuff that *must not* be queued,
276     # i.e. the initial PASS, NICK and USER command
277     # or the final QUIT message
278     def emergency_puts(message)
279       @qmutex.synchronize do
280         # debug "In puts - got mutex"
281         puts_critical(message)
282       end
283     end
284
285     # get the next line from the server (blocks)
286     def gets
287       if @sock.nil?
288         warning "socket get attempted while closed"
289         return nil
290       end
291       begin
292         reply = @sock.gets
293         @lines_received += 1
294         reply.strip! if reply
295         debug "RECV: #{reply.inspect}"
296         return reply
297       rescue => e
298         warning "socket get failed: #{e.inspect}"
299         debug e.backtrace.join("\n")
300         return nil
301       end
302     end
303
304     def queue(msg, chan=nil, ring=0)
305       if @sendq_delay > 0
306         @qmutex.synchronize do
307           @sendq.push msg, chan, ring
308           @timer.start
309         end
310       else
311         # just send it if queueing is disabled
312         self.emergency_puts(msg)
313       end
314     end
315
316     # pop a message off the queue, send it
317     def spool
318       @qmutex.synchronize do
319         begin
320           debug "in spooler"
321           if @sendq.empty?
322             @timer.stop
323             return
324           end
325           now = Time.new
326           if (now >= (@last_send + @sendq_delay))
327             # after @sendq_delay has passed, we allow more @burst
328             # instead of resetting it to 0, we reduce it by 1
329             debug "decreasing @burst"
330             @burst -= 1 if @burst > 0
331           elsif (@burst >= @sendq_burst)
332             # nope. can't send anything, come back to us next tick...
333             debug "can't send yet"
334             @timer.start
335             return
336           end
337           debug "can send #{@sendq_burst - @burst} lines, there are #{@sendq.length} to send"
338           while !@sendq.empty? and @burst < @sendq_burst and now > @last_send - MAX_IRC_SEND_PENALTY
339             mess = @sendq.next
340             puts_critical(@sendq.shift)
341             @last_send += mess.irc_send_penalty
342           end
343           if @sendq.empty?
344             @timer.stop
345           end
346         rescue => e
347           error "Spooling failed: #{e.inspect}"
348           error e.backtrace.join("\n")
349         end
350       end
351     end
352
353     def clearq
354       if @sock
355         @qmutex.synchronize do
356           unless @sendq.empty?
357             @sendq.clear
358           end
359         end
360       else
361         warning "Clearing socket while disconnected"
362       end
363     end
364
365     # flush the TCPSocket
366     def flush
367       @sock.flush
368     end
369
370     # Wraps Kernel.select on the socket
371     def select(timeout=nil)
372       Kernel.select([@sock], nil, nil, timeout)
373     end
374
375     # shutdown the connection to the server
376     def shutdown(how=2)
377       @sock.shutdown(how) unless @sock.nil?
378       @sock = nil
379       @burst = 0
380     end
381
382     private
383
384     # same as puts, but expects to be called with a mutex held on @qmutex
385     def puts_critical(message)
386       # debug "in puts_critical"
387       begin
388         debug "SEND: #{message.inspect}"
389         if @sock.nil?
390           error "SEND attempted on closed socket"
391         else
392           @sock.send(message + "\n",0)
393           @last_send = Time.new
394           @lines_sent += 1
395           @burst += 1
396         end
397       rescue => e
398         error "SEND failed: #{e.inspect}"
399         raise
400       end
401     end
402
403   end
404
405 end