]> git.netwichtig.de Git - user/henk/code/ruby/rbot.git/blob - lib/rbot/journal.rb
journal: more postgres tests
[user/henk/code/ruby/rbot.git] / lib / rbot / journal.rb
1 # encoding: UTF-8
2 #-- vim:sw=2:et
3 #++
4 #
5 # :title: rbot's persistent message queue
6
7 require 'thread'
8 require 'securerandom'
9
10 module Irc
11 class Bot
12 module Journal
13
14 =begin rdoc
15
16   The journal is a persistent message queue for rbot, its based on a basic
17   publish/subscribe model and persists messages into backend databases
18   that can be efficiently searched for past messages.
19
20   It is a addition to the key value storage already present in rbot
21   through its registry subsystem.
22
23 =end
24
25   Config.register Config::StringValue.new('journal.storage',
26     :default => nil,
27     :requires_restart => true,
28     :desc => 'storage engine used by the journal')
29   Config.register Config::StringValue.new('journal.storage.uri',
30     :default => nil,
31     :requires_restart => true,
32     :desc => 'storage database uri')
33
34   class InvalidJournalMessage < StandardError
35   end
36   class ConsumeInterrupt < StandardError
37   end
38   class StorageError < StandardError
39   end
40
41   class JournalMessage
42     # a unique identification of this message
43     attr_reader :id
44
45     # describes a hierarchical queue into which this message belongs
46     attr_reader :topic
47
48     # when this message was published as a Time instance
49     attr_reader :timestamp
50
51     # contains the actual message as a Hash
52     attr_reader :payload
53
54     def initialize(message)
55       @id = message[:id]
56       @timestamp = message[:timestamp]
57       @topic = message[:topic]
58       @payload = message[:payload]
59       if @payload.class != Hash
60         raise InvalidJournalMessage.new('payload must be a hash!')
61       end
62     end
63
64     def get(pkey, default=:exception) # IDENTITY = Object.new instead of :ex..?
65       value = pkey.split('.').reduce(@payload) do |hash, key|
66         if hash.has_key?(key) or hash.has_key?(key.to_sym)
67           hash[key] || hash[key.to_sym]
68         else
69           if default == :exception
70             raise ArgumentError.new
71           else
72             default
73           end
74         end
75       end
76     end
77
78     def ==(other)
79       (@id == other.id) rescue false
80     end
81
82     def self.create(topic, payload, opt={})
83       JournalMessage.new(
84         id: opt[:id] || SecureRandom.uuid,
85         timestamp: opt[:timestamp] || Time.now,
86         topic: topic,
87         payload: payload
88       )
89     end
90   end
91
92   module Storage
93     class AbstractStorage
94       # intializes/opens a new storage connection
95       def initialize(opts={})
96       end
97
98       # inserts a message in storage
99       def insert(message)
100       end
101
102       # creates/ensures a index exists on the payload specified by key
103       def ensure_index(key)
104       end
105
106       # returns a array of message instances that match the query
107       def find(query=nil, limit=100, offset=0)
108       end
109
110       # returns the number of messages that match the query
111       def count(query=nil)
112       end
113
114       # remove messages that match the query
115       def remove(query=nil)
116       end
117
118       # destroy the underlying table/collection
119       def drop
120       end
121
122       # Returns all classes from the namespace that implement this interface
123       def self.get_impl
124         ObjectSpace.each_object(Class).select { |klass| klass < self }
125       end
126     end
127
128     def create(name, uri)
129       log 'load journal storage adapter: ' + name
130       load File.join(File.dirname(__FILE__), 'journal', name + '.rb')
131       cls = AbstractStorage.get_impl.first
132       cls.new(uri: uri)
133     end
134   end
135
136   # Describes a query on journal entries, it is used both to describe
137   # a subscription aswell as to query persisted messages.
138   # There two ways to declare a Query instance, using
139   # the DSL like this:
140   #
141   #   Query.define do
142   #     id 'foo'
143   #     id 'bar'
144   #     topic 'log.irc.*'
145   #     topic 'log.core'
146   #     timestamp from: Time.now, to: Time.now + 60 * 10
147   #     payload 'action': :privmsg
148   #     payload 'channel': '#rbot'
149   #     payload 'foo.bar': 'baz'
150   #   end
151   #
152   # or using a hash: (NOTE: avoid using symbols in payload)
153   #
154   #   Query.define({
155   #     id: ['foo', 'bar'],
156   #     topic: ['log.irc.*', 'log.core'],
157   #     timestamp: {
158   #       from: Time.now
159   #       to: Time.now + 60 * 10
160   #     },
161   #     payload: {
162   #       'action' => 'privmsg'
163   #       'channel' => '#rbot',
164   #       'foo.bar' => 'baz'
165   #     }
166   #   })
167   #
168   class Query
169     # array of ids to match (OR)
170     attr_reader :id
171     # array of topics to match with wildcard support (OR)
172     attr_reader :topic
173     # hash with from: timestamp and to: timestamp
174     attr_reader :timestamp
175     # hash of key values to match
176     attr_reader :payload
177
178     def initialize(query)
179       @id = query[:id]
180       @topic = query[:topic]
181       @timestamp = query[:timestamp]
182       @payload = query[:payload]
183     end
184
185     # returns true if the given message matches the query
186     def matches?(message)
187       return false if not @id.empty? and not @id.include? message.id
188       return false if not @topic.empty? and not topic_matches? message.topic
189       if @timestamp[:from]
190         return false unless message.timestamp >= @timestamp[:from]
191       end
192       if @timestamp[:to]
193         return false unless message.timestamp <= @timestamp[:to]
194       end
195       found = false
196       @payload.each_pair do |key, value|
197         begin
198           message.get(key.to_s)
199         rescue ArgumentError
200         end
201         found = true
202       end
203       return false if not found and not @payload.empty?
204       true
205     end
206
207     def topic_matches?(_topic)
208       @topic.each do |topic|
209         if topic.include? '*'
210           match = true
211           topic.split('.').zip(_topic.split('.')).each do |a, b|
212             if a == '*'
213               if not b or b.empty?
214                 match = false
215               end
216             else
217               match = false unless a == b
218             end
219           end
220           return true if match
221         else
222           return true if topic == _topic
223         end
224       end
225       return false
226     end
227
228     # factory that constructs a query
229     class Factory
230       attr_reader :query
231       def initialize
232         @query = {
233           id: [],
234           topic: [],
235           timestamp: {
236             from: nil, to: nil
237           },
238           payload: {}
239         }
240       end
241
242       def id(*_id)
243         @query[:id] += _id
244       end
245
246       def topic(*_topic)
247           @query[:topic] += _topic
248       end
249
250       def timestamp(range)
251         @query[:timestamp] = range
252       end
253
254       def payload(query)
255         @query[:payload].merge!(query)
256       end
257     end
258
259     def self.define(query=nil, &block)
260       factory = Factory.new
261       if block_given?
262         factory.instance_eval(&block)
263         query = factory.query
264       end
265       Query.new query
266     end
267
268   end
269
270
271   class JournalBroker
272     class Subscription
273       attr_reader :query
274       attr_reader :block
275       def initialize(broker, query, block)
276         @broker = broker
277         @query = query
278         @block = block
279       end
280       def cancel
281         @broker.unsubscribe(self)
282       end
283     end
284
285     def initialize(opts={})
286       # overrides the internal consumer with a block
287       @consumer = opts[:consumer]
288       # storage backend
289       if @bot
290         @storage = opts[:storage] || Storage.create(
291             @bot.config['journal.storage'], @bot.config['journal.storage.uri'])
292       else
293         @storage = opts[:storage]
294       end
295       @queue = Queue.new
296       # consumer thread:
297       @thread = Thread.new do
298         loop do
299           begin
300             consume @queue.pop
301           # pop(true) ... rescue ThreadError => e
302           rescue ConsumeInterrupt => e
303             error 'journal broker: stop thread, consume interrupt raised'
304             break
305           rescue Exception => e
306             error 'journal broker: exception in consumer thread'
307             error $!
308           end
309         end
310       end
311       # TODO: this is a first naive implementation, later we do the
312       #       message/query matching for incoming messages more efficiently
313       @subscriptions = []
314     end
315
316     def consume(message)
317       return unless message
318       @consumer.call(message) if @consumer
319
320       # notify subscribers
321       @subscriptions.each do |s|
322         if s.query.matches? message
323           s.block.call(message)
324         end
325       end
326
327       @storage.insert(message) if @storage
328     end
329
330     def persists?
331       true if @storage
332     end
333
334     def join
335       @thread.join
336     end
337
338     def shutdown
339       @thread.raise ConsumeInterrupt.new
340     end
341
342     def publish(topic, payload)
343       @queue.push JournalMessage::create(topic, payload)
344     end
345
346     # subscribe to messages that match the given query
347     def subscribe(query, &block)
348       raise ArgumentError.new unless block_given?
349       s = Subscription.new(self, query, block)
350       @subscriptions << s
351       s
352     end
353
354     def unsubscribe(subscription)
355       @subscriptions.delete subscription
356     end
357
358     def find(query, limit=100, offset=0, &block)
359       if block_given?
360         begin
361           res = @storage.find(query, limit, offset)
362           block.call(res)
363         end until res.length > 0
364       else
365         @storage.find(query, limit, offset)
366       end
367     end
368
369   end
370
371 end # Journal
372 end # Bot
373 end # Irc
374