1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
|
module Irc
require 'socket'
require 'thread'
# Wrapped TCPSocket for communication with the IRC server.
# Emulates a subset of TCPSocket functionality and adds an optional
# rate-limited send queue: lines handed to #queue are written by a background
# spooler thread, at most +sendq_burst+ lines per +sendq_delay+ seconds.
class IrcSocket
  # total number of lines sent to the irc server
  attr_reader :lines_sent

  # total number of lines received from the irc server
  attr_reader :lines_received

  # delay between lines sent (seconds; 0 disables queueing)
  attr_reader :sendq_delay

  # max lines to burst
  attr_reader :sendq_burst

  # Create a new IrcSocket. No connection is opened until #connect is called.
  #
  # server::      server to connect to
  # port::        IRCd port
  # host::        optional local host to bind to (ruby 1.7+ required)
  # sendq_delay:: seconds that must pass before the burst counter is reset
  #               (nil/false selects the default of 2)
  # sendq_burst:: max lines sent per delay window
  #               (nil/false selects the default of 4)
  def initialize(server, port, host, sendq_delay = 2, sendq_burst = 4)
    @server = server.dup
    @port = port.to_i
    @host = host
    @lines_sent = 0
    @lines_received = 0
    @sendq_delay = sendq_delay ? sendq_delay.to_f : 2
    @sendq_burst = sendq_burst ? sendq_burst.to_i : 4
    # Pretend the last send happened a full delay ago so the first spool
    # is allowed to send immediately.
    @last_send = Time.new - @sendq_delay
    @burst = 0
    # Queue state is created here (not in #connect) so the setters and
    # #queue can be used safely before the socket is opened.
    @qthread = false
    @qmutex = Mutex.new
    @sendq = []
  end

  # Open a TCP connection to the server, reset the send queue and start the
  # spooler thread when queueing is enabled (+sendq_delay+ > 0).
  # Any spooler left over from a previous connection is stopped first so
  # that reconnecting never leaves two spoolers running.
  def connect
    stop_spooler
    @sock = open_socket
    @qmutex.synchronize { @sendq.clear }
    start_spooler if @sendq_delay > 0
  end

  # Change the queue delay.
  # Setting it to 0 while the spooler is running drops all pending lines
  # and stops the spooler; setting it to a non-zero value starts the spooler
  # if it is not running and a socket exists.
  #
  # newfreq:: new delay in seconds (coerced with +to_f+, like in #initialize)
  def sendq_delay=(newfreq)
    debug "changing sendq frequency to #{newfreq}"
    delay = newfreq.to_f
    @qmutex.synchronize do
      @sendq_delay = delay
      if delay == 0
        if @qthread
          # Clear the queue directly: #clearq takes @qmutex itself and Mutex
          # is not re-entrant, so calling it here would raise ThreadError.
          @sendq.clear
          stop_spooler
        end
      elsif !@qthread && @sock
        # Without a socket there is nothing to write to yet; #connect starts
        # the spooler once the connection exists.
        start_spooler
      end
    end
  end

  # Change the maximum number of lines sent per delay window.
  #
  # newburst:: new burst size (coerced with +to_i+, like in #initialize)
  def sendq_burst=(newburst)
    burst = newburst.to_i
    @qmutex.synchronize do
      @sendq_burst = burst
    end
  end

  # used to send lines to the remote IRCd immediately, bypassing the queue
  # message: IRC message to send
  def puts(message)
    @qmutex.synchronize do
      puts_critical(message)
    end
  end

  # get the next line from the server (blocks)
  # Returns the line with surrounding whitespace stripped, or nil when the
  # socket returns nil. Only actual lines are counted in +lines_received+.
  def gets
    reply = @sock.gets
    if reply
      @lines_received += 1
      reply.strip!
    end
    debug "RECV: #{reply.inspect}"
    reply
  end

  # Queue a message for rate-limited delivery, or send it at once when
  # queueing is disabled (+sendq_delay+ <= 0).
  #
  # msg:: IRC message to send
  def queue(msg)
    # The delay is checked under the lock so a concurrent sendq_delay= cannot
    # disable the queue between the check and the push.
    @qmutex.synchronize do
      if @sendq_delay > 0
        @sendq.push msg
      else
        # just send it if queueing is disabled
        puts_critical(msg)
      end
    end
  end

  # Body of the spooler thread: try to drain the queue every 0.2 seconds.
  def spooler
    loop do
      spool
      sleep 0.2
    end
  end

  # pop messages off the queue and send them, honouring the burst limit
  def spool
    # All queue/burst bookkeeping is read and written under the lock.
    @qmutex.synchronize do
      return if @sendq.empty?

      if Time.new >= (@last_send + @sendq_delay)
        # reset burst counter after @sendq_delay has passed
        @burst = 0
        debug "in spool, resetting @burst"
      elsif @burst >= @sendq_burst
        # nope. can't send anything
        return
      end

      debug "(can send #{@sendq_burst - @burst} lines, there are #{@sendq.length} to send)"
      (@sendq_burst - @burst).times do
        break if @sendq.empty?
        puts_critical(@sendq.shift)
      end
    end
  end

  # Drop every message still waiting in the queue.
  def clearq
    @qmutex.synchronize do
      @sendq.clear
    end
  end

  # flush the TCPSocket
  def flush
    @sock.flush
  end

  # Wraps Kernel.select on the socket
  def select(timeout)
    Kernel.select([@sock], nil, nil, timeout)
  end

  # shutdown the connection to the server
  def shutdown(how = 2)
    @sock.shutdown(how)
  end

  private

  # same as puts, but expects to be called with a mutex held on @qmutex
  def puts_critical(message)
    debug "SEND: #{message.inspect}"
    @sock.send(message + "\n", 0)
    @last_send = Time.new
    @lines_sent += 1
    @burst += 1
  end

  # Create the TCPSocket, binding to @host when one was given. Falls back to
  # an unbound socket when TCPSocket.new rejects the extra argument.
  def open_socket
    return TCPSocket.new(@server, @port) unless @host

    begin
      TCPSocket.new(@server, @port, @host)
    rescue ArgumentError
      $stderr.puts "Your version of ruby does not support binding to a "
      $stderr.puts "specific local address, please upgrade if you wish "
      $stderr.puts "to use HOST = foo"
      $stderr.puts "(this option has been disabled in order to continue)"
      TCPSocket.new(@server, @port)
    end
  end

  # Start the background spooler thread and remember it in @qthread.
  def start_spooler
    @qthread = Thread.new { spooler }
  end

  # Kill the spooler thread, if any, and forget it. Takes no lock itself.
  def stop_spooler
    @qthread.kill if @qthread
    @qthread = false
  end
end
end
|