summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--lib/rbot/ircsocket.rb231
1 files changed, 88 insertions, 143 deletions
diff --git a/lib/rbot/ircsocket.rb b/lib/rbot/ircsocket.rb
index 4c6c74bb..9bdf43a0 100644
--- a/lib/rbot/ircsocket.rb
+++ b/lib/rbot/ircsocket.rb
@@ -1,3 +1,5 @@
+require 'monitor'
+
class ::String
# Calculate the penalty which will be assigned to this message
# by the IRCd
@@ -66,7 +68,6 @@ module Irc
require 'socket'
require 'thread'
- require 'rbot/timer'
class QueueRing
# A QueueRing is implemented as an array with elements in the form
@@ -134,6 +135,7 @@ module Irc
end
class MessageQueue
+
def initialize
# a MessageQueue is an array of QueueRings
# rings have decreasing priority, so messages in ring 0
@@ -151,82 +153,59 @@ module Irc
}
# the other rings are satisfied round-robin
@last_ring = 0
+ self.extend(MonitorMixin)
+ @non_empty = self.new_cond
end
def clear
- @rings.each { |r|
- r.clear
- }
- @last_ring = 0
+ self.synchronize do
+ @rings.each { |r| r.clear }
+ @last_ring = 0
+ end
end
def push(mess, chan=nil, cring=0)
ring = cring
- if ring == 0
- warning "message #{mess} at ring 0 has channel #{chan}: channel will be ignored" if !chan.nil?
- @rings[0] << mess
- else
- error "message #{mess} at ring #{ring} must have a channel" if chan.nil?
- @rings[ring].push mess, chan
+ self.synchronize do
+ if ring == 0
+ warning "message #{mess} at ring 0 has channel #{chan}: channel will be ignored" if !chan.nil?
+ @rings[0] << mess
+ else
+ error "message #{mess} at ring #{ring} must have a channel" if chan.nil?
+ @rings[ring].push mess, chan
+ end
+ @non_empty.signal
+ end
+ end
+
+ def shift(tmout = nil)
+ self.synchronize do
+ @non_empty.wait(tmout) if self.empty?
+ return unsafe_shift
end
end
+ protected
+
def empty?
- @rings.each { |r|
- return false unless r.empty?
- }
- return true
+ !@rings.find { |r| !r.empty? }
end
def length
- len = 0
- @rings.each { |r|
- len += r.size
- }
- len
+ @rings.inject(0) { |s, r| s + r.size }
end
alias :size :length
- def next
- if empty?
- warning "trying to access empty ring"
- return nil
- end
- mess = nil
- if !@rings[0].empty?
- mess = @rings[0].first
- else
- save_ring = @last_ring
- (@rings.size - 1).times {
- @last_ring = (@last_ring % (@rings.size - 1)) + 1
- if !@rings[@last_ring].empty?
- mess = @rings[@last_ring].next
- break
- end
- }
- @last_ring = save_ring
- end
- error "nil message" if mess.nil?
- return mess
- end
-
- def shift
- if empty?
- warning "trying to access empty ring"
- return nil
- end
- mess = nil
+ def unsafe_shift
if !@rings[0].empty?
return @rings[0].shift
end
- (@rings.size - 1).times {
+ (@rings.size - 1).times do
@last_ring = (@last_ring % (@rings.size - 1)) + 1
- if !@rings[@last_ring].empty?
- return @rings[@last_ring].shift
- end
- }
- error "nil message" if mess.nil?
- return mess
+ return @rings[@last_ring].shift unless @rings[@last_ring].empty?
+ end
+ warning "trying to access an empty message queue"
+ return nil
end
end
@@ -253,10 +232,10 @@ module Irc
attr_reader :throttle_bytes
# delay between lines sent
- attr_reader :sendq_delay
+ attr_accessor :sendq_delay
# max lines to burst
- attr_reader :sendq_burst
+ attr_accessor :sendq_burst
# an optional filter object. we call @filter.in(data) for
# all incoming data and @filter.out(data) for all outgoing data
@@ -285,8 +264,6 @@ module Irc
# host:: optional local host to bind to (ruby 1.7+ required)
# create a new IrcSocket
def initialize(server_list, host, sendq_delay=2, sendq_burst=4, opts={})
- @timer = Timer.new
- @act_id = @timer.add(0.2, :blocked => true) { spool }
@server_list = server_list.dup
@server_uri = nil
@conn_count = 0
@@ -307,10 +284,6 @@ module Irc
else
@sendq_delay = 2
end
- @last_send = Time.new - @sendq_delay
- @flood_send = Time.new
- @last_throttle = Time.new
- @burst = 0
if sendq_burst
@sendq_burst = sendq_burst.to_i
else
@@ -357,28 +330,13 @@ module Irc
@sock.sync_close = true
@sock.connect
end
- @qthread = false
- @qmutex = Mutex.new
+ @last_send = Time.new - @sendq_delay
+ @flood_send = Time.new
+ @last_throttle = Time.new
+ @burst = 0
+ @sock.extend(MonitorMixin)
@sendq = MessageQueue.new
- end
-
- def sendq_delay=(newfreq)
- debug "changing sendq frequency to #{newfreq}"
- @qmutex.synchronize do
- @sendq_delay = newfreq
- if newfreq == 0
- clearq
- @timer.block(@act_id)
- else
- @timer.unblock(@act_id)
- end
- end
- end
-
- def sendq_burst=(newburst)
- @qmutex.synchronize do
- @sendq_burst = newburst
- end
+ @qthread = Thread.new { writer_loop }
end
# used to send lines to the remote IRCd by skipping the queue
@@ -386,10 +344,10 @@ module Irc
# it should only be used for stuff that *must not* be queued,
# i.e. the initial PASS, NICK and USER command
# or the final QUIT message
- def emergency_puts(message)
- @qmutex.synchronize do
- # debug "In puts - got mutex"
- puts_critical(message)
+ def emergency_puts(message, penalty = false)
+ @sock.synchronize do
+ # debug "In puts - got @sock"
+ puts_critical(message, penalty)
end
end
@@ -419,62 +377,11 @@ module Irc
end
def queue(msg, chan=nil, ring=0)
- if @sendq_delay > 0
- @qmutex.synchronize do
- @sendq.push msg, chan, ring
- @timer.unblock(@act_id)
- end
- else
- # just send it if queueing is disabled
- self.emergency_puts(msg)
- end
- end
-
- # pop a message off the queue, send it
- def spool
- @qmutex.synchronize do
- begin
- debug "in spooler"
- if @sendq.empty?
- @timer.block(@act_id)
- return
- end
- now = Time.new
- if (now >= (@last_send + @sendq_delay))
- debug "resetting @burst"
- @burst = 0
- elsif (@burst > @sendq_burst)
- # nope. can't send anything, come back to us next tick...
- debug "can't send yet"
- @timer.unblock(@act_id)
- return
- end
- @flood_send = now if @flood_send < now
- debug "can send #{@sendq_burst - @burst} lines, there are #{@sendq.size} to send"
- while !@sendq.empty? and @burst < @sendq_burst and @flood_send - now < MAX_IRC_SEND_PENALTY
- debug "sending message (#{@flood_send - now} < #{MAX_IRC_SEND_PENALTY})"
- puts_critical(@sendq.shift, true)
- end
- if @sendq.empty?
- @timer.block(@act_id)
- end
- rescue Exception => e
- error "Spooling failed: #{e.pretty_inspect}"
- raise e
- end
- end
+ @sendq.push msg, chan, ring
end
def clearq
- if @sock
- @qmutex.synchronize do
- unless @sendq.empty?
- @sendq.clear
- end
- end
- else
- warning "Clearing socket while disconnected"
- end
+ @sendq.clear
end
# flush the TCPSocket
@@ -490,6 +397,8 @@ module Irc
# shutdown the connection to the server
def shutdown(how=2)
return unless connected?
+ @qthread.kill
+ @qthread = nil
begin
@sock.close
rescue Exception => e
@@ -503,7 +412,43 @@ module Irc
private
- # same as puts, but expects to be called with a mutex held on @qmutex
+ def writer_loop
+ loop do
+ # we could wait for the message, then calculate the delay and sleep
+ # if necessary. however, if high-priority message is enqueued while
+ # we sleep, it won't be the first to go out when the sleep is over.
+ # thus, we have to call Time.now() twice, once to calculate the delay
+ # and once to adjust @burst / @flood_send.
+ begin
+ now = Time.now
+ if @sendq_delay > 0
+ burst_delay = 0
+ if @burst > @sendq_burst
+ burst_delay = @last_send + @sendq_delay - now
+ end
+
+ flood_delay = @flood_send - MAX_IRC_SEND_PENALTY - now
+ delay = [burst_delay, flood_delay, 0].max
+ if delay > 0
+ debug "sleep(#{delay}) # (f: #{flood_delay}, b: #{burst_delay})"
+ sleep(delay)
+ end
+ end
+ msg = @sendq.shift
+ now = Time.now
+ @flood_send = now if @flood_send < now
+ @burst = 0 if @last_send + @sendq_delay < now
+ debug "got #{msg.inspect} from queue, sending"
+ emergency_puts(msg, true)
+ rescue Exception => e
+ error "Spooling failed: #{e.pretty_inspect}"
+ debug e.backtrace.join("\n")
+ raise e
+ end
+ end
+ end
+
+ # same as puts, but expects to be called with a lock held on @sock
def puts_critical(message, penalty=false)
# debug "in puts_critical"
begin