X-Git-Url: https://git.netwichtig.de/gitweb/?a=blobdiff_plain;f=lib%2Frbot%2Fircsocket.rb;h=2e6ff452d7f47a0685e3e7655f57cb5d8db940ae;hb=2ecf2f58c843895ce4ad143d0a05283c4b7e37e8;hp=c1bc36115c28af7b8347e02c1c81b109bf3fdbf9;hpb=696efbfb6ad3f40c313c17b22041e12c3196a7e9;p=user%2Fhenk%2Fcode%2Fruby%2Frbot.git diff --git a/lib/rbot/ircsocket.rb b/lib/rbot/ircsocket.rb index c1bc3611..2e6ff452 100644 --- a/lib/rbot/ircsocket.rb +++ b/lib/rbot/ircsocket.rb @@ -1,3 +1,5 @@ +require 'monitor' + class ::String # Calculate the penalty which will be assigned to this message # by the IRCd @@ -66,7 +68,6 @@ module Irc require 'socket' require 'thread' - require 'rbot/timer' class QueueRing # A QueueRing is implemented as an array with elements in the form @@ -134,6 +135,7 @@ module Irc end class MessageQueue + def initialize # a MessageQueue is an array of QueueRings # rings have decreasing priority, so messages in ring 0 @@ -151,89 +153,66 @@ module Irc } # the other rings are satisfied round-robin @last_ring = 0 + self.extend(MonitorMixin) + @non_empty = self.new_cond end def clear - @rings.each { |r| - r.clear - } - @last_ring = 0 + self.synchronize do + @rings.each { |r| r.clear } + @last_ring = 0 + end end def push(mess, chan=nil, cring=0) ring = cring - if ring == 0 - warning "message #{mess} at ring 0 has channel #{chan}: channel will be ignored" if !chan.nil? - @rings[0] << mess - else - error "message #{mess} at ring #{ring} must have a channel" if chan.nil? - @rings[ring].push mess, chan + self.synchronize do + if ring == 0 + warning "message #{mess} at ring 0 has channel #{chan}: channel will be ignored" if !chan.nil? + @rings[0] << mess + else + error "message #{mess} at ring #{ring} must have a channel" if chan.nil? + @rings[ring].push mess, chan + end + @non_empty.signal + end + end + + def shift(tmout = nil) + self.synchronize do + @non_empty.wait(tmout) if self.empty? + return unsafe_shift end end + protected + def empty? - @rings.each { |r| - return false unless r.empty? - } - return true + !@rings.find { |r| !r.empty? } end def length - len = 0 - @rings.each { |r| - len += r.size - } - len + @rings.inject(0) { |s, r| s + r.size } end alias :size :length - def next - if empty? - warning "trying to access empty ring" - return nil - end - mess = nil - if !@rings[0].empty? - mess = @rings[0].first - else - save_ring = @last_ring - (@rings.size - 1).times { - @last_ring = (@last_ring % (@rings.size - 1)) + 1 - if !@rings[@last_ring].empty? - mess = @rings[@last_ring].next - break - end - } - @last_ring = save_ring - end - error "nil message" if mess.nil? - return mess - end - - def shift - if empty? - warning "trying to access empty ring" - return nil - end - mess = nil + def unsafe_shift if !@rings[0].empty? return @rings[0].shift end - (@rings.size - 1).times { + (@rings.size - 1).times do @last_ring = (@last_ring % (@rings.size - 1)) + 1 - if !@rings[@last_ring].empty? - return @rings[@last_ring].shift - end - } - error "nil message" if mess.nil? - return mess + return @rings[@last_ring].shift unless @rings[@last_ring].empty? + end + warning "trying to access an empty message queue" + return nil end end # wrapped TCPSocket for communication with the server. # emulates a subset of TCPSocket functionality - class IrcSocket + class Socket MAX_IRC_SEND_PENALTY = 10 @@ -253,10 +232,10 @@ module Irc attr_reader :throttle_bytes # delay between lines sent - attr_reader :sendq_delay + attr_accessor :sendq_delay # max lines to burst - attr_reader :sendq_burst + attr_accessor :sendq_burst # an optional filter object. we call @filter.in(data) for # all incoming data and @filter.out(data) for all outgoing data @@ -283,12 +262,8 @@ module Irc # server_list:: list of servers to connect to # host:: optional local host to bind to (ruby 1.7+ required) - # create a new IrcSocket + # create a new Irc::Socket def initialize(server_list, host, sendq_delay=2, sendq_burst=4, opts={}) - @timer = Timer::Timer.new - @timer.add(0.2) do - spool - end @server_list = server_list.dup @server_uri = nil @conn_count = 0 @@ -309,10 +284,6 @@ module Irc else @sendq_delay = 2 end - @last_send = Time.new - @sendq_delay - @flood_send = Time.new - @last_throttle = Time.new - @burst = 0 if sendq_burst @sendq_burst = sendq_burst.to_i else @@ -339,48 +310,33 @@ module Irc if(@host) begin - @sock=TCPSocket.new(@server_uri.host, @server_uri.port, @host) + sock=TCPSocket.new(@server_uri.host, @server_uri.port, @host) rescue ArgumentError => e error "Your version of ruby does not support binding to a " error "specific local address, please upgrade if you wish " error "to use HOST = foo" error "(this option has been disabled in order to continue)" - @sock=TCPSocket.new(@server_uri.host, @server_uri.port) + sock=TCPSocket.new(@server_uri.host, @server_uri.port) end else - @sock=TCPSocket.new(@server_uri.host, @server_uri.port) + sock=TCPSocket.new(@server_uri.host, @server_uri.port) end if(@ssl) require 'openssl' ssl_context = OpenSSL::SSL::SSLContext.new() ssl_context.verify_mode = OpenSSL::SSL::VERIFY_NONE - @rawsock = @sock - @sock = OpenSSL::SSL::SSLSocket.new(@rawsock, ssl_context) - @sock.sync_close = true - @sock.connect + sock = OpenSSL::SSL::SSLSocket.new(sock, ssl_context) + sock.sync_close = true + sock.connect end - @qthread = false - @qmutex = Mutex.new + @sock = sock + @last_send = Time.new - @sendq_delay + @flood_send = Time.new + @last_throttle = Time.new + @burst = 0 + @sock.extend(MonitorMixin) @sendq = MessageQueue.new - end - - def sendq_delay=(newfreq) - debug "changing sendq frequency to #{newfreq}" - @qmutex.synchronize do - @sendq_delay = newfreq - if newfreq == 0 - clearq - @timer.stop - else - @timer.start - end - end - end - - def sendq_burst=(newburst) - @qmutex.synchronize do - @sendq_burst = newburst - end + @qthread = Thread.new { writer_loop } end # used to send lines to the remote IRCd by skipping the queue @@ -388,10 +344,10 @@ module Irc # it should only be used for stuff that *must not* be queued, # i.e. the initial PASS, NICK and USER command # or the final QUIT message - def emergency_puts(message) - @qmutex.synchronize do - # debug "In puts - got mutex" - puts_critical(message) + def emergency_puts(message, penalty = false) + @sock.synchronize do + # debug "In puts - got @sock" + puts_critical(message, penalty) end end @@ -421,62 +377,11 @@ module Irc end def queue(msg, chan=nil, ring=0) - if @sendq_delay > 0 - @qmutex.synchronize do - @sendq.push msg, chan, ring - @timer.start - end - else - # just send it if queueing is disabled - self.emergency_puts(msg) - end - end - - # pop a message off the queue, send it - def spool - @qmutex.synchronize do - begin - debug "in spooler" - if @sendq.empty? - @timer.stop - return - end - now = Time.new - if (now >= (@last_send + @sendq_delay)) - debug "resetting @burst" - @burst = 0 - elsif (@burst > @sendq_burst) - # nope. can't send anything, come back to us next tick... - debug "can't send yet" - @timer.start - return - end - @flood_send = now if @flood_send < now - debug "can send #{@sendq_burst - @burst} lines, there are #{@sendq.size} to send" - while !@sendq.empty? and @burst < @sendq_burst and @flood_send - now < MAX_IRC_SEND_PENALTY - debug "sending message (#{@flood_send - now} < #{MAX_IRC_SEND_PENALTY})" - puts_critical(@sendq.shift, true) - end - if @sendq.empty? - @timer.stop - end - rescue Exception => e - error "Spooling failed: #{e.pretty_inspect}" - raise e - end - end + @sendq.push msg, chan, ring end def clearq - if @sock - @qmutex.synchronize do - unless @sendq.empty? - @sendq.clear - end - end - else - warning "Clearing socket while disconnected" - end + @sendq.clear end # flush the TCPSocket @@ -492,12 +397,13 @@ module Irc # shutdown the connection to the server def shutdown(how=2) return unless connected? + @qthread.kill + @qthread = nil begin @sock.close rescue Exception => e error "error while shutting down: #{e.pretty_inspect}" end - @rawsock = nil if @ssl @sock = nil @burst = 0 @sendq.clear @@ -505,7 +411,43 @@ module Irc private - # same as puts, but expects to be called with a mutex held on @qmutex + def writer_loop + loop do + # we could wait for the message, then calculate the delay and sleep + # if necessary. however, if high-priority message is enqueued while + # we sleep, it won't be the first to go out when the sleep is over. + # thus, we have to call Time.now() twice, once to calculate the delay + # and once to adjust @burst / @flood_send. + begin + now = Time.now + if @sendq_delay > 0 + burst_delay = 0 + if @burst > @sendq_burst + burst_delay = @last_send + @sendq_delay - now + end + + flood_delay = @flood_send - MAX_IRC_SEND_PENALTY - now + delay = [burst_delay, flood_delay, 0].max + if delay > 0 + debug "sleep(#{delay}) # (f: #{flood_delay}, b: #{burst_delay})" + sleep(delay) + end + end + msg = @sendq.shift + now = Time.now + @flood_send = now if @flood_send < now + @burst = 0 if @last_send + @sendq_delay < now + debug "got #{msg.inspect} from queue, sending" + emergency_puts(msg, true) + rescue Exception => e + error "Spooling failed: #{e.pretty_inspect}" + debug e.backtrace.join("\n") + raise e + end + end + end + + # same as puts, but expects to be called with a lock held on @sock def puts_critical(message, penalty=false) # debug "in puts_critical" begin