2 # Calculate the penalty which will be assigned to this message
5 # According to eggdrop, the initial penalty is
6 penalty = 1 + self.length/100
7 # on everything but UnderNET where it's
8 # penalty = 2 + self.length/120
10 cmd, pars = self.split($;,2)
11 debug "cmd: #{cmd}, pars: #{pars.inspect}"
14 chan, nick, msg = pars.split
15 chan = chan.split(',')
16 nick = nick.split(',')
17 penalty += nick.length
18 penalty *= chan.length
20 chan, modes, argument = pars.split
25 extra += modes.split(/\+|-/).length
27 extra += 3 * modes.split(/\+|-/).length
31 extra += 2 * argument.split.length
33 penalty += extra * chan.split.length
36 penalty += 2 unless pars.split.length < 2
37 when :PRIVMSG, :NOTICE
38 dests = pars.split($;,2).first
39 penalty += dests.split(',').length
41 # I'm too lazy to implement this one correctly
43 when :AWAY, :JOIN, :VERSION, :TIME, :TRACE, :WHOIS, :DNS
49 else # Unknown messages
53 debug "Wow, more than 99 secs of penalty!"
57 debug "Wow, less than 2 secs of penalty!"
60 debug "penalty: #{penalty}"
72 # A QueueRing is implemented as an array with elements in the form
73 # [chan, [message1, message2, ...]]
74 # Note that the channel +chan+ has no actual bearing on the channels
75 # to which messages will be sent
100 cmess = @storage.assoc(chan)
102 idx = @storage.index(cmess)
104 @storage[idx] = cmess
106 @storage << [chan, [mess]]
112 warning "trying to access empty ring"
116 @last_idx = (@last_idx + 1) % @storage.length
117 mess = @storage[@last_idx][1].first
124 warning "trying to access empty ring"
127 @last_idx = (@last_idx + 1) % @storage.length
128 mess = @storage[@last_idx][1].shift
129 @storage.delete(@storage[@last_idx]) if @storage[@last_idx][1] == []
137 # a MessageQueue is an array of QueueRings
138 # rings have decreasing priority, so messages in ring 0
139 # are more important than messages in ring 1, and so on
140 @rings = Array.new(3) { |i|
144 # ring 0 is special in that if it's not empty, it will
145 # be popped. IOW, ring 0 can starve the other rings
146 # ring 0 is strictly FIFO and is therefore implemented
151 # the other rings are satisfied round-robin
162 def push(mess, chan=nil, cring=0)
165 warning "message #{mess} at ring 0 has channel #{chan}: channel will be ignored" if !chan.nil?
168 error "message #{mess} at ring #{ring} must have a channel" if chan.nil?
169 @rings[ring].push mess, chan
175 return false unless r.empty?
190 warning "trying to access empty ring"
195 mess = @rings[0].first
197 save_ring = @last_ring
198 (@rings.length - 1).times {
199 @last_ring = (@last_ring % (@rings.length - 1)) + 1
200 if !@rings[@last_ring].empty?
201 mess = @rings[@last_ring].next
205 @last_ring = save_ring
207 error "nil message" if mess.nil?
213 warning "trying to access empty ring"
218 return @rings[0].shift
220 (@rings.length - 1).times {
221 @last_ring = (@last_ring % (@rings.length - 1)) + 1
222 if !@rings[@last_ring].empty?
223 return @rings[@last_ring].shift
226 error "nil message" if mess.nil?
232 # wrapped TCPSocket for communication with the server.
233 # emulates a subset of TCPSocket functionality
236 MAX_IRC_SEND_PENALTY = 10
238 # total number of lines sent to the irc server
239 attr_reader :lines_sent
241 # total number of lines received from the irc server
242 attr_reader :lines_received
244 # total number of bytes sent to the irc server
245 attr_reader :bytes_sent
247 # total number of bytes received from the irc server
248 attr_reader :bytes_received
250 # accumulator for the throttle
251 attr_reader :throttle_bytes
253 # delay between lines sent
254 attr_reader :sendq_delay
257 attr_reader :sendq_burst
259 # server:: server to connect to
261 # host:: optional local host to bind to (ruby 1.7+ required)
262 # create a new IrcSocket
263 def initialize(server, port, host, sendq_delay=2, sendq_burst=4, opts={})
264 @timer = Timer::Timer.new
275 if opts.kind_of?(Hash) and opts.key?(:ssl)
282 @sendq_delay = sendq_delay.to_f
286 @last_send = Time.new - @sendq_delay
287 @flood_send = Time.new
288 @last_throttle = Time.new
291 @sendq_burst = sendq_burst.to_i
301 # open a TCP connection to the server
304 warning "reconnecting while connected"
309 @sock=TCPSocket.new(@server, @port, @host)
310 rescue ArgumentError => e
311 error "Your version of ruby does not support binding to a "
312 error "specific local address, please upgrade if you wish "
313 error "to use HOST = foo"
314 error "(this option has been disabled in order to continue)"
315 @sock=TCPSocket.new(@server, @port)
318 @sock=TCPSocket.new(@server, @port)
322 ssl_context = OpenSSL::SSL::SSLContext.new()
323 ssl_context.verify_mode = OpenSSL::SSL::VERIFY_NONE
325 @sock = OpenSSL::SSL::SSLSocket.new(@rawsock, ssl_context)
326 @sock.sync_close = true
331 @sendq = MessageQueue.new
334 def sendq_delay=(newfreq)
335 debug "changing sendq frequency to #{newfreq}"
336 @qmutex.synchronize do
337 @sendq_delay = newfreq
347 def sendq_burst=(newburst)
348 @qmutex.synchronize do
349 @sendq_burst = newburst
353 # used to send lines to the remote IRCd by skipping the queue
354 # message:: IRC message to send
355 # it should only be used for stuff that *must not* be queued,
356 # i.e. the initial PASS, NICK and USER commands
357 # or the final QUIT message
358 def emergency_puts(message)
359 @qmutex.synchronize do
360 # debug "In puts - got mutex"
361 puts_critical(message)
365 def handle_socket_error(string, err)
366 error "#{string} failed: #{err.inspect}"
367 debug err.backtrace.join("\n")
368 # We assume that an error means that there are connection
369 # problems and that we should reconnect, so we
371 raise SocketError.new(err.inspect)
374 # get the next line from the server (blocks)
377 warning "socket get attempted while closed"
383 reply.strip! if reply
384 debug "RECV: #{reply.inspect}"
387 handle_socket_error(:RECV, e)
391 def queue(msg, chan=nil, ring=0)
393 @qmutex.synchronize do
394 @sendq.push msg, chan, ring
398 # just send it if queueing is disabled
399 self.emergency_puts(msg)
403 # pop a message off the queue, send it
405 @qmutex.synchronize do
413 if (now >= (@last_send + @sendq_delay))
414 debug "resetting @burst"
416 elsif (@burst > @sendq_burst)
417 # nope. can't send anything, come back to us next tick...
418 debug "can't send yet"
422 @flood_send = now if @flood_send < now
423 debug "can send #{@sendq_burst - @burst} lines, there are #{@sendq.length} to send"
424 while !@sendq.empty? and @burst < @sendq_burst and @flood_send - now < MAX_IRC_SEND_PENALTY
425 debug "sending message (#{@flood_send - now} < #{MAX_IRC_SEND_PENALTY})"
426 puts_critical(@sendq.shift, true)
432 error "Spooling failed: #{e.inspect}"
433 error e.backtrace.join("\n")
440 @qmutex.synchronize do
446 warning "Clearing socket while disconnected"
450 # flush the TCPSocket
455 # Wraps Kernel.select on the socket
456 def select(timeout=nil)
457 Kernel.select([@sock], nil, nil, timeout)
460 # shutdown the connection to the server
462 return unless connected?
466 error "error while shutting down: #{err.inspect}"
467 debug err.backtrace.join("\n")
469 @rawsock = nil if @ssl
476 # same as puts, but expects to be called with a mutex held on @qmutex
477 def puts_critical(message, penalty=false)
478 # debug "in puts_critical"
480 debug "SEND: #{message.inspect}"
482 error "SEND attempted on closed socket"
485 @last_send = Time.new
486 @flood_send += message.irc_send_penalty if penalty
491 handle_socket_error(:SEND, e)