]> git.netwichtig.de Git - user/henk/code/ruby/rbot.git/blob - lib/rbot/ircsocket.rb
IRC Socket: documentation cleanup
[user/henk/code/ruby/rbot.git] / lib / rbot / ircsocket.rb
1 #-- vim:sw=2:et
2 #++
3 #
4 # :title: IRC Socket
5 #
6 # This module implements the IRC socket interface, including IRC message
7 # penalty computation and the message queue system
8
9 require 'monitor'
10
11 class ::String
12   # Calculate the penalty which will be assigned to this message
13   # by the IRCd
14   def irc_send_penalty
15     # According to eggdrop, the initial penalty is
16     penalty = 1 + self.size/100
17     # on everything but UnderNET where it's
18     # penalty = 2 + self.size/120
19
20     cmd, pars = self.split($;,2)
21     debug "cmd: #{cmd}, pars: #{pars.inspect}"
22     case cmd.to_sym
23     when :KICK
24       chan, nick, msg = pars.split
25       chan = chan.split(',')
26       nick = nick.split(',')
27       penalty += nick.size
28       penalty *= chan.size
29     when :MODE
30       chan, modes, argument = pars.split
31       extra = 0
32       if modes
33         extra = 1
34         if argument
35           extra += modes.split(/\+|-/).size
36         else
37           extra += 3 * modes.split(/\+|-/).size
38         end
39       end
40       if argument
41         extra += 2 * argument.split.size
42       end
43       penalty += extra * chan.split.size
44     when :TOPIC
45       penalty += 1
46       penalty += 2 unless pars.split.size < 2
47     when :PRIVMSG, :NOTICE
48       dests = pars.split($;,2).first
49       penalty += dests.split(',').size
50     when :WHO
51       # I'm too lazy to implement this one correctly
52       penalty += 5
53     when :AWAY, :JOIN, :VERSION, :TIME, :TRACE, :WHOIS, :DNS
54       penalty += 2
55     when :INVITE, :NICK
56       penalty += 3
57     when :ISON
58       penalty += 1
59     else # Unknown messages
60       penalty += 1
61     end
62     if penalty > 99
63       debug "Wow, more than 99 secs of penalty!"
64       penalty = 99
65     end
66     if penalty < 2
67       debug "Wow, less than 2 secs of penalty!"
68       penalty = 2
69     end
70     debug "penalty: #{penalty}"
71     return penalty
72   end
73 end
74
75 module Irc
76
77   require 'socket'
78   require 'thread'
79
80   class QueueRing
81     # A QueueRing is implemented as an array with elements in the form
82     # [chan, [message1, message2, ...]
83     # Note that the channel +chan+ has no actual bearing with the channels
84     # to which messages will be sent
85
86     def initialize
87       @storage = Array.new
88       @last_idx = -1
89     end
90
91     def clear
92       @storage.clear
93       @last_idx = -1
94     end
95
96     def length
97       len = 0
98       @storage.each {|c|
99         len += c[1].size
100       }
101       return len
102     end
103     alias :size :length
104
105     def empty?
106       @storage.empty?
107     end
108
109     def push(mess, chan)
110       cmess = @storage.assoc(chan)
111       if cmess
112         idx = @storage.index(cmess)
113         cmess[1] << mess
114         @storage[idx] = cmess
115       else
116         @storage << [chan, [mess]]
117       end
118     end
119
120     def next
121       if empty?
122         warning "trying to access empty ring"
123         return nil
124       end
125       save_idx = @last_idx
126       @last_idx = (@last_idx + 1) % @storage.size
127       mess = @storage[@last_idx][1].first
128       @last_idx = save_idx
129       return mess
130     end
131
132     def shift
133       if empty?
134         warning "trying to access empty ring"
135         return nil
136       end
137       @last_idx = (@last_idx + 1) % @storage.size
138       mess = @storage[@last_idx][1].shift
139       @storage.delete(@storage[@last_idx]) if @storage[@last_idx][1] == []
140       return mess
141     end
142
143   end
144
145   class MessageQueue
146
147     def initialize
148       # a MessageQueue is an array of QueueRings
149       # rings have decreasing priority, so messages in ring 0
150       # are more important than messages in ring 1, and so on
151       @rings = Array.new(3) { |i|
152         if i > 0
153           QueueRing.new
154         else
155           # ring 0 is special in that if it's not empty, it will
156           # be popped. IOW, ring 0 can starve the other rings
157           # ring 0 is strictly FIFO and is therefore implemented
158           # as an array
159           Array.new
160         end
161       }
162       # the other rings are satisfied round-robin
163       @last_ring = 0
164       self.extend(MonitorMixin)
165       @non_empty = self.new_cond
166     end
167
168     def clear
169       self.synchronize do
170         @rings.each { |r| r.clear }
171         @last_ring = 0
172       end
173     end
174
175     def push(mess, chan=nil, cring=0)
176       ring = cring
177       self.synchronize do
178         if ring == 0
179           warning "message #{mess} at ring 0 has channel #{chan}: channel will be ignored" if !chan.nil?
180           @rings[0] << mess
181         else
182           error "message #{mess} at ring #{ring} must have a channel" if chan.nil?
183           @rings[ring].push mess, chan
184         end
185         @non_empty.signal
186       end
187     end
188
189     def shift(tmout = nil)
190       self.synchronize do
191         @non_empty.wait(tmout) if self.empty?
192         return unsafe_shift
193       end
194     end
195
196     protected
197
198     def empty?
199       !@rings.find { |r| !r.empty? }
200     end
201
202     def length
203       @rings.inject(0) { |s, r| s + r.size }
204     end
205     alias :size :length
206
207     def unsafe_shift
208       if !@rings[0].empty?
209         return @rings[0].shift
210       end
211       (@rings.size - 1).times do
212         @last_ring = (@last_ring % (@rings.size - 1)) + 1
213         return @rings[@last_ring].shift unless @rings[@last_ring].empty?
214       end
215       warning "trying to access an empty message queue"
216       return nil
217     end
218
219   end
220
221   # wrapped TCPSocket for communication with the server.
222   # emulates a subset of TCPSocket functionality
223   class Socket
224
225     MAX_IRC_SEND_PENALTY = 10
226
227     # total number of lines sent to the irc server
228     attr_reader :lines_sent
229
230     # total number of lines received from the irc server
231     attr_reader :lines_received
232
233     # total number of bytes sent to the irc server
234     attr_reader :bytes_sent
235
236     # total number of bytes received from the irc server
237     attr_reader :bytes_received
238
239     # accumulator for the throttle
240     attr_reader :throttle_bytes
241
242     # an optional filter object. we call @filter.in(data) for
243     # all incoming data and @filter.out(data) for all outgoing data
244     attr_reader :filter
245
246     # normalized uri of the current server
247     attr_reader :server_uri
248
249     # default trivial filter class
250     class IdentityFilter
251         def in(x)
252             x
253         end
254
255         def out(x)
256             x
257         end
258     end
259
260     # set filter to identity, not to nil
261     def filter=(f)
262         @filter = f || IdentityFilter.new
263     end
264
265     # server_list:: list of servers to connect to
266     # host::   optional local host to bind to (ruby 1.7+ required)
267     # create a new Irc::Socket
268     def initialize(server_list, host, opts={})
269       @server_list = server_list.dup
270       @server_uri = nil
271       @conn_count = 0
272       @host = host
273       @sock = nil
274       @filter = IdentityFilter.new
275       @spooler = false
276       @lines_sent = 0
277       @lines_received = 0
278       if opts.kind_of?(Hash) and opts.key?(:ssl)
279         @ssl = opts[:ssl]
280       else
281         @ssl = false
282       end
283     end
284
285     def connected?
286       !@sock.nil?
287     end
288
289     # open a TCP connection to the server
290     def connect
291       if connected?
292         warning "reconnecting while connected"
293         return
294       end
295       srv_uri = @server_list[@conn_count % @server_list.size].dup
296       srv_uri = 'irc://' + srv_uri if !(srv_uri =~ /:\/\//)
297       @conn_count += 1
298       @server_uri = URI.parse(srv_uri)
299       @server_uri.port = 6667 if !@server_uri.port
300       debug "connection attempt \##{@conn_count} (#{@server_uri.host}:#{@server_uri.port})"
301
302       if(@host)
303         begin
304           sock=TCPSocket.new(@server_uri.host, @server_uri.port, @host)
305         rescue ArgumentError => e
306           error "Your version of ruby does not support binding to a "
307           error "specific local address, please upgrade if you wish "
308           error "to use HOST = foo"
309           error "(this option has been disabled in order to continue)"
310           sock=TCPSocket.new(@server_uri.host, @server_uri.port)
311         end
312       else
313         sock=TCPSocket.new(@server_uri.host, @server_uri.port)
314       end
315       if(@ssl)
316         require 'openssl'
317         ssl_context = OpenSSL::SSL::SSLContext.new()
318         ssl_context.verify_mode = OpenSSL::SSL::VERIFY_NONE
319         sock = OpenSSL::SSL::SSLSocket.new(sock, ssl_context)
320         sock.sync_close = true
321         sock.connect
322       end
323       @sock = sock
324       @last_send = Time.new
325       @flood_send = Time.new
326       @burst = 0
327       @sock.extend(MonitorMixin)
328       @sendq = MessageQueue.new
329       @qthread = Thread.new { writer_loop }
330     end
331
332     # used to send lines to the remote IRCd by skipping the queue
333     # message: IRC message to send
334     # it should only be used for stuff that *must not* be queued,
335     # i.e. the initial PASS, NICK and USER command
336     # or the final QUIT message
337     def emergency_puts(message, penalty = false)
338       @sock.synchronize do
339         # debug "In puts - got @sock"
340         puts_critical(message, penalty)
341       end
342     end
343
344     def handle_socket_error(string, e)
345       error "#{string} failed: #{e.pretty_inspect}"
346       # We assume that an error means that there are connection
347       # problems and that we should reconnect, so we
348       shutdown
349       raise SocketError.new(e.inspect)
350     end
351
352     # get the next line from the server (blocks)
353     def gets
354       if @sock.nil?
355         warning "socket get attempted while closed"
356         return nil
357       end
358       begin
359         reply = @filter.in(@sock.gets)
360         @lines_received += 1
361         reply.strip! if reply
362         debug "RECV: #{reply.inspect}"
363         return reply
364       rescue Exception => e
365         handle_socket_error(:RECV, e)
366       end
367     end
368
369     def queue(msg, chan=nil, ring=0)
370       @sendq.push msg, chan, ring
371     end
372
373     def clearq
374       @sendq.clear
375     end
376
377     # flush the TCPSocket
378     def flush
379       @sock.flush
380     end
381
382     # Wraps Kernel.select on the socket
383     def select(timeout=nil)
384       Kernel.select([@sock], nil, nil, timeout)
385     end
386
387     # shutdown the connection to the server
388     def shutdown(how=2)
389       return unless connected?
390       @qthread.kill
391       @qthread = nil
392       begin
393         @sock.close
394       rescue Exception => e
395         error "error while shutting down: #{e.pretty_inspect}"
396       end
397       @sock = nil
398       @sendq.clear
399     end
400
401     private
402
403     def writer_loop
404       loop do
405         begin
406           now = Time.now
407           flood_delay = @flood_send - MAX_IRC_SEND_PENALTY - now
408           delay = [flood_delay, 0].max
409           if delay > 0
410             debug "sleep(#{delay}) # (f: #{flood_delay})"
411             sleep(delay)
412           end
413           msg = @sendq.shift
414           debug "got #{msg.inspect} from queue, sending"
415           emergency_puts(msg, true)
416         rescue Exception => e
417           error "Spooling failed: #{e.pretty_inspect}"
418           debug e.backtrace.join("\n")
419           raise e
420         end
421       end
422     end
423
424     # same as puts, but expects to be called with a lock held on @sock
425     def puts_critical(message, penalty=false)
426       # debug "in puts_critical"
427       begin
428         debug "SEND: #{message.inspect}"
429         if @sock.nil?
430           error "SEND attempted on closed socket"
431         else
432           # we use Socket#syswrite() instead of Socket#puts() because
433           # the latter is racy and can cause double message output in
434           # some circumstances
435           actual = @filter.out(message) + "\n"
436           now = Time.new
437           @sock.syswrite actual
438           @last_send = now
439           @flood_send = now if @flood_send < now
440           @flood_send += message.irc_send_penalty if penalty
441           @lines_sent += 1
442         end
443       rescue Exception => e
444         handle_socket_error(:SEND, e)
445       end
446     end
447
448   end
449
450 end