6 # This module implements the IRC socket interface, including IRC message
7 # penalty computation and the message queue system
12 # Calculate the penalty which will be assigned to this message
15 # According to eggdrop, the initial penalty is
16 penalty = 1 + self.size/100
17 # on everything but UnderNET where it's
18 # penalty = 2 + self.size/120
20 cmd, pars = self.split($;,2)
21 debug "cmd: #{cmd}, pars: #{pars.inspect}"
24 chan, nick, msg = pars.split
25 chan = chan.split(',')
26 nick = nick.split(',')
30 chan, modes, argument = pars.split
35 extra += modes.split(/\+|-/).size
37 extra += 3 * modes.split(/\+|-/).size
41 extra += 2 * argument.split.size
43 penalty += extra * chan.split.size
46 penalty += 2 unless pars.split.size < 2
47 when :PRIVMSG, :NOTICE
48 dests = pars.split($;,2).first
49 penalty += dests.split(',').size
53 penalty += args.inject(0){ |sum,x| sum += ((x.length > 4) ? 3 : 5) }
59 when :AWAY, :JOIN, :VERSION, :TIME, :TRACE, :WHOIS, :DNS
65 else # Unknown messages
69 debug "Wow, more than 99 secs of penalty!"
73 debug "Wow, less than 2 secs of penalty!"
76 debug "penalty: #{penalty}"
87 # A QueueRing is implemented as an array with elements in the form
88 # [chan, [message1, message2, ...]]
89 # Note that the channel +chan+ has no actual bearing on the channels
90 # to which messages will be sent
116 cmess = @storage.assoc(chan)
118 idx = @storage.index(cmess)
120 @storage[idx] = cmess
122 @storage << [chan, [mess]]
128 warning "trying to access empty ring"
132 @last_idx = (@last_idx + 1) % @storage.size
133 mess = @storage[@last_idx][1].first
140 warning "trying to access empty ring"
143 @last_idx = (@last_idx + 1) % @storage.size
144 mess = @storage[@last_idx][1].shift
145 @storage.delete(@storage[@last_idx]) if @storage[@last_idx][1] == []
154 # a MessageQueue is an array of QueueRings
155 # rings have decreasing priority, so messages in ring 0
156 # are more important than messages in ring 1, and so on
157 @rings = Array.new(3) { |i|
161 # ring 0 is special in that if it's not empty, it will
162 # be popped. IOW, ring 0 can starve the other rings
163 # ring 0 is strictly FIFO and is therefore implemented
168 # the other rings are satisfied round-robin
170 self.extend(MonitorMixin)
171 @non_empty = self.new_cond
176 @rings.each { |r| r.clear }
181 def push(mess, chan=nil, cring=0)
185 warning "message #{mess} at ring 0 has channel #{chan}: channel will be ignored" if !chan.nil?
188 error "message #{mess} at ring #{ring} must have a channel" if chan.nil?
189 @rings[ring].push mess, chan
195 def shift(tmout = nil)
197 @non_empty.wait(tmout) if self.empty?
205 !@rings.find { |r| !r.empty? }
209 @rings.inject(0) { |s, r| s + r.size }
215 return @rings[0].shift
217 (@rings.size - 1).times do
218 @last_ring = (@last_ring % (@rings.size - 1)) + 1
219 return @rings[@last_ring].shift unless @rings[@last_ring].empty?
221 warning "trying to access an empty message queue"
227 # wrapped TCPSocket for communication with the server.
228 # emulates a subset of TCPSocket functionality
231 MAX_IRC_SEND_PENALTY = 10
233 # total number of lines sent to the irc server
234 attr_reader :lines_sent
236 # total number of lines received from the irc server
237 attr_reader :lines_received
239 # total number of bytes sent to the irc server
240 attr_reader :bytes_sent
242 # total number of bytes received from the irc server
243 attr_reader :bytes_received
245 # accumulator for the throttle
246 attr_reader :throttle_bytes
248 # an optional filter object. we call @filter.in(data) for
249 # all incoming data and @filter.out(data) for all outgoing data
252 # normalized uri of the current server
253 attr_reader :server_uri
255 # penalty multiplier (percent)
256 attr_accessor :penalty_pct
258 # default trivial filter class
269 # set filter to identity, not to nil
271 @filter = f || IdentityFilter.new
274 # server_list:: list of servers to connect to
275 # host:: optional local host to bind to (ruby 1.7+ required)
276 # create a new Irc::Socket
277 def initialize(server_list, host, opts={})
278 @server_list = server_list.dup
283 @filter = IdentityFilter.new
288 @ssl_verify = opts[:ssl_verify]
289 @ssl_ca_file = opts[:ssl_ca_file]
290 @ssl_ca_path = opts[:ssl_ca_path]
291 @penalty_pct = opts[:penalty_pct] || 100
298 # open a TCP connection to the server
301 warning "reconnecting while connected"
304 srv_uri = @server_list[@conn_count % @server_list.size].dup
305 srv_uri = 'irc://' + srv_uri if !(srv_uri =~ /:\/\//)
307 @server_uri = URI.parse(srv_uri)
308 @server_uri.port = 6667 if !@server_uri.port
310 debug "connection attempt \##{@conn_count} (#{@server_uri.host}:#{@server_uri.port})"
312 # if the host is a bracketed (IPv6) address, strip the brackets
313 # since Ruby doesn't like them in the Socket host parameter
314 # FIXME it would be safer to have it check for a valid
315 # IPv6 bracketed address rather than just stripping the brackets
316 srv_host = @server_uri.host
317 if srv_host.match(/\A\[(.*)\]\z/)
323 sock=TCPSocket.new(srv_host, @server_uri.port, @host)
324 rescue ArgumentError => e
325 error "Your version of ruby does not support binding to a "
326 error "specific local address, please upgrade if you wish "
327 error "to use HOST = foo"
328 error "(this option has been disabled in order to continue)"
329 sock=TCPSocket.new(srv_host, @server_uri.port)
332 sock=TCPSocket.new(srv_host, @server_uri.port)
336 ssl_context = OpenSSL::SSL::SSLContext.new()
338 ssl_context.ca_file = @ssl_ca_file if @ssl_ca_file and not @ssl_ca_file.empty?
339 ssl_context.ca_path = @ssl_ca_path if @ssl_ca_path and not @ssl_ca_path.empty?
340 ssl_context.verify_mode = OpenSSL::SSL::VERIFY_PEER
342 ssl_context.verify_mode = OpenSSL::SSL::VERIFY_NONE
344 sock = OpenSSL::SSL::SSLSocket.new(sock, ssl_context)
345 sock.sync_close = true
349 @last_send = Time.new
350 @flood_send = Time.new
352 @sock.extend(MonitorMixin)
353 @sendq = MessageQueue.new
354 @qthread = Thread.new { writer_loop }
357 # used to send lines to the remote IRCd by skipping the queue
358 # message: IRC message to send
359 # it should only be used for stuff that *must not* be queued,
360 # i.e. the initial PASS, NICK and USER commands
361 # or the final QUIT message
362 def emergency_puts(message, penalty = false)
364 # debug "In puts - got @sock"
365 puts_critical(message, penalty)
369 def handle_socket_error(string, e)
370 error "#{string} failed: #{e.pretty_inspect}"
371 # We assume that an error means that there are connection
372 # problems and that we should reconnect, so we
374 raise SocketError.new(e.inspect)
377 # get the next line from the server (blocks)
380 warning "socket get attempted while closed"
384 reply = @filter.in(@sock.gets)
386 reply.strip! if reply
387 debug "RECV: #{reply.inspect}"
389 rescue Exception => e
390 handle_socket_error(:RECV, e)
394 def queue(msg, chan=nil, ring=0)
395 @sendq.push msg, chan, ring
402 # flush the TCPSocket
407 # Wraps Kernel.select on the socket
408 def select(timeout=nil)
409 Kernel.select([@sock], nil, nil, timeout)
412 # shutdown the connection to the server
414 return unless connected?
419 rescue Exception => e
420 error "error while shutting down: #{e.pretty_inspect}"
433 flood_delay = @flood_send - MAX_IRC_SEND_PENALTY - now
434 delay = [flood_delay, 0].max
436 debug "sleep(#{delay}) # (f: #{flood_delay})"
440 debug "got #{msg.inspect} from queue, sending"
441 emergency_puts(msg, true)
442 rescue Exception => e
443 error "Spooling failed: #{e.pretty_inspect}"
444 debug e.backtrace.join("\n")
450 # same as puts, but expects to be called with a lock held on @sock
451 def puts_critical(message, penalty=false)
452 # debug "in puts_critical"
454 debug "SEND: #{message.inspect}"
456 error "SEND attempted on closed socket"
458 # we use Socket#syswrite() instead of Socket#puts() because
459 # the latter is racy and can cause double message output in
461 actual = @filter.out(message) + "\n"
463 @sock.syswrite actual
465 @flood_send = now if @flood_send < now
466 @flood_send += message.irc_send_penalty*@penalty_pct/100.0 if penalty
469 rescue Exception => e
470 handle_socket_error(:SEND, e)