]> git.netwichtig.de Git - user/henk/code/ruby/rbot.git/blobdiff - lib/rbot/ircsocket.rb
New configuration option plugins.blacklist holding an array of plugins to be blacklis...
[user/henk/code/ruby/rbot.git] / lib / rbot / ircsocket.rb
index 7b4175de55688938f6f6ff8528f0f53e95eae258..48a4c2d73551814de1c6ae765400b31946eaa402 100644 (file)
@@ -4,26 +4,200 @@ module Irc
   require 'thread'
   require 'rbot/timer'
 
+  class QueueRing
+    # A QueueRing is implemented as an array with elements in the form
+    # [chan, [message1, message2, ...]
+    # Note that the channel +chan+ has no actual bearing with the channels
+    # to which messages will be sent
+
+    def initialize
+      @storage = Array.new
+      @last_idx = -1
+    end
+
+    def clear
+      @storage.clear
+      @last_idx = -1
+    end
+
+    def length
+      length = 0
+      @storage.each {|c|
+        length += c[1].length 
+      }
+      return length
+    end
+
+    def empty?
+      @storage.empty?
+    end
+
+    def push(mess, chan)
+      cmess = @storage.assoc(chan)
+      if cmess
+        idx = @storage.index(cmess)
+        cmess[1] << mess
+        @storage[idx] = cmess
+      else
+        @storage << [chan, [mess]]
+      end
+    end
+
+    def next
+      if empty?
+        warning "trying to access empty ring"
+        return nil
+      end
+      save_idx = @last_idx
+      @last_idx = (@last_idx + 1) % @storage.length
+      mess = @storage[@last_idx][1].first
+      @last_idx = save_idx
+      return mess
+    end
+
+    def shift
+      if empty?
+        warning "trying to access empty ring"
+        return nil
+      end
+      @last_idx = (@last_idx + 1) % @storage.length
+      mess = @storage[@last_idx][1].shift
+      @storage.delete(@storage[@last_idx]) if @storage[@last_idx][1] == []
+      return mess
+    end
+
+  end
+
+  class MessageQueue
+    def initialize
+      # a MessageQueue is an array of QueueRings
+      # rings have decreasing priority, so messages in ring 0
+      # are more important than messages in ring 1, and so on
+      @rings = Array.new(3) { |i|
+        if i > 0
+          QueueRing.new
+        else
+          # ring 0 is special in that if it's not empty, it will
+          # be popped. IOW, ring 0 can starve the other rings
+          # ring 0 is strictly FIFO and is therefore implemented
+          # as an array
+          Array.new
+        end
+      }
+      # the other rings are satisfied round-robin
+      @last_ring = 0
+    end
+
+    def clear
+      @rings.each { |r|
+        r.clear
+      }
+      @last_ring = 0
+    end
+
+    def push(mess, chan=nil, cring=0)
+      ring = cring
+      if ring == 0
+        warning "message #{mess} at ring 0 has channel #{chan}: channel will be ignored" if !chan.nil?
+        @rings[0] << mess
+      else
+        error "message #{mess} at ring #{ring} must have a channel" if chan.nil?
+        @rings[ring].push mess, chan
+      end
+    end
+
+    def empty?
+      @rings.each { |r|
+        return false unless r.empty?
+      }
+      return true
+    end
+
+    def length
+      len = 0
+      @rings.each { |r|
+        len += r.length
+      }
+      len
+    end
+
+    def next
+      if empty?
+        warning "trying to access empty ring"
+        return nil
+      end
+      mess = nil
+      if !@rings[0].empty?
+        mess = @rings[0].first
+      else
+        save_ring = @last_ring
+        (@rings.length - 1).times {
+          @last_ring = (@last_ring % (@rings.length - 1)) + 1
+          if !@rings[@last_ring].empty?
+            mess = @rings[@last_ring].next
+            break
+          end
+        }
+        @last_ring = save_ring
+      end
+      error "nil message" if mess.nil?
+      return mess
+    end
+
+    def shift
+      if empty?
+        warning "trying to access empty ring"
+        return nil
+      end
+      mess = nil
+      if !@rings[0].empty?
+        return @rings[0].shift
+      end
+      (@rings.length - 1).times {
+        @last_ring = (@last_ring % (@rings.length - 1)) + 1
+        if !@rings[@last_ring].empty?
+          return @rings[@last_ring].shift
+        end
+      }
+      error "nil message" if mess.nil?
+      return mess
+    end
+
+  end
+
   # wrapped TCPSocket for communication with the server.
   # emulates a subset of TCPSocket functionality
   class IrcSocket
     # total number of lines sent to the irc server
     attr_reader :lines_sent
-    
+
     # total number of lines received from the irc server
     attr_reader :lines_received
-    
+
+    # total number of bytes sent to the irc server
+    attr_reader :bytes_sent
+
+    # total number of bytes received from the irc server
+    attr_reader :bytes_received
+
+    # accumulator for the throttle
+    attr_reader :throttle_bytes
+
+    # byterate components
+    attr_reader :bytes_per
+    attr_reader :seconds_per
+
     # delay between lines sent
     attr_reader :sendq_delay
-    
+
     # max lines to burst
     attr_reader :sendq_burst
-    
+
     # server:: server to connect to
     # port::   IRCd port
     # host::   optional local host to bind to (ruby 1.7+ required)
     # create a new IrcSocket
-    def initialize(server, port, host, sendq_delay=2, sendq_burst=4)
+    def initialize(server, port, host, sendq_delay=2, sendq_burst=4, brt="400/2")
       @timer = Timer::Timer.new
       @timer.add(0.2) do
         spool
@@ -41,33 +215,58 @@ module Irc
         @sendq_delay = 2
       end
       @last_send = Time.new - @sendq_delay
+      @last_throttle = Time.new
       @burst = 0
       if sendq_burst
         @sendq_burst = sendq_burst.to_i
       else
         @sendq_burst = 4
       end
+      @bytes_per = 400
+      @seconds_per = 2
+      @throttle_bytes = 0
+      @throttle_div = 1
+      setbyterate(brt)
     end
-    
+
+    def setbyterate(brt)
+      if brt.match(/(\d+)\/(\d)/)
+        @bytes_per = $1.to_i
+        @seconds_per = $2.to_i
+        debug "Byterate now #{byterate}"
+        return true
+      else
+        debug "Couldn't set byterate #{brt}"
+        return false
+      end
+    end
+
+    def connected?
+      !@sock.nil?
+    end
+
     # open a TCP connection to the server
     def connect
-      @sock = nil
+      if connected?
+        warning "reconnecting while connected"
+        return
+      end
       if(@host)
         begin
           @sock=TCPSocket.new(@server, @port, @host)
         rescue ArgumentError => e
-          $stderr.puts "Your version of ruby does not support binding to a "
-          $stderr.puts "specific local address, please upgrade if you wish "
-          $stderr.puts "to use HOST = foo"
-          $stderr.puts "(this option has been disabled in order to continue)"
+          error "Your version of ruby does not support binding to a "
+          error "specific local address, please upgrade if you wish "
+          error "to use HOST = foo"
+          error "(this option has been disabled in order to continue)"
           @sock=TCPSocket.new(@server, @port)
         end
       else
         @sock=TCPSocket.new(@server, @port)
-      end 
+      end
       @qthread = false
       @qmutex = Mutex.new
-      @sendq = Array.new
+      @sendq = MessageQueue.new
     end
 
     def sendq_delay=(newfreq)
@@ -89,9 +288,42 @@ module Irc
       end
     end
 
-    # used to send lines to the remote IRCd
+    def byterate
+      return "#{@bytes_per}/#{@seconds_per}"
+    end
+
+    def byterate=(newrate)
+      @qmutex.synchronize do
+        setbyterate(newrate)
+      end
+    end
+
+    def run_throttle(more=0)
+      now = Time.new
+      if @throttle_bytes > 0
+        # If we ever reach the limit, we halve the actual allowed byterate
+        # until we manage to reset the throttle.
+        if @throttle_bytes >= @bytes_per
+          @throttle_div = 0.5
+        end
+        delta = ((now - @last_throttle)*@throttle_div*@bytes_per/@seconds_per).floor
+        if delta > 0
+          @throttle_bytes -= delta
+          @throttle_bytes = 0 if @throttle_bytes < 0
+          @last_throttle = now
+        end
+      else
+        @throttle_div = 1
+      end
+      @throttle_bytes += more
+    end
+
+    # used to send lines to the remote IRCd by skipping the queue
     # message: IRC message to send
-    def puts(message)
+    # it should only be used for stuff that *must not* be queued,
+    # i.e. the initial PASS, NICK and USER command
+    # or the final QUIT message
+    def emergency_puts(message)
       @qmutex.synchronize do
         # debug "In puts - got mutex"
         puts_critical(message)
@@ -100,63 +332,89 @@ module Irc
 
     # get the next line from the server (blocks)
     def gets
+      if @sock.nil?
+        warning "socket get attempted while closed"
+        return nil
+      end
       begin
-       reply = @sock.gets
-       @lines_received += 1
-       reply.strip! if reply
-       debug "RECV: #{reply.inspect}"
-       return reply
+        reply = @sock.gets
+        @lines_received += 1
+        reply.strip! if reply
+        debug "RECV: #{reply.inspect}"
+        return reply
       rescue => e
-        debug "socket get failed: #{e}"
-       return nil
+        warning "socket get failed: #{e.inspect}"
+        debug e.backtrace.join("\n")
+        return nil
       end
     end
 
-    def queue(msg)
+    def queue(msg, chan=nil, ring=0)
       if @sendq_delay > 0
         @qmutex.synchronize do
-          @sendq.push msg
+          @sendq.push msg, chan, ring
+          @timer.start
         end
-        @timer.start
       else
         # just send it if queueing is disabled
-        self.puts(msg)
+        self.emergency_puts(msg)
       end
     end
 
     # pop a message off the queue, send it
     def spool
-      if @sendq.empty?
-        @timer.stop
-        return
-      end
-      now = Time.new
-      if (now >= (@last_send + @sendq_delay))
-        # reset burst counter after @sendq_delay has passed
-        @burst = 0
-        debug "in spool, resetting @burst"
-      elsif (@burst >= @sendq_burst)
-        # nope. can't send anything, come back to us next tick...
-        @timer.start
-        return
-      end
       @qmutex.synchronize do
-        debug "(can send #{@sendq_burst - @burst} lines, there are #{@sendq.length} to send)"
-        (@sendq_burst - @burst).times do
-          break if @sendq.empty?
-          puts_critical(@sendq.shift)
+        begin
+          debug "in spooler"
+          if @sendq.empty?
+            @timer.stop
+            return
+          end
+          now = Time.new
+          if (now >= (@last_send + @sendq_delay))
+            # reset burst counter after @sendq_delay has passed
+            debug "resetting @burst"
+            @burst = 0
+          elsif (@burst >= @sendq_burst)
+            # nope. can't send anything, come back to us next tick...
+            debug "can't send yet"
+            @timer.start
+            return
+          end
+          debug "can send #{@sendq_burst - @burst} lines, there are #{@sendq.length} to send"
+          (@sendq_burst - @burst).times do
+            break if @sendq.empty?
+            mess = @sendq.next
+            if @throttle_bytes == 0 or mess.length+@throttle_bytes < @bytes_per
+              debug "flood protection: sending message of length #{mess.length}"
+              debug "(byterate: #{byterate}, throttle bytes: #{@throttle_bytes})"
+              puts_critical(@sendq.shift)
+            else
+              debug "flood protection: throttling message of length #{mess.length}"
+              debug "(byterate: #{byterate}, throttle bytes: #{@throttle_bytes})"
+              run_throttle
+              break
+            end
+          end
+          if @sendq.empty?
+            @timer.stop
+          end
+        rescue => e
+          error "Spooling failed: #{e.inspect}"
+          error e.backtrace.join("\n")
+        end
         end
       end
-      if @sendq.empty?
-        @timer.stop
-      end
-    end
 
     def clearq
-      unless @sendq.empty?
+      if @sock
         @qmutex.synchronize do
-          @sendq.clear
+          unless @sendq.empty?
+            @sendq.clear
+          end
         end
+      else
+        warning "Clearing socket while disconnected"
       end
     end
 
@@ -174,18 +432,28 @@ module Irc
     def shutdown(how=2)
       @sock.shutdown(how) unless @sock.nil?
       @sock = nil
+      @burst = 0
     end
 
     private
-    
+
     # same as puts, but expects to be called with a mutex held on @qmutex
     def puts_critical(message)
       # debug "in puts_critical"
-      debug "SEND: #{message.inspect}"
-      @sock.send(message + "\n",0)
-      @last_send = Time.new
-      @lines_sent += 1
-      @burst += 1
+      begin
+        debug "SEND: #{message.inspect}"
+        if @sock.nil?
+          error "SEND attempted on closed socket"
+        else
+          @sock.send(message + "\n",0)
+          @last_send = Time.new
+          @lines_sent += 1
+          @burst += 1
+          run_throttle(message.length + 1)
+        end
+      rescue => e
+        error "SEND failed: #{e.inspect}"
+      end
     end
 
   end