]> git.netwichtig.de Git - user/henk/code/ruby/rbot.git/blobdiff - lib/rbot/ircsocket.rb
config core module: default command auth fixes
[user/henk/code/ruby/rbot.git] / lib / rbot / ircsocket.rb
index c1bc36115c28af7b8347e02c1c81b109bf3fdbf9..2e6ff452d7f47a0685e3e7655f57cb5d8db940ae 100644 (file)
@@ -1,3 +1,5 @@
+require 'monitor'
+
 class ::String
   # Calculate the penalty which will be assigned to this message
   # by the IRCd
@@ -66,7 +68,6 @@ module Irc
 
   require 'socket'
   require 'thread'
-  require 'rbot/timer'
 
   class QueueRing
     # A QueueRing is implemented as an array with elements in the form
@@ -134,6 +135,7 @@ module Irc
   end
 
   class MessageQueue
+
     def initialize
       # a MessageQueue is an array of QueueRings
       # rings have decreasing priority, so messages in ring 0
@@ -151,89 +153,66 @@ module Irc
       }
       # the other rings are satisfied round-robin
       @last_ring = 0
+      self.extend(MonitorMixin)
+      @non_empty = self.new_cond
     end
 
     def clear
-      @rings.each { |r|
-        r.clear
-      }
-      @last_ring = 0
+      self.synchronize do
+        @rings.each { |r| r.clear }
+        @last_ring = 0
+      end
     end
 
     def push(mess, chan=nil, cring=0)
       ring = cring
-      if ring == 0
-        warning "message #{mess} at ring 0 has channel #{chan}: channel will be ignored" if !chan.nil?
-        @rings[0] << mess
-      else
-        error "message #{mess} at ring #{ring} must have a channel" if chan.nil?
-        @rings[ring].push mess, chan
+      self.synchronize do
+        if ring == 0
+          warning "message #{mess} at ring 0 has channel #{chan}: channel will be ignored" if !chan.nil?
+          @rings[0] << mess
+        else
+          error "message #{mess} at ring #{ring} must have a channel" if chan.nil?
+          @rings[ring].push mess, chan
+        end
+        @non_empty.signal
+      end
+    end
+
+    def shift(tmout = nil)
+      self.synchronize do
+        @non_empty.wait(tmout) if self.empty?
+        return unsafe_shift
       end
     end
 
+    protected
+
     def empty?
-      @rings.each { |r|
-        return false unless r.empty?
-      }
-      return true
+      !@rings.find { |r| !r.empty? }
     end
 
     def length
-      len = 0
-      @rings.each { |r|
-        len += r.size
-      }
-      len
+      @rings.inject(0) { |s, r| s + r.size }
     end
     alias :size :length
 
-    def next
-      if empty?
-        warning "trying to access empty ring"
-        return nil
-      end
-      mess = nil
-      if !@rings[0].empty?
-        mess = @rings[0].first
-      else
-        save_ring = @last_ring
-        (@rings.size - 1).times {
-          @last_ring = (@last_ring % (@rings.size - 1)) + 1
-          if !@rings[@last_ring].empty?
-            mess = @rings[@last_ring].next
-            break
-          end
-        }
-        @last_ring = save_ring
-      end
-      error "nil message" if mess.nil?
-      return mess
-    end
-
-    def shift
-      if empty?
-        warning "trying to access empty ring"
-        return nil
-      end
-      mess = nil
+    def unsafe_shift
       if !@rings[0].empty?
         return @rings[0].shift
       end
-      (@rings.size - 1).times {
+      (@rings.size - 1).times do
         @last_ring = (@last_ring % (@rings.size - 1)) + 1
-        if !@rings[@last_ring].empty?
-          return @rings[@last_ring].shift
-        end
-      }
-      error "nil message" if mess.nil?
-      return mess
+        return @rings[@last_ring].shift unless @rings[@last_ring].empty?
+      end
+      warning "trying to access an empty message queue"
+      return nil
     end
 
   end
 
   # wrapped TCPSocket for communication with the server.
   # emulates a subset of TCPSocket functionality
-  class IrcSocket
+  class Socket
 
     MAX_IRC_SEND_PENALTY = 10
 
@@ -253,10 +232,10 @@ module Irc
     attr_reader :throttle_bytes
 
     # delay between lines sent
-    attr_reader :sendq_delay
+    attr_accessor :sendq_delay
 
     # max lines to burst
-    attr_reader :sendq_burst
+    attr_accessor :sendq_burst
 
     # an optional filter object. we call @filter.in(data) for
     # all incoming data and @filter.out(data) for all outgoing data
@@ -283,12 +262,8 @@ module Irc
 
     # server_list:: list of servers to connect to
     # host::   optional local host to bind to (ruby 1.7+ required)
-    # create a new IrcSocket
+    # create a new Irc::Socket
     def initialize(server_list, host, sendq_delay=2, sendq_burst=4, opts={})
-      @timer = Timer::Timer.new
-      @timer.add(0.2) do
-        spool
-      end
       @server_list = server_list.dup
       @server_uri = nil
       @conn_count = 0
@@ -309,10 +284,6 @@ module Irc
       else
         @sendq_delay = 2
       end
-      @last_send = Time.new - @sendq_delay
-      @flood_send = Time.new
-      @last_throttle = Time.new
-      @burst = 0
       if sendq_burst
         @sendq_burst = sendq_burst.to_i
       else
@@ -339,48 +310,33 @@ module Irc
 
       if(@host)
         begin
-          @sock=TCPSocket.new(@server_uri.host, @server_uri.port, @host)
+          sock=TCPSocket.new(@server_uri.host, @server_uri.port, @host)
         rescue ArgumentError => e
           error "Your version of ruby does not support binding to a "
           error "specific local address, please upgrade if you wish "
           error "to use HOST = foo"
           error "(this option has been disabled in order to continue)"
-          @sock=TCPSocket.new(@server_uri.host, @server_uri.port)
+          sock=TCPSocket.new(@server_uri.host, @server_uri.port)
         end
       else
-        @sock=TCPSocket.new(@server_uri.host, @server_uri.port)
+        sock=TCPSocket.new(@server_uri.host, @server_uri.port)
       end
       if(@ssl)
         require 'openssl'
         ssl_context = OpenSSL::SSL::SSLContext.new()
         ssl_context.verify_mode = OpenSSL::SSL::VERIFY_NONE
-        @rawsock = @sock
-        @sock = OpenSSL::SSL::SSLSocket.new(@rawsock, ssl_context)
-        @sock.sync_close = true
-        @sock.connect
+        sock = OpenSSL::SSL::SSLSocket.new(sock, ssl_context)
+        sock.sync_close = true
+        sock.connect
       end
-      @qthread = false
-      @qmutex = Mutex.new
+      @sock = sock
+      @last_send = Time.new - @sendq_delay
+      @flood_send = Time.new
+      @last_throttle = Time.new
+      @burst = 0
+      @sock.extend(MonitorMixin)
       @sendq = MessageQueue.new
-    end
-
-    def sendq_delay=(newfreq)
-      debug "changing sendq frequency to #{newfreq}"
-      @qmutex.synchronize do
-        @sendq_delay = newfreq
-        if newfreq == 0
-          clearq
-          @timer.stop
-        else
-          @timer.start
-        end
-      end
-    end
-
-    def sendq_burst=(newburst)
-      @qmutex.synchronize do
-        @sendq_burst = newburst
-      end
+      @qthread = Thread.new { writer_loop }
     end
 
     # used to send lines to the remote IRCd by skipping the queue
@@ -388,10 +344,10 @@ module Irc
     # it should only be used for stuff that *must not* be queued,
     # i.e. the initial PASS, NICK and USER command
     # or the final QUIT message
-    def emergency_puts(message)
-      @qmutex.synchronize do
-        # debug "In puts - got mutex"
-        puts_critical(message)
+    def emergency_puts(message, penalty = false)
+      @sock.synchronize do
+        # debug "In puts - got @sock"
+        puts_critical(message, penalty)
       end
     end
 
@@ -421,62 +377,11 @@ module Irc
     end
 
     def queue(msg, chan=nil, ring=0)
-      if @sendq_delay > 0
-        @qmutex.synchronize do
-          @sendq.push msg, chan, ring
-          @timer.start
-        end
-      else
-        # just send it if queueing is disabled
-        self.emergency_puts(msg)
-      end
-    end
-
-    # pop a message off the queue, send it
-    def spool
-      @qmutex.synchronize do
-        begin
-          debug "in spooler"
-          if @sendq.empty?
-            @timer.stop
-            return
-          end
-          now = Time.new
-          if (now >= (@last_send + @sendq_delay))
-            debug "resetting @burst"
-            @burst = 0
-          elsif (@burst > @sendq_burst)
-            # nope. can't send anything, come back to us next tick...
-            debug "can't send yet"
-            @timer.start
-            return
-          end
-          @flood_send = now if @flood_send < now
-          debug "can send #{@sendq_burst - @burst} lines, there are #{@sendq.size} to send"
-          while !@sendq.empty? and @burst < @sendq_burst and @flood_send - now < MAX_IRC_SEND_PENALTY
-            debug "sending message (#{@flood_send - now} < #{MAX_IRC_SEND_PENALTY})"
-            puts_critical(@sendq.shift, true)
-          end
-          if @sendq.empty?
-            @timer.stop
-          end
-        rescue Exception => e
-          error "Spooling failed: #{e.pretty_inspect}"
-          raise e
-        end
-      end
+      @sendq.push msg, chan, ring
     end
 
     def clearq
-      if @sock
-        @qmutex.synchronize do
-          unless @sendq.empty?
-            @sendq.clear
-          end
-        end
-      else
-        warning "Clearing socket while disconnected"
-      end
+      @sendq.clear
     end
 
     # flush the TCPSocket
@@ -492,12 +397,13 @@ module Irc
     # shutdown the connection to the server
     def shutdown(how=2)
       return unless connected?
+      @qthread.kill
+      @qthread = nil
       begin
         @sock.close
       rescue Exception => e
         error "error while shutting down: #{e.pretty_inspect}"
       end
-      @rawsock = nil if @ssl
       @sock = nil
       @burst = 0
       @sendq.clear
@@ -505,7 +411,43 @@ module Irc
 
     private
 
-    # same as puts, but expects to be called with a mutex held on @qmutex
+    def writer_loop
+      loop do
+        # we could wait for the message, then calculate the delay and sleep
+        # if necessary. however, if high-priority message is enqueued while
+        # we sleep, it won't be the first to go out when the sleep is over.
+        # thus, we have to call Time.now() twice, once to calculate the delay
+        # and once to adjust @burst / @flood_send.
+        begin
+          now = Time.now
+          if @sendq_delay > 0
+            burst_delay = 0
+            if @burst > @sendq_burst
+              burst_delay = @last_send + @sendq_delay - now
+            end
+
+            flood_delay = @flood_send - MAX_IRC_SEND_PENALTY - now
+            delay = [burst_delay, flood_delay, 0].max
+            if delay > 0
+              debug "sleep(#{delay}) # (f: #{flood_delay}, b: #{burst_delay})"
+              sleep(delay) 
+            end
+          end
+          msg = @sendq.shift
+          now = Time.now
+          @flood_send = now if @flood_send < now
+          @burst = 0 if @last_send + @sendq_delay < now
+          debug "got #{msg.inspect} from queue, sending"
+          emergency_puts(msg, true)
+        rescue Exception => e
+          error "Spooling failed: #{e.pretty_inspect}"
+          debug e.backtrace.join("\n")
+          raise e
+        end
+      end
+    end
+
+    # same as puts, but expects to be called with a lock held on @sock
     def puts_critical(message, penalty=false)
       # debug "in puts_critical"
       begin