4 # Calculate the penalty which will be assigned to this message
7 # According to eggrdop, the initial penalty is
8 penalty = 1 + self.size/100
9 # on everything but UnderNET where it's
10 # penalty = 2 + self.size/120
12 cmd, pars = self.split($;,2)
13 debug "cmd: #{cmd}, pars: #{pars.inspect}"
16 chan, nick, msg = pars.split
17 chan = chan.split(',')
18 nick = nick.split(',')
22 chan, modes, argument = pars.split
27 extra += modes.split(/\+|-/).size
29 extra += 3 * modes.split(/\+|-/).size
33 extra += 2 * argument.split.size
35 penalty += extra * chan.split.size
38 penalty += 2 unless pars.split.size < 2
39 when :PRIVMSG, :NOTICE
40 dests = pars.split($;,2).first
41 penalty += dests.split(',').size
43 # I'm too lazy to implement this one correctly
45 when :AWAY, :JOIN, :VERSION, :TIME, :TRACE, :WHOIS, :DNS
51 else # Unknown messages
55 debug "Wow, more than 99 secs of penalty!"
59 debug "Wow, less than 2 secs of penalty!"
62 debug "penalty: #{penalty}"
73 # A QueueRing is implemented as an array with elements in the form
74 # [chan, [message1, message2, ...]
75 # Note that the channel +chan+ has no actual bearing with the channels
76 # to which messages will be sent
102 cmess = @storage.assoc(chan)
104 idx = @storage.index(cmess)
106 @storage[idx] = cmess
108 @storage << [chan, [mess]]
114 warning "trying to access empty ring"
118 @last_idx = (@last_idx + 1) % @storage.size
119 mess = @storage[@last_idx][1].first
126 warning "trying to access empty ring"
129 @last_idx = (@last_idx + 1) % @storage.size
130 mess = @storage[@last_idx][1].shift
131 @storage.delete(@storage[@last_idx]) if @storage[@last_idx][1] == []
140 # a MessageQueue is an array of QueueRings
141 # rings have decreasing priority, so messages in ring 0
142 # are more important than messages in ring 1, and so on
143 @rings = Array.new(3) { |i|
147 # ring 0 is special in that if it's not empty, it will
148 # be popped. IOW, ring 0 can starve the other rings
149 # ring 0 is strictly FIFO and is therefore implemented
154 # the other rings are satisfied round-robin
156 self.extend(MonitorMixin)
157 @non_empty = self.new_cond
162 @rings.each { |r| r.clear }
167 def push(mess, chan=nil, cring=0)
171 warning "message #{mess} at ring 0 has channel #{chan}: channel will be ignored" if !chan.nil?
174 error "message #{mess} at ring #{ring} must have a channel" if chan.nil?
175 @rings[ring].push mess, chan
181 def shift(tmout = nil)
183 @non_empty.wait(tmout) if self.empty?
191 !@rings.find { |r| !r.empty? }
195 @rings.inject(0) { |s, r| s + r.size }
201 return @rings[0].shift
203 (@rings.size - 1).times do
204 @last_ring = (@last_ring % (@rings.size - 1)) + 1
205 return @rings[@last_ring].shift unless @rings[@last_ring].empty?
207 warning "trying to access an empty message queue"
213 # wrapped TCPSocket for communication with the server.
214 # emulates a subset of TCPSocket functionality
217 MAX_IRC_SEND_PENALTY = 10
219 # total number of lines sent to the irc server
220 attr_reader :lines_sent
222 # total number of lines received from the irc server
223 attr_reader :lines_received
225 # total number of bytes sent to the irc server
226 attr_reader :bytes_sent
228 # total number of bytes received from the irc server
229 attr_reader :bytes_received
231 # accumulator for the throttle
232 attr_reader :throttle_bytes
234 # delay between lines sent
235 attr_accessor :sendq_delay
238 attr_accessor :sendq_burst
240 # an optional filter object. we call @filter.in(data) for
241 # all incoming data and @filter.out(data) for all outgoing data
244 # normalized uri of the current server
245 attr_reader :server_uri
247 # default trivial filter class
258 # set filter to identity, not to nil
260 @filter = f || IdentityFilter.new
263 # server_list:: list of servers to connect to
264 # host:: optional local host to bind to (ruby 1.7+ required)
265 # create a new Irc::Socket
266 def initialize(server_list, host, sendq_delay=2, sendq_burst=4, opts={})
267 @server_list = server_list.dup
272 @filter = IdentityFilter.new
276 if opts.kind_of?(Hash) and opts.key?(:ssl)
283 @sendq_delay = sendq_delay.to_f
288 @sendq_burst = sendq_burst.to_i
298 # open a TCP connection to the server
301 warning "reconnecting while connected"
304 srv_uri = @server_list[@conn_count % @server_list.size].dup
305 srv_uri = 'irc://' + srv_uri if !(srv_uri =~ /:\/\//)
307 @server_uri = URI.parse(srv_uri)
308 @server_uri.port = 6667 if !@server_uri.port
309 debug "connection attempt \##{@conn_count} (#{@server_uri.host}:#{@server_uri.port})"
313 @sock=TCPSocket.new(@server_uri.host, @server_uri.port, @host)
314 rescue ArgumentError => e
315 error "Your version of ruby does not support binding to a "
316 error "specific local address, please upgrade if you wish "
317 error "to use HOST = foo"
318 error "(this option has been disabled in order to continue)"
319 @sock=TCPSocket.new(@server_uri.host, @server_uri.port)
322 @sock=TCPSocket.new(@server_uri.host, @server_uri.port)
326 ssl_context = OpenSSL::SSL::SSLContext.new()
327 ssl_context.verify_mode = OpenSSL::SSL::VERIFY_NONE
329 @sock = OpenSSL::SSL::SSLSocket.new(@rawsock, ssl_context)
330 @sock.sync_close = true
333 @last_send = Time.new - @sendq_delay
334 @flood_send = Time.new
335 @last_throttle = Time.new
337 @sock.extend(MonitorMixin)
338 @sendq = MessageQueue.new
339 @qthread = Thread.new { writer_loop }
342 # used to send lines to the remote IRCd by skipping the queue
343 # message: IRC message to send
344 # it should only be used for stuff that *must not* be queued,
345 # i.e. the initial PASS, NICK and USER command
346 # or the final QUIT message
347 def emergency_puts(message, penalty = false)
349 # debug "In puts - got @sock"
350 puts_critical(message, penalty)
354 def handle_socket_error(string, e)
355 error "#{string} failed: #{e.pretty_inspect}"
356 # We assume that an error means that there are connection
357 # problems and that we should reconnect, so we
359 raise SocketError.new(e.inspect)
362 # get the next line from the server (blocks)
365 warning "socket get attempted while closed"
369 reply = @filter.in(@sock.gets)
371 reply.strip! if reply
372 debug "RECV: #{reply.inspect}"
374 rescue Exception => e
375 handle_socket_error(:RECV, e)
379 def queue(msg, chan=nil, ring=0)
380 @sendq.push msg, chan, ring
387 # flush the TCPSocket
392 # Wraps Kernel.select on the socket
393 def select(timeout=nil)
394 Kernel.select([@sock], nil, nil, timeout)
397 # shutdown the connection to the server
399 return unless connected?
404 rescue Exception => e
405 error "error while shutting down: #{e.pretty_inspect}"
407 @rawsock = nil if @ssl
417 # we could wait for the message, then calculate the delay and sleep
418 # if necessary. however, if high-priority message is enqueued while
419 # we sleep, it won't be the first to go out when the sleep is over.
420 # thus, we have to call Time.now() twice, once to calculate the delay
421 # and once to adjust @burst / @flood_send.
426 if @burst > @sendq_burst
427 burst_delay = @last_send + @sendq_delay - now
430 flood_delay = @flood_send - MAX_IRC_SEND_PENALTY - now
431 delay = [burst_delay, flood_delay, 0].max
433 debug "sleep(#{delay}) # (f: #{flood_delay}, b: #{burst_delay})"
439 @flood_send = now if @flood_send < now
440 @burst = 0 if @last_send + @sendq_delay < now
441 debug "got #{msg.inspect} from queue, sending"
442 emergency_puts(msg, true)
443 rescue Exception => e
444 error "Spooling failed: #{e.pretty_inspect}"
445 debug e.backtrace.join("\n")
451 # same as puts, but expects to be called with a lock held on @sock
452 def puts_critical(message, penalty=false)
453 # debug "in puts_critical"
455 debug "SEND: #{message.inspect}"
457 error "SEND attempted on closed socket"
459 @sock.puts(@filter.out(message))
460 @last_send = Time.new
461 @flood_send += message.irc_send_penalty if penalty
465 rescue Exception => e
466 handle_socket_error(:SEND, e)