git.netwichtig.de Git - user/henk/code/ruby/rbot.git/blob - lib/rbot/ircsocket.rb
Rename the former {{{log}}} method to {{{irclog}}} and introduce new logging function...
[user/henk/code/ruby/rbot.git] / lib / rbot / ircsocket.rb
1 module Irc
2
3   require 'socket'
4   require 'thread'
5   require 'rbot/timer'
6
7   # wrapped TCPSocket for communication with the server.
8   # emulates a subset of TCPSocket functionality
9   class IrcSocket
10     # total number of lines sent to the irc server
11     attr_reader :lines_sent
12
13     # total number of lines received from the irc server
14     attr_reader :lines_received
15
16     # total number of bytes sent to the irc server
17     attr_reader :bytes_sent
18
19     # total number of bytes received from the irc server
20     attr_reader :bytes_received
21
22     # accumulator for the throttle
23     attr_reader :throttle_bytes
24
25     # byterate components
26     attr_reader :bytes_per
27     attr_reader :seconds_per
28
29     # delay between lines sent
30     attr_reader :sendq_delay
31
32     # max lines to burst
33     attr_reader :sendq_burst
34
35     # server:: server to connect to
36     # port::   IRCd port
37     # host::   optional local host to bind to (ruby 1.7+ required)
38     # create a new IrcSocket
39     def initialize(server, port, host, sendq_delay=2, sendq_burst=4, brt="400/2")
40       @timer = Timer::Timer.new
41       @timer.add(0.2) do
42         spool
43       end
44       @server = server.dup
45       @port = port.to_i
46       @host = host
47       @sock = nil
48       @spooler = false
49       @lines_sent = 0
50       @lines_received = 0
51       if sendq_delay
52         @sendq_delay = sendq_delay.to_f
53       else
54         @sendq_delay = 2
55       end
56       @last_send = Time.new - @sendq_delay
57       @last_throttle = Time.new
58       @burst = 0
59       if sendq_burst
60         @sendq_burst = sendq_burst.to_i
61       else
62         @sendq_burst = 4
63       end
64       @bytes_per = 400
65       @seconds_per = 2
66       @throttle_bytes = 0
67       setbyterate(brt)
68     end
69
70     def setbyterate(brt)
71       if brt.match(/(\d+)\/(\d)/)
72         @bytes_per = $1.to_i
73         @seconds_per = $2.to_i
74         debug "Byterate now #{byterate}"
75         return true
76       else
77         debug "Couldn't set byterate #{brt}"
78         return false
79       end
80     end
81
82     def connected?
83       !@sock.nil?
84     end
85
86     # open a TCP connection to the server
87     def connect
88       if connected?
89         debug "reconnecting socket while connected"
90         shutdown
91       end
92       if(@host)
93         begin
94           @sock=TCPSocket.new(@server, @port, @host)
95         rescue ArgumentError => e
96           $stderr.puts "Your version of ruby does not support binding to a "
97           $stderr.puts "specific local address, please upgrade if you wish "
98           $stderr.puts "to use HOST = foo"
99           $stderr.puts "(this option has been disabled in order to continue)"
100           @sock=TCPSocket.new(@server, @port)
101         end
102       else
103         @sock=TCPSocket.new(@server, @port)
104       end
105       @qthread = false
106       @qmutex = Mutex.new
107       @sendq = Array.new
108     end
109
110     def sendq_delay=(newfreq)
111       debug "changing sendq frequency to #{newfreq}"
112       @qmutex.synchronize do
113         @sendq_delay = newfreq
114         if newfreq == 0
115           clearq
116           @timer.stop
117         else
118           @timer.start
119         end
120       end
121     end
122
123     def sendq_burst=(newburst)
124       @qmutex.synchronize do
125         @sendq_burst = newburst
126       end
127     end
128
129     def byterate
130       return "#{@bytes_per}/#{@seconds_per}"
131     end
132
133     def byterate=(newrate)
134       @qmutex.synchronize do
135         setbyterate(newrate)
136       end
137     end
138
139     # used to send lines to the remote IRCd
140     # message: IRC message to send
141     def puts(message)
142       @qmutex.synchronize do
143         # debug "In puts - got mutex"
144         puts_critical(message)
145       end
146     end
147
148     # get the next line from the server (blocks)
149     def gets
150       if @sock.nil?
151         debug "socket get attempted while closed"
152         return nil
153       end
154       begin
155         reply = @sock.gets
156         @lines_received += 1
157         reply.strip! if reply
158         debug "RECV: #{reply.inspect}"
159         return reply
160       rescue => e
161         debug "socket get failed: #{e.inspect}"
162         return nil
163       end
164     end
165
166     def queue(msg)
167       if @sendq_delay > 0
168         @qmutex.synchronize do
169           @sendq.push msg
170         end
171         @timer.start
172       else
173         # just send it if queueing is disabled
174         self.puts(msg)
175       end
176     end
177
178     # pop a message off the queue, send it
179     def spool
180       if @sendq.empty?
181         @timer.stop
182         return
183       end
184       now = Time.new
185       if @throttle_bytes > 0
186         delta = ((now - @last_throttle)*@bytes_per/@seconds_per).floor
187         if delta > 0
188           @throttle_bytes -= delta
189           @throttle_bytes = 0 if @throttle_bytes < 0
190           @last_throttle = now
191         end
192       end
193       if (now >= (@last_send + @sendq_delay))
194         # reset burst counter after @sendq_delay has passed
195         @burst = 0
196         debug "in spool, resetting @burst"
197       elsif (@burst >= @sendq_burst)
198         # nope. can't send anything, come back to us next tick...
199         @timer.start
200         return
201       end
202       @qmutex.synchronize do
203         debug "(can send #{@sendq_burst - @burst} lines, there are #{@sendq.length} to send)"
204         (@sendq_burst - @burst).times do
205           break if @sendq.empty?
206           mess = @sendq[0]
207           if @throttle_bytes == 0 or mess.length+@throttle_bytes < @bytes_per
208             puts_critical(@sendq.shift)
209           else
210             debug "(flood protection: throttling message of length #{mess.length})"
211             debug "(byterate: #{byterate}, throttle bytes: #{@throttle_bytes})"
212             break
213           end
214         end
215       end
216       if @sendq.empty?
217         @timer.stop
218       end
219     end
220
221     def clearq
222       if @sock
223         unless @sendq.empty?
224           @qmutex.synchronize do
225             @sendq.clear
226           end
227         end
228       else
229         debug "Clearing socket while disconnected"
230       end
231     end
232
233     # flush the TCPSocket
234     def flush
235       @sock.flush
236     end
237
238     # Wraps Kernel.select on the socket
239     def select(timeout=nil)
240       Kernel.select([@sock], nil, nil, timeout)
241     end
242
243     # shutdown the connection to the server
244     def shutdown(how=2)
245       @sock.shutdown(how) unless @sock.nil?
246       @sock = nil
247     end
248
249     private
250
251     # same as puts, but expects to be called with a mutex held on @qmutex
252     def puts_critical(message)
253       # debug "in puts_critical"
254       debug "SEND: #{message.inspect}"
255       @sock.send(message + "\n",0)
256       @last_send = Time.new
257       @lines_sent += 1
258       @burst += 1
259       @throttle_bytes += message.length + 1
260       @last_throttle = Time.new
261     end
262
263   end
264
265 end