7 # wrapped TCPSocket for communication with the server.
8 # emulates a subset of TCPSocket functionality
10 # total number of lines sent to the IRC server
11 attr_reader :lines_sent
13 # total number of lines received from the IRC server
14 attr_reader :lines_received
16 # total number of bytes sent to the IRC server
17 attr_reader :bytes_sent
19 # total number of bytes received from the IRC server
20 attr_reader :bytes_received
22 # accumulator for the throttle: grows by the size of each line sent and is drained over time, never below zero
23 attr_reader :throttle_bytes
26 attr_reader :bytes_per # byte count of the byterate, i.e. the first half of "bytes_per/seconds_per"
27 attr_reader :seconds_per # period of the byterate, i.e. the second half of "bytes_per/seconds_per"
29 # delay between lines sent, in seconds (it is added to a Time when deciding whether sending may resume)
30 attr_reader :sendq_delay
33 attr_reader :sendq_burst # number of queued lines that may be sent in one burst before the spooler has to wait
35 # server:: server to connect to
37 # host:: optional local host to bind to (ruby 1.7+ required)
38 # create a new IrcSocket
39 def initialize(server, port, host, sendq_delay=2, sendq_burst=4, brt="400/2")
40 @timer = Timer::Timer.new
52 @sendq_delay = sendq_delay.to_f
56 @last_send = Time.new - @sendq_delay
57 @last_throttle = Time.new
60 @sendq_burst = sendq_burst.to_i
72 if brt.match(/(\d+)\/(\d)/)
74 @seconds_per = $2.to_i
75 debug "Byterate now #{byterate}"
78 debug "Couldn't set byterate #{brt}"
87 # open a TCP connection to the server
90 debug "reconnecting socket while connected"
95 @sock=TCPSocket.new(@server, @port, @host)
96 rescue ArgumentError => e
97 error "Your version of ruby does not support binding to a "
98 error "specific local address, please upgrade if you wish "
99 error "to use HOST = foo"
100 error "(this option has been disabled in order to continue)"
101 @sock=TCPSocket.new(@server, @port)
104 @sock=TCPSocket.new(@server, @port)
111 def sendq_delay=(newfreq)
112 debug "changing sendq frequency to #{newfreq}"
113 @qmutex.synchronize do
114 @sendq_delay = newfreq
124 def sendq_burst=(newburst)
125 @qmutex.synchronize do
126 @sendq_burst = newburst
131 return "#{@bytes_per}/#{@seconds_per}"
134 def byterate=(newrate)
135 @qmutex.synchronize do
140 def run_throttle(more=0)
142 if @throttle_bytes > 0
143 # If we ever reach the limit, we halve the actual allowed byterate
144 # until we manage to reset the throttle.
145 # I don't know if this is the best way, though, because the real
146 # problem is probably non-queued messages like PINGs and PONGs.
147 # A better solution would probably be to have two queues,
148 # one for priority messages and another one for normal messages.
149 # Even better, we should have:
150 # * one queue for server stuff
151 # * one for each channel
152 # * one for each private communication
153 # The server queue would have priority, everything else would be served
154 # round-robin, so that someone making the bot flood one channel wouldn't
155 # prevent the bot from working on other channels (or private communications)
156 if @throttle_bytes >= @bytes_per
159 delta = ((now - @last_throttle)*@throttle_div*@bytes_per/@seconds_per).floor
161 @throttle_bytes -= delta
162 @throttle_bytes = 0 if @throttle_bytes < 0
168 @throttle_bytes += more
171 # used to send lines to the remote IRCd
172 # message:: IRC message to send
174 @qmutex.synchronize do
175 # debug "In puts - got mutex"
176 puts_critical(message)
180 # get the next line from the server (blocks)
183 debug "socket get attempted while closed"
189 reply.strip! if reply
190 debug "RECV: #{reply.inspect}"
193 debug "socket get failed: #{e.inspect}"
200 @qmutex.synchronize do
205 # just send it if queueing is disabled
210 # pop a message off the queue, send it
217 if (now >= (@last_send + @sendq_delay))
218 # reset burst counter after @sendq_delay has passed
220 debug "in spool, resetting @burst"
221 elsif (@burst >= @sendq_burst)
222 # nope. can't send anything, come back to us next tick...
226 @qmutex.synchronize do
227 debug "(can send #{@sendq_burst - @burst} lines, there are #{@sendq.length} to send)"
228 (@sendq_burst - @burst).times do
229 break if @sendq.empty?
231 if @throttle_bytes == 0 or mess.length+@throttle_bytes < @bytes_per
232 debug "(flood protection: sending message of length #{mess.length})"
233 debug "(byterate: #{byterate}, throttle bytes: #{@throttle_bytes})"
234 puts_critical(@sendq.shift)
236 debug "(flood protection: throttling message of length #{mess.length})"
237 debug "(byterate: #{byterate}, throttle bytes: #{@throttle_bytes})"
251 @qmutex.synchronize do
256 debug "Clearing socket while disconnected"
260 # flush the TCPSocket
265 # Wraps Kernel.select on the socket
266 def select(timeout=nil)
267 Kernel.select([@sock], nil, nil, timeout)
270 # shutdown the connection to the server
272 @sock.shutdown(how) unless @sock.nil?
278 # same as puts, but expects to be called with a mutex held on @qmutex
279 def puts_critical(message)
280 # debug "in puts_critical"
281 debug "SEND: #{message.inspect}"
282 @sock.send(message + "\n",0)
283 @last_send = Time.new
286 run_throttle(message.length + 1)