]> git.netwichtig.de Git - user/henk/code/ruby/rbot.git/blob - lib/rbot/ircsocket.rb
Fix issue #101
[user/henk/code/ruby/rbot.git] / lib / rbot / ircsocket.rb
1 module Irc
2
3   require 'socket'
4   require 'thread'
5   require 'rbot/timer'
6
7   # wrapped TCPSocket for communication with the server.
8   # emulates a subset of TCPSocket functionality
9   class IrcSocket
10     # total number of lines sent to the irc server
11     attr_reader :lines_sent
12     
13     # total number of lines received from the irc server
14     attr_reader :lines_received
15     
16     # delay between lines sent
17     attr_reader :sendq_delay
18     
19     # max lines to burst
20     attr_reader :sendq_burst
21     
22     # server:: server to connect to
23     # port::   IRCd port
24     # host::   optional local host to bind to (ruby 1.7+ required)
25     # create a new IrcSocket
26     def initialize(server, port, host, sendq_delay=2, sendq_burst=4)
27       @timer = Timer::Timer.new
28       @timer.add(0.2) do
29         spool
30       end
31       @server = server.dup
32       @port = port.to_i
33       @host = host
34       @sock = nil
35       @spooler = false
36       @lines_sent = 0
37       @lines_received = 0
38       if sendq_delay
39         @sendq_delay = sendq_delay.to_f
40       else
41         @sendq_delay = 2
42       end
43       @last_send = Time.new - @sendq_delay
44       @burst = 0
45       if sendq_burst
46         @sendq_burst = sendq_burst.to_i
47       else
48         @sendq_burst = 4
49       end
50     end
51     
52     def connected?
53       !@sock.nil?
54     end
55
56     # open a TCP connection to the server
57     def connect
58       if connected?
59         shutdown
60       end
61       if(@host)
62         begin
63           @sock=TCPSocket.new(@server, @port, @host)
64         rescue ArgumentError => e
65           $stderr.puts "Your version of ruby does not support binding to a "
66           $stderr.puts "specific local address, please upgrade if you wish "
67           $stderr.puts "to use HOST = foo"
68           $stderr.puts "(this option has been disabled in order to continue)"
69           @sock=TCPSocket.new(@server, @port)
70         end
71       else
72         @sock=TCPSocket.new(@server, @port)
73       end 
74       @qthread = false
75       @qmutex = Mutex.new
76       @sendq = Array.new
77     end
78
79     def sendq_delay=(newfreq)
80       debug "changing sendq frequency to #{newfreq}"
81       @qmutex.synchronize do
82         @sendq_delay = newfreq
83         if newfreq == 0
84           clearq
85           @timer.stop
86         else
87           @timer.start
88         end
89       end
90     end
91
92     def sendq_burst=(newburst)
93       @qmutex.synchronize do
94         @sendq_burst = newburst
95       end
96     end
97
98     # used to send lines to the remote IRCd
99     # message: IRC message to send
100     def puts(message)
101       @qmutex.synchronize do
102         # debug "In puts - got mutex"
103         puts_critical(message)
104       end
105     end
106
107     # get the next line from the server (blocks)
108     def gets
109       begin
110         reply = @sock.gets
111         @lines_received += 1
112         reply.strip! if reply
113         debug "RECV: #{reply.inspect}"
114         return reply
115       rescue => e
116         debug "socket get failed: #{e}"
117         return nil
118       end
119     end
120
121     def queue(msg)
122       if @sendq_delay > 0
123         @qmutex.synchronize do
124           @sendq.push msg
125         end
126         @timer.start
127       else
128         # just send it if queueing is disabled
129         self.puts(msg)
130       end
131     end
132
133     # pop a message off the queue, send it
134     def spool
135       if @sendq.empty?
136         @timer.stop
137         return
138       end
139       now = Time.new
140       if (now >= (@last_send + @sendq_delay))
141         # reset burst counter after @sendq_delay has passed
142         @burst = 0
143         debug "in spool, resetting @burst"
144       elsif (@burst >= @sendq_burst)
145         # nope. can't send anything, come back to us next tick...
146         @timer.start
147         return
148       end
149       @qmutex.synchronize do
150         debug "(can send #{@sendq_burst - @burst} lines, there are #{@sendq.length} to send)"
151         (@sendq_burst - @burst).times do
152           break if @sendq.empty?
153           puts_critical(@sendq.shift)
154         end
155       end
156       if @sendq.empty?
157         @timer.stop
158       end
159     end
160
161     def clearq
162       unless @sendq.empty?
163         @qmutex.synchronize do
164           @sendq.clear
165         end
166       end
167     end
168
169     # flush the TCPSocket
170     def flush
171       @sock.flush
172     end
173
174     # Wraps Kernel.select on the socket
175     def select(timeout=nil)
176       Kernel.select([@sock], nil, nil, timeout)
177     end
178
179     # shutdown the connection to the server
180     def shutdown(how=2)
181       @sock.shutdown(how) unless @sock.nil?
182       @sock = nil
183     end
184
185     private
186     
187     # same as puts, but expects to be called with a mutex held on @qmutex
188     def puts_critical(message)
189       # debug "in puts_critical"
190       debug "SEND: #{message.inspect}"
191       @sock.send(message + "\n",0)
192       @last_send = Time.new
193       @lines_sent += 1
194       @burst += 1
195     end
196
197   end
198
199 end