]> git.netwichtig.de Git - user/henk/code/ruby/rbot.git/blob - lib/rbot/journal.rb
09ee7369a5a0acd485796982359fd32a87c1e0d4
[user/henk/code/ruby/rbot.git] / lib / rbot / journal.rb
1 # encoding: UTF-8
2 #-- vim:sw=2:et
3 #++
4 #
5 # :title: rbot's persistent message queue
6
7 require 'thread'
8 require 'securerandom'
9
10 module Irc
11 class Bot
12 module Journal
13
14 =begin rdoc
15
16   The journal is a persistent message queue for rbot, its based on a basic
17   publish/subscribe model and persists messages into backend databases
18   that can be efficiently searched for past messages.
19
20   It is a addition to the key value storage already present in rbot
21   through its registry subsystem.
22
23 =end
24
25   class InvalidJournalMessage < StandardError
26   end
27   class ConsumeInterrupt < StandardError
28   end
29
30   class JournalMessage
31     # a unique identification of this message
32     attr_reader :id
33
34     # describes a hierarchical queue into which this message belongs
35     attr_reader :topic
36
37     # when this message was published as a Time instance
38     attr_reader :timestamp
39
40     # contains the actual message as a Hash
41     attr_reader :payload
42
43     def initialize(message)
44       @id = message[:id]
45       @timestamp = message[:timestamp]
46       @topic = message[:topic]
47       @payload = message[:payload]
48       if @payload.class != Hash
49         raise InvalidJournalMessage.new('payload must be a hash!')
50       end
51     end
52
53     def get(pkey, default=:exception) # IDENTITY = Object.new instead of :ex..?
54       value = pkey.split('.').reduce(@payload) do |hash, key|
55         if hash.has_key?(key) or hash.has_key?(key.to_sym)
56           hash[key] || hash[key.to_sym]
57         else
58           if default == :exception
59             raise ArgumentError.new
60           else
61             default
62           end
63         end
64       end
65     end
66
67     def self.create(topic, payload)
68       JournalMessage.new(
69         id: SecureRandom.uuid,
70         timestamp: Time.now,
71         topic: topic,
72         payload: payload
73       )
74     end
75   end
76
77   # Describes a query on journal entries, it is used both to describe
78   # a subscription aswell as to query persisted messages.
79   # There two ways to declare a Query instance, using
80   # the DSL like this:
81   #
82   #   Query.define do
83   #     id 'foo'
84   #     id 'bar'
85   #     topic 'log.irc.*'
86   #     topic 'log.core'
87   #     timestamp from: Time.now, to: Time.now + 60 * 10
88   #     payload 'action': :privmsg
89   #     payload 'channel': '#rbot'
90   #     payload 'foo.bar': 'baz'
91   #   end
92   #
93   # or using a hash: (NOTE: avoid using symbols in payload)
94   #
95   #   Query.define({
96   #     id: ['foo', 'bar'],
97   #     topic: ['log.irc.*', 'log.core'],
98   #     timestamp: {
99   #       from: Time.now
100   #       to: Time.now + 60 * 10
101   #     },
102   #     payload: {
103   #       'action' => 'privmsg'
104   #       'channel' => '#rbot',
105   #       'foo.bar' => 'baz'
106   #     }
107   #   })
108   #
109   class Query
110     # array of ids to match (OR)
111     attr_reader :id
112     # array of topics to match with wildcard support (OR)
113     attr_reader :topic
114     # hash with from: timestamp and to: timestamp
115     attr_reader :timestamp
116     # hash of key values to match
117     attr_reader :payload
118
119     def initialize(query)
120       @id = query[:id]
121       @topic = query[:topic]
122       @timestamp = query[:timestamp]
123       @payload = query[:payload]
124     end
125
126     # returns true if the given message matches the query
127     def matches?(message)
128       return false if not @id.empty? and not @id.include? message.id
129       return false if not @topic.empty? and not topic_matches? message.topic
130       if @timestamp[:from]
131         return false unless message.timestamp >= @timestamp[:from]
132       end
133       if @timestamp[:to]
134         return false unless message.timestamp <= @timestamp[:to]
135       end
136       found = false
137       @payload.each_pair do |key, value|
138         begin
139           message.get(key.to_s)
140         rescue ArgumentError
141         end
142         found = true
143       end
144       return false if not found and not @payload.empty?
145       true
146     end
147
148     def topic_matches?(_topic)
149       @topic.each do |topic|
150         if topic.include? '*'
151           match = true
152           topic.split('.').zip(_topic.split('.')).each do |a, b|
153             if a == '*'
154               if not b or b.empty?
155                 match = false
156               end
157             else
158               match = false unless a == b
159             end
160           end
161           return true if match
162         else
163           return true if topic == _topic
164         end
165       end
166       return false
167     end
168
169     # factory that constructs a query
170     class Factory
171       attr_reader :query
172       def initialize
173         @query = {
174           id: [],
175           topic: [],
176           timestamp: {
177             from: nil, to: nil
178           },
179           payload: {}
180         }
181       end
182
183       def id(*_id)
184         @query[:id] += _id
185       end
186
187       def topic(*_topic)
188           @query[:topic] += _topic
189       end
190
191       def timestamp(range)
192         @query[:timestamp] = range
193       end
194
195       def payload(query)
196         @query[:payload].merge!(query)
197       end
198     end
199
200     def self.define(query=nil, &block)
201       factory = Factory.new
202       if block_given?
203         factory.instance_eval(&block)
204         query = factory.query
205       end
206       Query.new query
207     end
208
209   end
210
211
212   class JournalBroker
213     class Subscription
214       attr_reader :query
215       attr_reader :block
216       def initialize(broker, query, block)
217         @broker = broker
218         @query = query
219         @block = block
220       end
221       def cancel
222         @broker.unsubscribe(self)
223       end
224     end
225
226     def initialize(opts={})
227       # overrides the internal consumer with a block
228       @consumer = opts[:consumer]
229       @queue = Queue.new
230       # consumer thread:
231       @thread = Thread.new do
232         loop do
233           begin
234             consume @queue.pop
235           # pop(true) ... rescue ThreadError => e
236           rescue ConsumeInterrupt => e
237             error 'journal broker: stop thread, consume interrupt raised'
238             break
239           rescue Exception => e
240             error 'journal broker: exception in consumer thread'
241             error $!
242           end
243         end
244       end
245       # TODO: this is a first naive implementation, later we do the
246       #       message/query matching for incoming messages more efficiently
247       @subscriptions = []
248     end
249
250     def consume(message)
251       return unless message
252       @consumer.call(message) if @consumer
253
254       # notify subscribers
255       @subscriptions.each do |s|
256         if s.query.matches? message
257           s.block.call(message)
258         end
259       end
260     end
261
262     def join
263       @thread.join
264     end
265
266     def shutdown
267       @thread.raise ConsumeInterrupt.new
268     end
269
270     def publish(topic, payload)
271       @queue.push JournalMessage::create(topic, payload)
272     end
273
274     # subscribe to messages that match the given query
275     def subscribe(query, &block)
276       raise ArgumentError.new unless block_given?
277       s = Subscription.new(self, query, block)
278       @subscriptions << s
279       s
280     end
281
282     def unsubscribe(subscription)
283       @subscriptions.delete subscription
284     end
285
286   end
287
288 end # Journal
289 end # Bot
290 end # Irc
291