]> git.netwichtig.de Git - user/henk/code/ruby/rbot.git/blob - lib/rbot/ircsocket.rb
4ee3be236e59637d5de04e399a1eb487b7d57c4e
[user/henk/code/ruby/rbot.git] / lib / rbot / ircsocket.rb
1 module Irc
2
3   require 'socket'
4   require 'thread'
5   require 'rbot/timer'
6
7   # wrapped TCPSocket for communication with the server.
8   # emulates a subset of TCPSocket functionality
9   class IrcSocket
10     # total number of lines sent to the irc server
11     attr_reader :lines_sent
12
13     # total number of lines received from the irc server
14     attr_reader :lines_received
15
16     # total number of bytes sent to the irc server
17     attr_reader :bytes_sent
18
19     # total number of bytes received from the irc server
20     attr_reader :bytes_received
21
22     # accumulator for the throttle
23     attr_reader :throttle_bytes
24
25     # byterate components
26     attr_reader :bytes_per
27     attr_reader :seconds_per
28
29     # delay between lines sent
30     attr_reader :sendq_delay
31
32     # max lines to burst
33     attr_reader :sendq_burst
34
35     # server:: server to connect to
36     # port::   IRCd port
37     # host::   optional local host to bind to (ruby 1.7+ required)
38     # create a new IrcSocket
39     def initialize(server, port, host, sendq_delay=2, sendq_burst=4, brt="400/2")
40       @timer = Timer::Timer.new
41       @timer.add(0.2) do
42         spool
43       end
44       @server = server.dup
45       @port = port.to_i
46       @host = host
47       @sock = nil
48       @spooler = false
49       @lines_sent = 0
50       @lines_received = 0
51       if sendq_delay
52         @sendq_delay = sendq_delay.to_f
53       else
54         @sendq_delay = 2
55       end
56       @last_send = Time.new - @sendq_delay
57       @last_throttle = Time.new
58       @burst = 0
59       if sendq_burst
60         @sendq_burst = sendq_burst.to_i
61       else
62         @sendq_burst = 4
63       end
64       @bytes_per = 400
65       @seconds_per = 2
66       @throttle_bytes = 0
67       @throttle_div = 1
68       setbyterate(brt)
69     end
70
71     def setbyterate(brt)
72       if brt.match(/(\d+)\/(\d)/)
73         @bytes_per = $1.to_i
74         @seconds_per = $2.to_i
75         debug "Byterate now #{byterate}"
76         return true
77       else
78         debug "Couldn't set byterate #{brt}"
79         return false
80       end
81     end
82
83     def connected?
84       !@sock.nil?
85     end
86
87     # open a TCP connection to the server
88     def connect
89       if connected?
90         debug "reconnecting socket while connected"
91         shutdown
92       end
93       if(@host)
94         begin
95           @sock=TCPSocket.new(@server, @port, @host)
96         rescue ArgumentError => e
97           error "Your version of ruby does not support binding to a "
98           error "specific local address, please upgrade if you wish "
99           error "to use HOST = foo"
100           error "(this option has been disabled in order to continue)"
101           @sock=TCPSocket.new(@server, @port)
102         end
103       else
104         @sock=TCPSocket.new(@server, @port)
105       end
106       @qthread = false
107       @qmutex = Mutex.new
108       @sendq = Array.new
109     end
110
111     def sendq_delay=(newfreq)
112       debug "changing sendq frequency to #{newfreq}"
113       @qmutex.synchronize do
114         @sendq_delay = newfreq
115         if newfreq == 0
116           clearq
117           @timer.stop
118         else
119           @timer.start
120         end
121       end
122     end
123
124     def sendq_burst=(newburst)
125       @qmutex.synchronize do
126         @sendq_burst = newburst
127       end
128     end
129
130     def byterate
131       return "#{@bytes_per}/#{@seconds_per}"
132     end
133
134     def byterate=(newrate)
135       @qmutex.synchronize do
136         setbyterate(newrate)
137       end
138     end
139
140     def run_throttle(more=0)
141       now = Time.new
142       if @throttle_bytes > 0
143         # If we ever reach the limit, we halve the actual allowed byterate
144         # until we manage to reset the throttle.
145         # I don't know if this is the best way, though, because the real
146         # problem is probably non-queued messages like PINGs and PONGs.
147         # A better solution would probably be to have two queues,
148         # one for priority messages and another one for normal messages.
149         # Even better, we should have:
150         # * one queue for server stuff
151         # * one for each channel
152         # * one for each private communication
153         # The server queue would have priority, everything else would be served
154         # round-robin, so that someone making the bot flood one channel wouldn't
155         # prevent the bot from working on other channels (or private communications)
156         if @throttle_bytes >= @bytes_per
157           @throttle_div = 0.5
158         end
159         delta = ((now - @last_throttle)*@throttle_div*@bytes_per/@seconds_per).floor
160         if delta > 0
161           @throttle_bytes -= delta
162           @throttle_bytes = 0 if @throttle_bytes < 0
163           @last_throttle = now
164         end
165       else
166         @throttle_div = 1
167       end
168       @throttle_bytes += more
169     end
170
171     # used to send lines to the remote IRCd
172     # message: IRC message to send
173     def puts(message)
174       @qmutex.synchronize do
175         # debug "In puts - got mutex"
176         puts_critical(message)
177       end
178     end
179
180     # get the next line from the server (blocks)
181     def gets
182       if @sock.nil?
183         debug "socket get attempted while closed"
184         return nil
185       end
186       begin
187         reply = @sock.gets
188         @lines_received += 1
189         reply.strip! if reply
190         debug "RECV: #{reply.inspect}"
191         return reply
192       rescue => e
193         debug "socket get failed: #{e.inspect}"
194         return nil
195       end
196     end
197
198     def queue(msg)
199       if @sendq_delay > 0
200         @qmutex.synchronize do
201           @sendq.push msg
202         end
203         @timer.start
204       else
205         # just send it if queueing is disabled
206         self.puts(msg)
207       end
208     end
209
210     # pop a message off the queue, send it
211     def spool
212       if @sendq.empty?
213         @timer.stop
214         return
215       end
216       now = Time.new
217       if (now >= (@last_send + @sendq_delay))
218         # reset burst counter after @sendq_delay has passed
219         @burst = 0
220         debug "in spool, resetting @burst"
221       elsif (@burst >= @sendq_burst)
222         # nope. can't send anything, come back to us next tick...
223         @timer.start
224         return
225       end
226       @qmutex.synchronize do
227         debug "(can send #{@sendq_burst - @burst} lines, there are #{@sendq.length} to send)"
228         (@sendq_burst - @burst).times do
229           break if @sendq.empty?
230           mess = @sendq[0]
231           if @throttle_bytes == 0 or mess.length+@throttle_bytes < @bytes_per
232             debug "(flood protection: sending message of length #{mess.length})"
233             debug "(byterate: #{byterate}, throttle bytes: #{@throttle_bytes})"
234             puts_critical(@sendq.shift)
235           else
236             debug "(flood protection: throttling message of length #{mess.length})"
237             debug "(byterate: #{byterate}, throttle bytes: #{@throttle_bytes})"
238             run_throttle
239             break
240           end
241         end
242       end
243       if @sendq.empty?
244         @timer.stop
245       end
246     end
247
248     def clearq
249       if @sock
250         unless @sendq.empty?
251           @qmutex.synchronize do
252             @sendq.clear
253           end
254         end
255       else
256         debug "Clearing socket while disconnected"
257       end
258     end
259
260     # flush the TCPSocket
261     def flush
262       @sock.flush
263     end
264
265     # Wraps Kernel.select on the socket
266     def select(timeout=nil)
267       Kernel.select([@sock], nil, nil, timeout)
268     end
269
270     # shutdown the connection to the server
271     def shutdown(how=2)
272       @sock.shutdown(how) unless @sock.nil?
273       @sock = nil
274     end
275
276     private
277
278     # same as puts, but expects to be called with a mutex held on @qmutex
279     def puts_critical(message)
280       # debug "in puts_critical"
281       debug "SEND: #{message.inspect}"
282       @sock.send(message + "\n",0)
283       @last_send = Time.new
284       @lines_sent += 1
285       @burst += 1
286       run_throttle(message.length + 1)
287     end
288
289   end
290
291 end