]> git.netwichtig.de Git - user/henk/code/ruby/rbot.git/blob - lib/rbot/journal.rb
introducing a persistent message queue, the "journal"
[user/henk/code/ruby/rbot.git] / lib / rbot / journal.rb
1 # encoding: UTF-8
2 #-- vim:sw=2:et
3 #++
4 #
5 # :title: rbot's persistent message queue
6
7 require 'thread'
8 require 'securerandom'
9
10 module Irc
11 class Bot
12 module Journal
13
14 =begin rdoc
15
16   The journal is a persistent message queue for rbot, its based on a basic
17   publish/subscribe model and persists messages into backend databases
18   that can be efficiently searched for past messages.
19
20   It is a addition to the key value storage already present in rbot
21   through its registry subsystem.
22
23 =end
24
25   class InvalidJournalMessage < StandardError
26   end
27   class ConsumeInterrupt < StandardError
28   end
29
30   class JournalMessage
31     # a unique identification of this message
32     attr_reader :id
33
34     # describes a hierarchical queue into which this message belongs
35     attr_reader :topic
36
37     # when this message was published as a Time instance
38     attr_reader :timestamp
39
40     # contains the actual message as a Hash
41     attr_reader :payload
42
43     def initialize(message)
44       @id = message[:id]
45       @timestamp = message[:timestamp]
46       @topic = message[:topic]
47       @payload = message[:payload]
48       if @payload.class != Hash
49         raise InvalidJournalMessage.new('payload must be a hash!')
50       end
51     end
52
53     def get(pkey, default=:exception) # IDENTITY = Object.new instead of :ex..?
54       value = pkey.split('.').reduce(@payload) do |hash, key|
55         if hash.has_key?(key) or hash.has_key?(key.to_sym)
56           hash[key] || hash[key.to_sym]
57         else
58           if default == :exception
59             raise ArgumentError.new
60           else
61             default
62           end
63         end
64       end
65     end
66
67     def self.create(topic, payload)
68       JournalMessage.new(
69         id: SecureRandom.uuid,
70         timestamp: Time.now,
71         topic: topic,
72         payload: payload
73       )
74     end
75   end
76
77   # Describes a query on journal entries, it is used both to describe
78   # a subscription aswell as to query persisted messages.
79   # There two ways to declare a Query instance, using
80   # the DSL like this:
81   #
82   #   Query.define do
83   #     id 'foo'
84   #     id 'bar'
85   #     topic 'log.irc.*'
86   #     topic 'log.core'
87   #     timestamp from: Time.now, to: Time.now + 60 * 10
88   #     payload 'action': :privmsg
89   #     payload 'channel': '#rbot'
90   #     payload 'foo.bar': 'baz'
91   #   end
92   #
93   # or using a hash: (NOTE: avoid using symbols in payload)
94   #
95   #   Query.define({
96   #     id: ['foo', 'bar'],
97   #     topic: ['log.irc.*', 'log.core'],
98   #     timestamp: {
99   #       from: Time.now
100   #       to: Time.now + 60 * 10
101   #     },
102   #     payload: {
103   #       'action' => 'privmsg'
104   #       'channel' => '#rbot',
105   #       'foo.bar' => 'baz'
106   #     }
107   #   })
108   #
109   class Query
110     # array of ids to match (OR)
111     attr_reader :id
112     # array of topics to match with wildcard support (OR)
113     attr_reader :topic
114     # hash with from: timestamp and to: timestamp
115     attr_reader :timestamp
116     # hash of key values to match
117     attr_reader :payload
118
119     def initialize(query)
120       @id = query[:id]
121       @topic = query[:topic]
122       @timestamp = query[:timestamp]
123       @payload = query[:payload]
124     end
125
126     # returns true if the given message matches the query
127     def matches?(message)
128       return false if not @id.empty? and not @id.include? message.id
129       return false if not @topic.empty? and not topic_matches? message.topic
130       if @timestamp[:from]
131         return false unless message.timestamp >= @timestamp[:from]
132       end
133       if @timestamp[:to]
134         return false unless message.timestamp <= @timestamp[:to]
135       end
136       found = false
137       @payload.each_pair do |key, value|
138         begin
139           message.get(key.to_s)
140         rescue ArgumentError
141         end
142         found = true
143       end
144       return false if not found and not @payload.empty?
145       true
146     end
147
148     def topic_matches?(_topic)
149       @topic.each do |topic|
150         if topic.include? '*'
151           match = true
152           topic.split('.').zip(_topic.split('.')).each do |a, b|
153             if a == '*'
154               if not b or b.empty?
155                 match = false
156               end
157             else
158               match = false unless a == b
159             end
160           end
161           return true if match
162         else
163           return true if topic == _topic
164         end
165       end
166       return false
167     end
168
169     # factory that constructs a query
170     class Factory
171       attr_reader :query
172       def initialize
173         @query = {
174           id: [],
175           topic: [],
176           timestamp: {
177             from: nil, to: nil
178           },
179           payload: {}
180         }
181       end
182
183       def id(*_id)
184         @query[:id] += _id
185       end
186
187       def topic(*_topic)
188           @query[:topic] += _topic
189       end
190
191       def timestamp(range)
192         @query[:timestamp] = range
193       end
194
195       def payload(query)
196         @query[:payload].merge!(query)
197       end
198     end
199
200     def self.define(query=nil, &block)
201       factory = Factory.new
202       if block_given?
203         factory.instance_eval(&block)
204         query = factory.query
205       end
206       Query.new query
207     end
208
209   end
210
211
212
213   class JournalBroker
214     def initialize(opts={})
215       # overrides the internal consumer with a block
216       @consumer = opts[:consumer]
217       @queue = Queue.new
218       # consumer thread:
219       @thread = Thread.new do
220         loop do
221           begin
222             consume @queue.pop
223           # pop(true) ... rescue ThreadError => e
224           rescue ConsumeInterrupt => e
225             error 'journal broker: stop thread, consume interrupt raised'
226             break
227           rescue Exception => e
228             error 'journal broker: exception in consumer thread'
229             error $!
230           end
231         end
232       end
233       # TODO: this is a first naive implementation, later we do the
234       #       message/query matching for incoming messages more efficiently
235       @subscriptions = []
236     end
237
238     def consume(message)
239       return unless message
240       @consumer.call(message) if @consumer
241
242       # notify subscribers
243       @subscriptions.each do |query, block|
244         if query.matches? message
245           block.call(message)
246         end
247       end
248     end
249
250     def join
251       @thread.join
252     end
253
254     def shutdown
255       @thread.raise ConsumeInterrupt.new
256     end
257
258
259     def publish(topic, payload)
260       @queue.push JournalMessage::create(topic, payload)
261     end
262
263     # subscribe to messages that match the given query
264     def subscribe(query, &block)
265       raise ArgumentError.new unless block_given?
266       @subscriptions << [query, block]
267     end
268
269   end
270
271 end # Journal
272 end # Bot
273 end # Irc
274