2 # Calculate the penalty which will be assigned to this message
5 # According to eggdrop, the initial penalty is
6 penalty = 1 + self.size/100
7 # on everything but UnderNET where it's
8 # penalty = 2 + self.size/120
10 cmd, pars = self.split($;,2)
11 debug "cmd: #{cmd}, pars: #{pars.inspect}"
14 chan, nick, msg = pars.split
15 chan = chan.split(',')
16 nick = nick.split(',')
20 chan, modes, argument = pars.split
25 extra += modes.split(/\+|-/).size
27 extra += 3 * modes.split(/\+|-/).size
31 extra += 2 * argument.split.size
33 penalty += extra * chan.split.size
36 penalty += 2 unless pars.split.size < 2
37 when :PRIVMSG, :NOTICE
38 dests = pars.split($;,2).first
39 penalty += dests.split(',').size
41 # I'm too lazy to implement this one correctly
43 when :AWAY, :JOIN, :VERSION, :TIME, :TRACE, :WHOIS, :DNS
49 else # Unknown messages
53 debug "Wow, more than 99 secs of penalty!"
57 debug "Wow, less than 2 secs of penalty!"
60 debug "penalty: #{penalty}"
72 # A QueueRing is implemented as an array with elements in the form
73 # [chan, [message1, message2, ...]]
74 # Note that the channel +chan+ has no actual bearing on the channels
75 # to which messages will be sent
101 cmess = @storage.assoc(chan)
103 idx = @storage.index(cmess)
105 @storage[idx] = cmess
107 @storage << [chan, [mess]]
113 warning "trying to access empty ring"
117 @last_idx = (@last_idx + 1) % @storage.size
118 mess = @storage[@last_idx][1].first
125 warning "trying to access empty ring"
128 @last_idx = (@last_idx + 1) % @storage.size
129 mess = @storage[@last_idx][1].shift
130 @storage.delete(@storage[@last_idx]) if @storage[@last_idx][1] == []
138 # a MessageQueue is an array of QueueRings
139 # rings have decreasing priority, so messages in ring 0
140 # are more important than messages in ring 1, and so on
141 @rings = Array.new(3) { |i|
145 # ring 0 is special in that if it's not empty, it will
146 # be popped. In other words, ring 0 can starve the other rings
147 # ring 0 is strictly FIFO and is therefore implemented
152 # the other rings are satisfied round-robin
163 def push(mess, chan=nil, cring=0)
166 warning "message #{mess} at ring 0 has channel #{chan}: channel will be ignored" if !chan.nil?
169 error "message #{mess} at ring #{ring} must have a channel" if chan.nil?
170 @rings[ring].push mess, chan
176 return false unless r.empty?
192 warning "trying to access empty ring"
197 mess = @rings[0].first
199 save_ring = @last_ring
200 (@rings.size - 1).times {
201 @last_ring = (@last_ring % (@rings.size - 1)) + 1
202 if !@rings[@last_ring].empty?
203 mess = @rings[@last_ring].next
207 @last_ring = save_ring
209 error "nil message" if mess.nil?
215 warning "trying to access empty ring"
220 return @rings[0].shift
222 (@rings.size - 1).times {
223 @last_ring = (@last_ring % (@rings.size - 1)) + 1
224 if !@rings[@last_ring].empty?
225 return @rings[@last_ring].shift
228 error "nil message" if mess.nil?
234 # wrapped TCPSocket for communication with the server.
235 # emulates a subset of TCPSocket functionality
238 MAX_IRC_SEND_PENALTY = 10
240 # total number of lines sent to the irc server
241 attr_reader :lines_sent
243 # total number of lines received from the irc server
244 attr_reader :lines_received
246 # total number of bytes sent to the irc server
247 attr_reader :bytes_sent
249 # total number of bytes received from the irc server
250 attr_reader :bytes_received
252 # accumulator for the throttle
253 attr_reader :throttle_bytes
255 # delay between lines sent
256 attr_reader :sendq_delay
259 attr_reader :sendq_burst
261 # an optional filter object. we call @filter.in(data) for
262 # all incoming data and @filter.out(data) for all outgoing data
265 # normalized uri of the current server
266 attr_reader :server_uri
268 # default trivial filter class
279 # set filter to identity, not to nil
281 @filter = f || IdentityFilter.new
284 # server_list:: list of servers to connect to
285 # host:: optional local host to bind to (ruby 1.7+ required)
286 # create a new IrcSocket
287 def initialize(server_list, host, sendq_delay=2, sendq_burst=4, opts={})
289 @act_id = @timer.add(0.2, :blocked => true) { spool }
290 @server_list = server_list.dup
295 @filter = IdentityFilter.new
299 if opts.kind_of?(Hash) and opts.key?(:ssl)
306 @sendq_delay = sendq_delay.to_f
310 @last_send = Time.new - @sendq_delay
311 @flood_send = Time.new
312 @last_throttle = Time.new
315 @sendq_burst = sendq_burst.to_i
325 # open a TCP connection to the server
328 warning "reconnecting while connected"
331 srv_uri = @server_list[@conn_count % @server_list.size].dup
332 srv_uri = 'irc://' + srv_uri if !(srv_uri =~ /:\/\//)
334 @server_uri = URI.parse(srv_uri)
335 @server_uri.port = 6667 if !@server_uri.port
336 debug "connection attempt \##{@conn_count} (#{@server_uri.host}:#{@server_uri.port})"
340 @sock=TCPSocket.new(@server_uri.host, @server_uri.port, @host)
341 rescue ArgumentError => e
342 error "Your version of ruby does not support binding to a "
343 error "specific local address, please upgrade if you wish "
344 error "to use HOST = foo"
345 error "(this option has been disabled in order to continue)"
346 @sock=TCPSocket.new(@server_uri.host, @server_uri.port)
349 @sock=TCPSocket.new(@server_uri.host, @server_uri.port)
353 ssl_context = OpenSSL::SSL::SSLContext.new()
354 ssl_context.verify_mode = OpenSSL::SSL::VERIFY_NONE
356 @sock = OpenSSL::SSL::SSLSocket.new(@rawsock, ssl_context)
357 @sock.sync_close = true
362 @sendq = MessageQueue.new
365 def sendq_delay=(newfreq)
366 debug "changing sendq frequency to #{newfreq}"
367 @qmutex.synchronize do
368 @sendq_delay = newfreq
371 @timer.block(@act_id)
373 @timer.unblock(@act_id)
378 def sendq_burst=(newburst)
379 @qmutex.synchronize do
380 @sendq_burst = newburst
384 # used to send lines to the remote IRCd by skipping the queue
385 # message:: IRC message to send
386 # it should only be used for stuff that *must not* be queued,
387 # i.e. the initial PASS, NICK and USER commands
388 # or the final QUIT message
389 def emergency_puts(message)
390 @qmutex.synchronize do
391 # debug "In puts - got mutex"
392 puts_critical(message)
396 def handle_socket_error(string, e)
397 error "#{string} failed: #{e.pretty_inspect}"
398 # We assume that an error means that there are connection
399 # problems and that we should reconnect, so we
401 raise SocketError.new(e.inspect)
404 # get the next line from the server (blocks)
407 warning "socket get attempted while closed"
411 reply = @filter.in(@sock.gets)
413 reply.strip! if reply
414 debug "RECV: #{reply.inspect}"
416 rescue Exception => e
417 handle_socket_error(:RECV, e)
421 def queue(msg, chan=nil, ring=0)
423 @qmutex.synchronize do
424 @sendq.push msg, chan, ring
425 @timer.unblock(@act_id)
428 # just send it if queueing is disabled
429 self.emergency_puts(msg)
433 # pop a message off the queue, send it
435 @qmutex.synchronize do
439 @timer.block(@act_id)
443 if (now >= (@last_send + @sendq_delay))
444 debug "resetting @burst"
446 elsif (@burst > @sendq_burst)
447 # nope. can't send anything, come back to us next tick...
448 debug "can't send yet"
449 @timer.unblock(@act_id)
452 @flood_send = now if @flood_send < now
453 debug "can send #{@sendq_burst - @burst} lines, there are #{@sendq.size} to send"
454 while !@sendq.empty? and @burst < @sendq_burst and @flood_send - now < MAX_IRC_SEND_PENALTY
455 debug "sending message (#{@flood_send - now} < #{MAX_IRC_SEND_PENALTY})"
456 puts_critical(@sendq.shift, true)
459 @timer.block(@act_id)
461 rescue Exception => e
462 error "Spooling failed: #{e.pretty_inspect}"
470 @qmutex.synchronize do
476 warning "Clearing socket while disconnected"
480 # flush the TCPSocket
485 # Wraps Kernel.select on the socket
486 def select(timeout=nil)
487 Kernel.select([@sock], nil, nil, timeout)
490 # shutdown the connection to the server
492 return unless connected?
495 rescue Exception => e
496 error "error while shutting down: #{e.pretty_inspect}"
498 @rawsock = nil if @ssl
506 # same as puts, but expects to be called with a mutex held on @qmutex
507 def puts_critical(message, penalty=false)
508 # debug "in puts_critical"
510 debug "SEND: #{message.inspect}"
512 error "SEND attempted on closed socket"
514 @sock.puts(@filter.out(message))
515 @last_send = Time.new
516 @flood_send += message.irc_send_penalty if penalty
520 rescue Exception => e
521 handle_socket_error(:SEND, e)