@last_idx = (@last_idx + 1) % @storage.length
mess = @storage[@last_idx][1].first
@last_idx = save_idx
- mess
+ return mess
end
def shift
@last_idx = (@last_idx + 1) % @storage.length
mess = @storage[@last_idx][1].shift
@storage.delete(@storage[@last_idx]) if @storage[@last_idx][1] == []
- mess
+ return mess
end
end
@rings.each { |r|
len += r.length
}
- len
+ len
end
def next
warning "trying to access empty ring"
return nil
end
+ mess = nil
if !@rings[0].empty?
mess = @rings[0].first
else
save_ring = @last_ring
(@rings.length - 1).times {
- @last_ring = ((@last_ring + 1) % (@rings.length - 1)) + 1
+ @last_ring = (@last_ring % (@rings.length - 1)) + 1
if !@rings[@last_ring].empty?
mess = @rings[@last_ring].next
break
}
@last_ring = save_ring
end
+ error "nil message" if mess.nil?
return mess
end
warning "trying to access empty ring"
return nil
end
+ mess = nil
if !@rings[0].empty?
return @rings[0].shift
end
(@rings.length - 1).times {
- @last_ring = ((@last_ring + 1) % (@rings.length - 1)) + 1
+ @last_ring = (@last_ring % (@rings.length - 1)) + 1
if !@rings[@last_ring].empty?
return @rings[@last_ring].shift
end
}
+ error "nil message" if mess.nil?
+ return mess
end
end
@bytes_per = 400
@seconds_per = 2
@throttle_bytes = 0
- @throttle_div = 1
+ @hit_limit = 0 # how many times did we reach the limit?
setbyterate(brt)
end
end
def byterate
- return "#{@bytes_per}/#{@seconds_per}"
+ return "#{@bytes_per}/#{@seconds_per} (limit hit #{@hit_limit} times)"
end
def byterate=(newrate)
def run_throttle(more=0)
now = Time.new
+ # Each time we reach the limit, we reduce the bitrate. We reset the bitrate only if the throttle
+ # manages to reset twice. This way we have better flood control, although the really perfect way
+ # would be to calculate our penalty the way it's done serverside.
if @throttle_bytes > 0
- # If we ever reach the limit, we halve the actual allowed byterate
- # until we manage to reset the throttle.
if @throttle_bytes >= @bytes_per
- @throttle_div = 0.5
+ @hit_limit += 1
+ @hit_limit = 3 if @hit_limit > 3
end
- delta = ((now - @last_throttle)*@throttle_div*@bytes_per/@seconds_per).floor
+ delta = ((now - @last_throttle)*(0.5**@hit_limit.ceil)*@bytes_per/@seconds_per).floor
if delta > 0
@throttle_bytes -= delta
@throttle_bytes = 0 if @throttle_bytes < 0
@last_throttle = now
end
else
- @throttle_div = 1
+ @hit_limit -= 0.5 if @hit_limit > 0
end
@throttle_bytes += more
end
# get the next line from the server (blocks)
def gets
if @sock.nil?
- debug "socket get attempted while closed"
+ warning "socket get attempted while closed"
return nil
end
begin
debug "RECV: #{reply.inspect}"
return reply
rescue => e
- debug "socket get failed: #{e.inspect}"
+ warning "socket get failed: #{e.inspect}"
+ debug e.backtrace.join("\n")
return nil
end
end
# pop a message off the queue, send it
def spool
@qmutex.synchronize do
- debug "in spooler"
- if @sendq.empty?
- @timer.stop
- return
- end
- now = Time.new
- if (now >= (@last_send + @sendq_delay))
- # reset burst counter after @sendq_delay has passed
- debug "resetting @burst"
- @burst = 0
- elsif (@burst >= @sendq_burst)
- # nope. can't send anything, come back to us next tick...
- debug "can't send yet"
- @timer.start
- return
- end
- # debug "Queue: #{@sendq.inspect}"
- debug "can send #{@sendq_burst - @burst} lines, there are #{@sendq.length} to send"
- (@sendq_burst - @burst).times do
- break if @sendq.empty?
- mess = @sendq.next
- # debug "Next message is #{mess.inspect}"
- if @throttle_bytes == 0 or mess.length+@throttle_bytes < @bytes_per
- debug "flood protection: sending message of length #{mess.length}"
- debug "(byterate: #{byterate}, throttle bytes: #{@throttle_bytes})"
- puts_critical(@sendq.shift)
- else
- debug "flood protection: throttling message of length #{mess.length}"
- debug "(byterate: #{byterate}, throttle bytes: #{@throttle_bytes})"
- run_throttle
- break
+ begin
+ debug "in spooler"
+ if @sendq.empty?
+ @timer.stop
+ return
end
- end
- if @sendq.empty?
- @timer.stop
+ now = Time.new
+ if (now >= (@last_send + @sendq_delay))
+ # after @sendq_delay has passed, we allow more @burst
+ # instead of resetting it to 0, we reduce it by 1
+ debug "decreasing @burst"
+ @burst -= 1 if @burst > 0
+ elsif (@burst >= @sendq_burst)
+ # nope. can't send anything, come back to us next tick...
+ debug "can't send yet"
+ @timer.start
+ return
+ end
+ debug "can send #{@sendq_burst - @burst} lines, there are #{@sendq.length} to send"
+ (@sendq_burst - @burst).times do
+ break if @sendq.empty?
+ mess = @sendq.next
+ if @throttle_bytes == 0 or mess.length+@throttle_bytes < @bytes_per
+ debug "flood protection: sending message of length #{mess.length}"
+ debug "(byterate: #{byterate}, throttle bytes: #{@throttle_bytes})"
+ puts_critical(@sendq.shift)
+ else
+ debug "flood protection: throttling message of length #{mess.length}"
+ debug "(byterate: #{byterate}, throttle bytes: #{@throttle_bytes})"
+ run_throttle
+ break
+ end
+ end
+ if @sendq.empty?
+ @timer.stop
+ end
+ rescue => e
+ error "Spooling failed: #{e.inspect}"
+ error e.backtrace.join("\n")
end
end
end
end
end
else
- debug "Clearing socket while disconnected"
+ warning "Clearing socket while disconnected"
end
end
end
rescue => e
error "SEND failed: #{e.inspect}"
+ raise
end
end