]> git.netwichtig.de Git - user/henk/code/ruby/rbot.git/blob - lib/rbot/journal.rb
cc0578de53a7940e7e4c4fda57447fcaf5d70e2a
[user/henk/code/ruby/rbot.git] / lib / rbot / journal.rb
1 # encoding: UTF-8
2 #-- vim:sw=2:et
3 #++
4 #
5 # :title: rbot's persistent message queue
6
7 require 'thread'
8 require 'securerandom'
9
10 module Irc
11 class Bot
12 module Journal
13
14 =begin rdoc
15
16   The journal is a persistent message queue for rbot, its based on a basic
17   publish/subscribe model and persists messages into backend databases
18   that can be efficiently searched for past messages.
19
20   It is a addition to the key value storage already present in rbot
21   through its registry subsystem.
22
23 =end
24
25   Config.register Config::StringValue.new('journal.storage',
26     :default => nil,
27     :requires_restart => true,
28     :desc => 'storage engine used by the journal')
29   Config.register Config::StringValue.new('journal.storage.uri',
30     :default => nil,
31     :requires_restart => true,
32     :desc => 'storage database uri')
33
34   class InvalidJournalMessage < StandardError
35   end
36   class ConsumeInterrupt < StandardError
37   end
38   class StorageError < StandardError
39   end
40
41   class JournalMessage
42     # a unique identification of this message
43     attr_reader :id
44
45     # describes a hierarchical queue into which this message belongs
46     attr_reader :topic
47
48     # when this message was published as a Time instance
49     attr_reader :timestamp
50
51     # contains the actual message as a Hash
52     attr_reader :payload
53
54     def initialize(message)
55       @id = message[:id]
56       @timestamp = message[:timestamp]
57       @topic = message[:topic]
58       @payload = message[:payload]
59       if @payload.class != Hash
60         raise InvalidJournalMessage.new('payload must be a hash!')
61       end
62     end
63
64     def get(pkey, default=:exception) # IDENTITY = Object.new instead of :ex..?
65       value = pkey.split('.').reduce(@payload) do |hash, key|
66         if hash.has_key?(key) or hash.has_key?(key.to_sym)
67           hash[key] || hash[key.to_sym]
68         else
69           if default == :exception
70             raise ArgumentError.new
71           else
72             default
73           end
74         end
75       end
76     end
77
78     def ==(other)
79       @id == other.id
80     end
81
82     def self.create(topic, payload, opt={})
83       JournalMessage.new(
84         id: opt[:id] || SecureRandom.uuid,
85         timestamp: opt[:timestamp] || Time.now,
86         topic: topic,
87         payload: payload
88       )
89     end
90   end
91
92   module Storage
93     class AbstractStorage
94       # intializes/opens a new storage connection
95       def initialize(opts={})
96       end
97
98       # inserts a message in storage
99       def insert(message)
100       end
101
102       # creates/ensures a index exists on the payload specified by key
103       def ensure_index(key)
104       end
105
106       # returns a array of message instances that match the query
107       def find(query, limit=10, offset=0)
108       end
109
110       # returns the number of messages that match the query
111       def count(query)
112       end
113
114       # delete messages that match the query
115       def delete(query)
116       end
117     end
118   end
119
120   # Describes a query on journal entries, it is used both to describe
121   # a subscription aswell as to query persisted messages.
122   # There two ways to declare a Query instance, using
123   # the DSL like this:
124   #
125   #   Query.define do
126   #     id 'foo'
127   #     id 'bar'
128   #     topic 'log.irc.*'
129   #     topic 'log.core'
130   #     timestamp from: Time.now, to: Time.now + 60 * 10
131   #     payload 'action': :privmsg
132   #     payload 'channel': '#rbot'
133   #     payload 'foo.bar': 'baz'
134   #   end
135   #
136   # or using a hash: (NOTE: avoid using symbols in payload)
137   #
138   #   Query.define({
139   #     id: ['foo', 'bar'],
140   #     topic: ['log.irc.*', 'log.core'],
141   #     timestamp: {
142   #       from: Time.now
143   #       to: Time.now + 60 * 10
144   #     },
145   #     payload: {
146   #       'action' => 'privmsg'
147   #       'channel' => '#rbot',
148   #       'foo.bar' => 'baz'
149   #     }
150   #   })
151   #
152   class Query
153     # array of ids to match (OR)
154     attr_reader :id
155     # array of topics to match with wildcard support (OR)
156     attr_reader :topic
157     # hash with from: timestamp and to: timestamp
158     attr_reader :timestamp
159     # hash of key values to match
160     attr_reader :payload
161
162     def initialize(query)
163       @id = query[:id]
164       @topic = query[:topic]
165       @timestamp = query[:timestamp]
166       @payload = query[:payload]
167     end
168
169     # returns true if the given message matches the query
170     def matches?(message)
171       return false if not @id.empty? and not @id.include? message.id
172       return false if not @topic.empty? and not topic_matches? message.topic
173       if @timestamp[:from]
174         return false unless message.timestamp >= @timestamp[:from]
175       end
176       if @timestamp[:to]
177         return false unless message.timestamp <= @timestamp[:to]
178       end
179       found = false
180       @payload.each_pair do |key, value|
181         begin
182           message.get(key.to_s)
183         rescue ArgumentError
184         end
185         found = true
186       end
187       return false if not found and not @payload.empty?
188       true
189     end
190
191     def topic_matches?(_topic)
192       @topic.each do |topic|
193         if topic.include? '*'
194           match = true
195           topic.split('.').zip(_topic.split('.')).each do |a, b|
196             if a == '*'
197               if not b or b.empty?
198                 match = false
199               end
200             else
201               match = false unless a == b
202             end
203           end
204           return true if match
205         else
206           return true if topic == _topic
207         end
208       end
209       return false
210     end
211
212     # factory that constructs a query
213     class Factory
214       attr_reader :query
215       def initialize
216         @query = {
217           id: [],
218           topic: [],
219           timestamp: {
220             from: nil, to: nil
221           },
222           payload: {}
223         }
224       end
225
226       def id(*_id)
227         @query[:id] += _id
228       end
229
230       def topic(*_topic)
231           @query[:topic] += _topic
232       end
233
234       def timestamp(range)
235         @query[:timestamp] = range
236       end
237
238       def payload(query)
239         @query[:payload].merge!(query)
240       end
241     end
242
243     def self.define(query=nil, &block)
244       factory = Factory.new
245       if block_given?
246         factory.instance_eval(&block)
247         query = factory.query
248       end
249       Query.new query
250     end
251
252   end
253
254
255   class JournalBroker
256     class Subscription
257       attr_reader :query
258       attr_reader :block
259       def initialize(broker, query, block)
260         @broker = broker
261         @query = query
262         @block = block
263       end
264       def cancel
265         @broker.unsubscribe(self)
266       end
267     end
268
269     def initialize(opts={})
270       # overrides the internal consumer with a block
271       @consumer = opts[:consumer]
272       # storage backend
273       @storage = opts[:storage]
274       @queue = Queue.new
275       # consumer thread:
276       @thread = Thread.new do
277         loop do
278           begin
279             consume @queue.pop
280           # pop(true) ... rescue ThreadError => e
281           rescue ConsumeInterrupt => e
282             error 'journal broker: stop thread, consume interrupt raised'
283             break
284           rescue Exception => e
285             error 'journal broker: exception in consumer thread'
286             error $!
287           end
288         end
289       end
290       # TODO: this is a first naive implementation, later we do the
291       #       message/query matching for incoming messages more efficiently
292       @subscriptions = []
293     end
294
295     def consume(message)
296       return unless message
297       @consumer.call(message) if @consumer
298
299       # notify subscribers
300       @subscriptions.each do |s|
301         if s.query.matches? message
302           s.block.call(message)
303         end
304       end
305
306       @storage.insert(message) if @storage
307     end
308
309     def persists?
310       true if @storage
311     end
312
313     def join
314       @thread.join
315     end
316
317     def shutdown
318       @thread.raise ConsumeInterrupt.new
319     end
320
321     def publish(topic, payload)
322       @queue.push JournalMessage::create(topic, payload)
323     end
324
325     # subscribe to messages that match the given query
326     def subscribe(query, &block)
327       raise ArgumentError.new unless block_given?
328       s = Subscription.new(self, query, block)
329       @subscriptions << s
330       s
331     end
332
333     def unsubscribe(subscription)
334       @subscriptions.delete subscription
335     end
336
337   end
338
339 end # Journal
340 end # Bot
341 end # Irc
342