]> git.netwichtig.de Git - user/henk/code/ruby/rbot.git/blob - lib/rbot/ircsocket.rb
Prevent a traceback when network is down and clearq is called. Don't know if it's...
[user/henk/code/ruby/rbot.git] / lib / rbot / ircsocket.rb
1 module Irc
2
3   require 'socket'
4   require 'thread'
5   require 'rbot/timer'
6
7   # wrapped TCPSocket for communication with the server.
8   # emulates a subset of TCPSocket functionality
9   class IrcSocket
10     # total number of lines sent to the irc server
11     attr_reader :lines_sent
12
13     # total number of lines received from the irc server
14     attr_reader :lines_received
15
16     # delay between lines sent
17     attr_reader :sendq_delay
18
19     # max lines to burst
20     attr_reader :sendq_burst
21
22     # server:: server to connect to
23     # port::   IRCd port
24     # host::   optional local host to bind to (ruby 1.7+ required)
25     # create a new IrcSocket
26     def initialize(server, port, host, sendq_delay=2, sendq_burst=4)
27       @timer = Timer::Timer.new
28       @timer.add(0.2) do
29         spool
30       end
31       @server = server.dup
32       @port = port.to_i
33       @host = host
34       @sock = nil
35       @spooler = false
36       @lines_sent = 0
37       @lines_received = 0
38       if sendq_delay
39         @sendq_delay = sendq_delay.to_f
40       else
41         @sendq_delay = 2
42       end
43       @last_send = Time.new - @sendq_delay
44       @burst = 0
45       if sendq_burst
46         @sendq_burst = sendq_burst.to_i
47       else
48         @sendq_burst = 4
49       end
50     end
51
52     def connected?
53       !@sock.nil?
54     end
55
56     # open a TCP connection to the server
57     def connect
58       if connected?
59         debug "reconnecting socket while connected"
60         shutdown
61       end
62       if(@host)
63         begin
64           @sock=TCPSocket.new(@server, @port, @host)
65         rescue ArgumentError => e
66           $stderr.puts "Your version of ruby does not support binding to a "
67           $stderr.puts "specific local address, please upgrade if you wish "
68           $stderr.puts "to use HOST = foo"
69           $stderr.puts "(this option has been disabled in order to continue)"
70           @sock=TCPSocket.new(@server, @port)
71         end
72       else
73         @sock=TCPSocket.new(@server, @port)
74       end
75       @qthread = false
76       @qmutex = Mutex.new
77       @sendq = Array.new
78     end
79
80     def sendq_delay=(newfreq)
81       debug "changing sendq frequency to #{newfreq}"
82       @qmutex.synchronize do
83         @sendq_delay = newfreq
84         if newfreq == 0
85           clearq
86           @timer.stop
87         else
88           @timer.start
89         end
90       end
91     end
92
93     def sendq_burst=(newburst)
94       @qmutex.synchronize do
95         @sendq_burst = newburst
96       end
97     end
98
99     # used to send lines to the remote IRCd
100     # message: IRC message to send
101     def puts(message)
102       @qmutex.synchronize do
103         # debug "In puts - got mutex"
104         puts_critical(message)
105       end
106     end
107
108     # get the next line from the server (blocks)
109     def gets
110       if @sock.nil?
111         debug "socket get attempted while closed"
112         return nil
113       end
114       begin
115         reply = @sock.gets
116         @lines_received += 1
117         reply.strip! if reply
118         debug "RECV: #{reply.inspect}"
119         return reply
120       rescue => e
121         debug "socket get failed: #{e.inspect}"
122         return nil
123       end
124     end
125
126     def queue(msg)
127       if @sendq_delay > 0
128         @qmutex.synchronize do
129           @sendq.push msg
130         end
131         @timer.start
132       else
133         # just send it if queueing is disabled
134         self.puts(msg)
135       end
136     end
137
138     # pop a message off the queue, send it
139     def spool
140       if @sendq.empty?
141         @timer.stop
142         return
143       end
144       now = Time.new
145       if (now >= (@last_send + @sendq_delay))
146         # reset burst counter after @sendq_delay has passed
147         @burst = 0
148         debug "in spool, resetting @burst"
149       elsif (@burst >= @sendq_burst)
150         # nope. can't send anything, come back to us next tick...
151         @timer.start
152         return
153       end
154       @qmutex.synchronize do
155         debug "(can send #{@sendq_burst - @burst} lines, there are #{@sendq.length} to send)"
156         (@sendq_burst - @burst).times do
157           break if @sendq.empty?
158           puts_critical(@sendq.shift)
159         end
160       end
161       if @sendq.empty?
162         @timer.stop
163       end
164     end
165
166     def clearq
167       if @sock
168         unless @sendq.empty?
169           @qmutex.synchronize do
170             @sendq.clear
171           end
172         end
173       end
174     end
175
176     # flush the TCPSocket
177     def flush
178       @sock.flush
179     end
180
181     # Wraps Kernel.select on the socket
182     def select(timeout=nil)
183       Kernel.select([@sock], nil, nil, timeout)
184     end
185
186     # shutdown the connection to the server
187     def shutdown(how=2)
188       @sock.shutdown(how) unless @sock.nil?
189       @sock = nil
190     end
191
192     private
193
194     # same as puts, but expects to be called with a mutex held on @qmutex
195     def puts_critical(message)
196       # debug "in puts_critical"
197       debug "SEND: #{message.inspect}"
198       @sock.send(message + "\n",0)
199       @last_send = Time.new
200       @lines_sent += 1
201       @burst += 1
202     end
203
204   end
205
206 end