diff options
-rw-r--r-- | lib/rbot/ircbot.rb | 99 | ||||
-rw-r--r-- | lib/rbot/ircsocket.rb | 259 |
2 files changed, 279 insertions, 79 deletions
diff --git a/lib/rbot/ircbot.rb b/lib/rbot/ircbot.rb index 6d76cc49..e82a1c98 100644 --- a/lib/rbot/ircbot.rb +++ b/lib/rbot/ircbot.rb @@ -272,8 +272,7 @@ class IrcBot warning "bad nick (#{data[:nick]})" } @client[:ping] = proc {|data| - # (jump the queue for pongs) - @socket.puts "PONG #{data[:pingid]}" + @socket.queue "PONG #{data[:pingid]}" } @client[:pong] = proc {|data| @last_ping = nil @@ -428,8 +427,8 @@ class IrcBot rescue => e raise e.class, "failed to connect to IRC server at #{@config['server.name']} #{@config['server.port']}: " + e end - @socket.puts "PASS " + @config['server.password'] if @config['server.password'] - @socket.puts "NICK #{@nick}\nUSER #{@config['irc.user']} 4 #{@config['server.name']} :Ruby bot. (c) Tom Gilbert" + @socket.emergency_puts "PASS " + @config['server.password'] if @config['server.password'] + @socket.emergency_puts "NICK #{@nick}\nUSER #{@config['irc.user']} 4 #{@config['server.name']} :Ruby bot. (c) Tom Gilbert" start_server_pings end @@ -468,7 +467,6 @@ class IrcBot error "non-net exception: #{e.class}: #{e}" error e.inspect error e.backtrace.join("\n") - @socket.shutdown # now we reconnect rescue => e error "unexpected exception: #{e.class}: #{e}" error e.inspect @@ -477,11 +475,14 @@ class IrcBot exit 2 end - log "disconnected" - stop_server_pings @channels.clear - @socket.clearq + if @socket.connected? + @socket.clearq + @socket.shutdown + end + + log "disconnected" log "waiting to reconnect" sleep @config['server.reconnect_wait'] @@ -495,7 +496,7 @@ class IrcBot # Type can be PRIVMSG, NOTICE, etc, but those you should really use the # relevant say() or notice() methods. This one should be used for IRCd # extensions you want to use in modules. - def sendmsg(type, where, message) + def sendmsg(type, where, message, chan=nil, ring=0) # limit it according to the byterate, splitting the message # taking into consideration the actual message length # and all the extra stuff @@ -504,7 +505,7 @@ class IrcBot left = @socket.bytes_per - type.length - where.length - 4 begin if(left >= message.length) - sendq("#{type} #{where} :#{message}") + sendq "#{type} #{where} :#{message}", chan, ring log_sent(type, where, message) return end @@ -514,46 +515,88 @@ class IrcBot message = line.slice!(lastspace, line.length) + message message.gsub!(/^\s+/, "") end - sendq("#{type} #{where} :#{line}") + sendq "#{type} #{where} :#{line}", chan, ring log_sent(type, where, line) end while(message.length > 0) end # queue an arbitraty message for the server - def sendq(message="") + def sendq(message="", chan=nil, ring=0) # temporary - @socket.queue(message) + @socket.queue(message, chan, ring) end # send a notice message to channel/nick +where+ - def notice(where, message) + def notice(where, message, mchan=nil, mring=-1) + if mchan == "" + chan = mchan + else + chan = where + end + if mring < 0 + if where =~ /^#/ + ring = 2 + else + ring = 1 + end + else + ring = mring + end message.each_line { |line| line.chomp! next unless(line.length > 0) - sendmsg("NOTICE", where, line) + sendmsg "NOTICE", where, line, chan, ring } end # say something (PRIVMSG) to channel/nick +where+ - def say(where, message) + def say(where, message, mchan="", mring=-1) + if mchan == "" + chan = mchan + else + chan = where + end + if mring < 0 + if where =~ /^#/ + ring = 2 + else + ring = 1 + end + else + ring = mring + end message.to_s.gsub(/[\r\n]+/, "\n").each_line { |line| line.chomp! next unless(line.length > 0) unless((where =~ /^#/) && (@channels.has_key?(where) && @channels[where].quiet)) - sendmsg("PRIVMSG", where, line) + sendmsg "PRIVMSG", where, line, chan, ring end } end # perform a CTCP action with message +message+ to channel/nick +where+ - def action(where, message) - sendq("PRIVMSG #{where} :\001ACTION #{message}\001") + def action(where, message, mchan="", mring=-1) + if mchan == "" + chan = mchan + else + chan = where + end + if mring < 0 + if where =~ /^#/ + ring = 2 + else + ring = 1 + end + else + ring = mring + end + sendq "PRIVMSG #{where} :\001ACTION #{message}\001", chan, ring if(where =~ /^#/) irclog "* #{@nick} #{message}", where elsif (where =~ /^(\S*)!.*$/) - irclog "* #{@nick}[#{where}] #{message}", $1 + irclog "* #{@nick}[#{where}] #{message}", $1 else - irclog "* #{@nick}[#{where}] #{message}", where + irclog "* #{@nick}[#{where}] #{message}", where end end @@ -578,7 +621,7 @@ class IrcBot # set topic of channel +where+ to +topic+ def topic(where, topic) - sendq "TOPIC #{where} :#{topic}" + sendq "TOPIC #{where} :#{topic}", where, 2 end # disconnect from the server and cleanup all plugins and modules @@ -597,7 +640,7 @@ class IrcBot debug "Clearing socket" @socket.clearq debug "Sending quit message" - @socket.puts "QUIT :#{message}" + @socket.emergency_puts "QUIT :#{message}" debug "Flushing socket" @socket.flush debug "Shutting down socket" @@ -660,15 +703,15 @@ class IrcBot # join a channel def join(channel, key=nil) if(key) - sendq "JOIN #{channel} :#{key}" + sendq "JOIN #{channel} :#{key}", channel, 2 else - sendq "JOIN #{channel}" + sendq "JOIN #{channel}", channel, 2 end end # part a channel def part(channel, message="") - sendq "PART #{channel} :#{message}" + sendq "PART #{channel} :#{message}", channel, 2 end # attempt to change bot's nick to +name+ @@ -678,7 +721,7 @@ class IrcBot # changing mode def mode(channel, mode, target) - sendq "MODE #{channel} #{mode} #{target}" + sendq "MODE #{channel} #{mode} #{target}", channel, 2 end # m:: message asking for help @@ -727,7 +770,7 @@ class IrcBot # we want to respond to a hung server within 30 secs or so @ping_timer = @timer.add(30) { @last_ping = Time.now - @socket.puts "PING :rbot" + @socket.queue "PING :rbot" } @pong_timer = @timer.add(10) { unless @last_ping.nil? diff --git a/lib/rbot/ircsocket.rb b/lib/rbot/ircsocket.rb index 4ee3be23..23f29086 100644 --- a/lib/rbot/ircsocket.rb +++ b/lib/rbot/ircsocket.rb @@ -4,6 +4,158 @@ module Irc require 'thread' require 'rbot/timer' + class QueueRing + # A QueueRing is implemented as an array with elements in the form + # [chan, [message1, message2, ...] + # Note that the channel +chan+ has no actual bearing with the channels + # to which messages will be sent + + def initialize + @storage = Array.new + @last_idx = -1 + end + + def clear + @storage.clear + @last_idx = -1 + end + + def length + @storage.length + end + + def empty? + @storage.empty? + end + + def push(mess, chan) + cmess = @storage.assoc(chan) + if cmess + idx = @storage.index(cmess) + cmess[1] << mess + @storage[idx] = cmess + else + @storage << [chan, [mess]] + end + end + + def next + if empty? + warning "trying to access empty ring" + return nil + end + save_idx = @last_idx + @last_idx = (@last_idx + 1) % @storage.length + mess = @storage[@last_idx][1].first + @last_idx = save_idx + mess + end + + def shift + if empty? + warning "trying to access empty ring" + return nil + end + @last_idx = (@last_idx + 1) % @storage.length + mess = @storage[@last_idx][1].shift + @storage.delete(@storage[@last_idx]) if @storage[@last_idx][1] == [] + mess + end + + end + + class MessageQueue + def initialize + # a MessageQueue is an array of QueueRings + # rings have decreasing priority, so messages in ring 0 + # are more important than messages in ring 1, and so on + @rings = Array.new(3) { |i| + if i > 0 + QueueRing.new + else + # ring 0 is special in that if it's not empty, it will + # be popped. IOW, ring 0 can starve the other rings + # ring 0 is strictly FIFO and is therefore implemented + # as an array + Array.new + end + } + # the other rings are satisfied round-robin + @last_ring = 0 + end + + def clear + @rings.each { |r| + r.clear + } + @last_ring = 0 + end + + def push(mess, chan=nil, cring=0) + ring = cring + if ring == 0 + warning "message #{mess} at ring 0 has channel #{chan}: channel will be ignored" if !chan.nil? + @rings[0] << mess + else + error "message #{mess} at ring #{ring} must have a channel" if chan.nil? + @rings[ring].push mess, chan + end + end + + def empty? + @rings.each { |r| + return false unless r.empty? + } + return true + end + + def length + len = 0 + @rings.each { |r| + len += r.length + } + len + end + + def next + if empty? + warning "trying to access empty ring" + return nil + end + if !@rings[0].empty? + mess = @rings[0].first + else + save_ring = @last_ring + (@rings.length - 1).times { + @last_ring = ((@last_ring + 1) % (@rings.length - 1)) + 1 + if !@rings[@last_ring].empty? + mess = @rings[@last_ring].next + break + end + } + @last_ring = save_ring + end + return mess + end + + def shift + if empty? + warning "trying to access empty ring" + return nil + end + if !@rings[0].empty? + return @rings[0].shift + end + (@rings.length - 1).times { + @last_ring = ((@last_ring + 1) % (@rings.length - 1)) + 1 + if !@rings[@last_ring].empty? + return @rings[@last_ring].shift + end + } + end + + end + # wrapped TCPSocket for communication with the server. # emulates a subset of TCPSocket functionality class IrcSocket @@ -87,8 +239,8 @@ module Irc # open a TCP connection to the server def connect if connected? - debug "reconnecting socket while connected" - shutdown + warning "reconnecting while connected" + return end if(@host) begin @@ -105,7 +257,7 @@ module Irc end @qthread = false @qmutex = Mutex.new - @sendq = Array.new + @sendq = MessageQueue.new end def sendq_delay=(newfreq) @@ -142,17 +294,6 @@ module Irc if @throttle_bytes > 0 # If we ever reach the limit, we halve the actual allowed byterate # until we manage to reset the throttle. - # I don't know if this is the best way, though, because the real - # problem is probably non-queued messages like PINGs and PONGs. - # A better solution would probably be to have two queues, - # one for priority messages and another one for normal messages. - # Even better, we should have: - # * one queue for server stuff - # * one for each channel - # * one for each private communication - # The server queue would have priority, everything else would be served - # round-robin, so that someone making the bot flood one channel wouldn't - # prevent the bot from working on other channels (or private communications) if @throttle_bytes >= @bytes_per @throttle_div = 0.5 end @@ -168,9 +309,12 @@ module Irc @throttle_bytes += more end - # used to send lines to the remote IRCd + # used to send lines to the remote IRCd by skipping the queue # message: IRC message to send - def puts(message) + # it should only be used for stuff that *must not* be queued, + # i.e. the initial PASS, NICK and USER command + # or the final QUIT message + def emergency_puts(message) @qmutex.synchronize do # debug "In puts - got mutex" puts_critical(message) @@ -195,60 +339,64 @@ module Irc end end - def queue(msg) + def queue(msg, chan=nil, ring=0) if @sendq_delay > 0 @qmutex.synchronize do - @sendq.push msg + @sendq.push msg, chan, ring + @timer.start end - @timer.start else # just send it if queueing is disabled - self.puts(msg) + self.emergency_puts(msg) end end # pop a message off the queue, send it def spool - if @sendq.empty? - @timer.stop - return - end - now = Time.new - if (now >= (@last_send + @sendq_delay)) - # reset burst counter after @sendq_delay has passed - @burst = 0 - debug "in spool, resetting @burst" - elsif (@burst >= @sendq_burst) - # nope. can't send anything, come back to us next tick... - @timer.start - return - end @qmutex.synchronize do - debug "(can send #{@sendq_burst - @burst} lines, there are #{@sendq.length} to send)" + debug "in spooler" + if @sendq.empty? + @timer.stop + return + end + now = Time.new + if (now >= (@last_send + @sendq_delay)) + # reset burst counter after @sendq_delay has passed + debug "resetting @burst" + @burst = 0 + elsif (@burst >= @sendq_burst) + # nope. can't send anything, come back to us next tick... + debug "can't send yet" + @timer.start + return + end + # debug "Queue: #{@sendq.inspect}" + debug "can send #{@sendq_burst - @burst} lines, there are #{@sendq.length} to send" (@sendq_burst - @burst).times do break if @sendq.empty? - mess = @sendq[0] + mess = @sendq.next + # debug "Next message is #{mess.inspect}" if @throttle_bytes == 0 or mess.length+@throttle_bytes < @bytes_per - debug "(flood protection: sending message of length #{mess.length})" - debug "(byterate: #{byterate}, throttle bytes: #{@throttle_bytes})" + debug "flood protection: sending message of length #{mess.length}" + debug "(byterate: #{byterate}, throttle bytes: #{@throttle_bytes})" puts_critical(@sendq.shift) else - debug "(flood protection: throttling message of length #{mess.length})" - debug "(byterate: #{byterate}, throttle bytes: #{@throttle_bytes})" + debug "flood protection: throttling message of length #{mess.length}" + debug "(byterate: #{byterate}, throttle bytes: #{@throttle_bytes})" run_throttle break end end - end - if @sendq.empty? - @timer.stop + if @sendq.empty? + @timer.stop + end end end def clearq if @sock - unless @sendq.empty? - @qmutex.synchronize do + @qmutex.synchronize do + unless @sendq.empty? @sendq.clear end end @@ -271,6 +419,7 @@ module Irc def shutdown(how=2) @sock.shutdown(how) unless @sock.nil? @sock = nil + @burst = 0 end private @@ -278,12 +427,20 @@ module Irc # same as puts, but expects to be called with a mutex held on @qmutex def puts_critical(message) # debug "in puts_critical" - debug "SEND: #{message.inspect}" - @sock.send(message + "\n",0) - @last_send = Time.new - @lines_sent += 1 - @burst += 1 - run_throttle(message.length + 1) + begin + debug "SEND: #{message.inspect}" + if @sock.nil? + error "SEND attempted on closed socket" + else + @sock.send(message + "\n",0) + @last_send = Time.new + @lines_sent += 1 + @burst += 1 + run_throttle(message.length + 1) + end + rescue => e + error "SEND failed: #{e.inspect}" + end end end |