]> git.netwichtig.de Git - user/henk/code/ruby/rbot.git/blob - lib/rbot/journal.rb
journal: wrap postgres client in single thread
[user/henk/code/ruby/rbot.git] / lib / rbot / journal.rb
1 # encoding: UTF-8
2 #-- vim:sw=2:et
3 #++
4 #
5 # :title: rbot's persistent message queue
6 #
7 # Author:: Matthias Hecker (apoc@geekosphere.org)
8
9 require 'thread'
10 require 'securerandom'
11
12 module Irc
13 class Bot
14 module Journal
15
16 =begin rdoc
17
18   The journal is a persistent message queue for rbot, its based on a basic
19   publish/subscribe model and persists messages into backend databases
20   that can be efficiently searched for past messages.
21
22   It is a addition to the key value storage already present in rbot
23   through its registry subsystem.
24
25 =end
26
27   class InvalidJournalMessage < StandardError
28   end
29   class StorageError < StandardError
30   end
31
32   class JournalMessage
33     # a unique identification of this message
34     attr_reader :id
35
36     # describes a hierarchical queue into which this message belongs
37     attr_reader :topic
38
39     # when this message was published as a Time instance
40     attr_reader :timestamp
41
42     # contains the actual message as a Hash
43     attr_reader :payload
44
45     def initialize(message)
46       @id = message[:id]
47       @timestamp = message[:timestamp]
48       @topic = message[:topic]
49       @payload = message[:payload]
50       if @payload.class != Hash
51         raise InvalidJournalMessage.new('payload must be a hash!')
52       end
53     end
54
55     # Access payload value by key.
56     def get(pkey, default=:exception) # IDENTITY = Object.new instead of :ex..?
57       value = pkey.split('.').reduce(@payload) do |hash, key|
58         if hash.has_key?(key) or hash.has_key?(key.to_sym)
59           hash[key] || hash[key.to_sym]
60         else
61           if default == :exception
62             raise ArgumentError.new
63           else
64             default
65           end
66         end
67       end
68     end
69
70     # Access payload value by key alias for get(key, nil).
71     def [](key)
72       get(key, nil)
73     end
74
75     def ==(other)
76       (@id == other.id) rescue false
77     end
78
79     def self.create(topic, payload, opt={})
80       # cleanup payload to only contain strings
81       payload = payload.map { |k, v| [k.to_s, v.to_s] }.to_h
82       JournalMessage.new(
83         id: opt[:id] || SecureRandom.uuid,
84         timestamp: opt[:timestamp] || Time.now,
85         topic: topic,
86         payload: payload
87       )
88     end
89   end
90
91   module Storage
92     class AbstractStorage
93       # intializes/opens a new storage connection
94       def initialize(opts={})
95       end
96
97       # inserts a message in storage
98       def insert(message)
99       end
100
101       # creates/ensures a index exists on the payload specified by key
102       def ensure_index(key)
103       end
104
105       # returns a array of message instances that match the query
106       def find(query=nil, limit=100, offset=0, &block)
107       end
108
109       # returns the number of messages that match the query
110       def count(query=nil)
111       end
112
113       # remove messages that match the query
114       def remove(query=nil)
115       end
116
117       # destroy the underlying table/collection
118       def drop
119       end
120
121       # Returns all classes from the namespace that implement this interface
122       def self.get_impl
123         ObjectSpace.each_object(Class).select { |klass| klass < self }
124       end
125     end
126
127     def self.create(name, uri)
128       log 'load journal storage adapter: ' + name
129       load File.join(File.dirname(__FILE__), 'journal', name + '.rb')
130       cls = AbstractStorage.get_impl.first
131       cls.new(uri: uri)
132     end
133   end
134
135   # Describes a query on journal entries, it is used both to describe
136   # a subscription aswell as to query persisted messages.
137   # There two ways to declare a Query instance, using
138   # the DSL like this:
139   #
140   #   Query.define do
141   #     id 'foo'
142   #     id 'bar'
143   #     topic 'log.irc.*'
144   #     topic 'log.core'
145   #     timestamp from: Time.now, to: Time.now + 60 * 10
146   #     payload 'action': :privmsg
147   #     payload 'channel': '#rbot'
148   #     payload 'foo.bar': 'baz'
149   #   end
150   #
151   # or using a hash: (NOTE: avoid using symbols in payload)
152   #
153   #   Query.define({
154   #     id: ['foo', 'bar'],
155   #     topic: ['log.irc.*', 'log.core'],
156   #     timestamp: {
157   #       from: Time.now
158   #       to: Time.now + 60 * 10
159   #     },
160   #     payload: {
161   #       'action' => 'privmsg'
162   #       'channel' => '#rbot',
163   #       'foo.bar' => 'baz'
164   #     }
165   #   })
166   #
167   class Query
168     # array of ids to match (OR)
169     attr_reader :id
170     # array of topics to match with wildcard support (OR)
171     attr_reader :topic
172     # hash with from: timestamp and to: timestamp
173     attr_reader :timestamp
174     # hash of key values to match
175     attr_reader :payload
176
177     def initialize(query)
178       @id = query[:id] || []
179       @id = [@id] if @id.is_a? String
180       @topic = query[:topic] || []
181       @topic = [@topic] if @topic.is_a? String
182       @timestamp = {
183         from: nil, to: nil
184       }
185       if query[:timestamp] and query[:timestamp][:from]
186         @timestamp[:from] = query[:timestamp][:from]
187       end
188       if query[:timestamp] and query[:timestamp][:to]
189         @timestamp[:to] = query[:timestamp][:to]
190       end
191       @payload = query[:payload] || {}
192     end
193
194     # returns true if the given message matches the query
195     def matches?(message)
196       return false if not @id.empty? and not @id.include? message.id
197       return false if not @topic.empty? and not topic_matches? message.topic
198       if @timestamp[:from]
199         return false unless message.timestamp >= @timestamp[:from]
200       end
201       if @timestamp[:to]
202         return false unless message.timestamp <= @timestamp[:to]
203       end
204       found = false
205       @payload.each_pair do |key, value|
206         begin
207           message.get(key.to_s)
208         rescue ArgumentError
209         end
210         found = true
211       end
212       return false if not found and not @payload.empty?
213       true
214     end
215
216     def topic_matches?(_topic)
217       @topic.each do |topic|
218         if topic.include? '*'
219           match = true
220           topic.split('.').zip(_topic.split('.')).each do |a, b|
221             if a == '*'
222               if not b or b.empty?
223                 match = false
224               end
225             else
226               match = false unless a == b
227             end
228           end
229           return true if match
230         else
231           return true if topic == _topic
232         end
233       end
234       return false
235     end
236
237     # factory that constructs a query
238     class Factory
239       attr_reader :query
240       def initialize
241         @query = {
242           id: [],
243           topic: [],
244           timestamp: {
245             from: nil, to: nil
246           },
247           payload: {}
248         }
249       end
250
251       def id(*_id)
252         @query[:id] += _id
253       end
254
255       def topic(*_topic)
256           @query[:topic] += _topic
257       end
258
259       def timestamp(range)
260         @query[:timestamp] = range
261       end
262
263       def payload(query)
264         @query[:payload].merge!(query)
265       end
266     end
267
268     def self.define(query=nil, &block)
269       factory = Factory.new
270       if block_given?
271         factory.instance_eval(&block)
272         query = factory.query
273       end
274       Query.new query
275     end
276
277   end
278
279
280   class JournalBroker
281     attr_reader :storage
282     class Subscription
283       attr_reader :topic
284       attr_reader :block
285       def initialize(broker, topic, block)
286         @broker = broker
287         @topic = topic
288         @block = block
289       end
290       def cancel
291         @broker.unsubscribe(self)
292       end
293     end
294
295     def initialize(opts={})
296       # overrides the internal consumer with a block
297       @consumer = opts[:consumer]
298       # storage backend
299       @storage = opts[:storage]
300       unless @storage
301         warning 'journal broker: no storage set up, won\'t persist messages'
302       end
303       @queue = Queue.new
304       # consumer thread:
305       @thread = Thread.new do
306         while message = @queue.pop
307           begin
308             consume message
309           # pop(true) ... rescue ThreadError => e
310           rescue Exception => e
311             error 'journal broker: exception in consumer thread'
312             error $!
313           end
314         end
315       end
316       @subscriptions = []
317       # lookup-table for subscriptions by their topic
318       @topic_subs = {}
319     end
320
321     def consume(message)
322       return unless message
323       @consumer.call(message) if @consumer
324
325       # notify subscribers
326       if @topic_subs.has_key? message.topic
327         @topic_subs[message.topic].each do |s|
328           s.block.call(message)
329         end
330       end
331
332       @storage.insert(message) if @storage
333     end
334
335     def persists?
336       true if @storage
337     end
338
339     def shutdown
340       log 'journal shutdown'
341       @subscriptions.clear
342       @topic_subs.clear
343       @queue << nil
344       @thread.join
345       @thread = nil
346     end
347
348     def publish(topic, payload)
349       debug 'journal publish message in %s: %s' % [topic, payload.inspect]
350       @queue << JournalMessage::create(topic, payload)
351       nil
352     end
353
354     # Subscribe to receive messages from a topic.
355     #
356     # You can use this method to subscribe to messages that
357     # are published within a specified topic. You must provide
358     # a receiving block to receive messages one-by-one.
359     # The method returns an instance of Subscription that can
360     # be used to cancel the subscription by invoking cancel
361     # on it.
362     #
363     #   journal.subscribe('irclog') do |message|
364     #     # received irclog messages...
365     #   end
366     #
367     def subscribe(topic=nil, &block)
368       raise ArgumentError.new unless block_given?
369       s = Subscription.new(self, topic, block)
370       @subscriptions << s
371       unless @topic_subs.has_key? topic
372         @topic_subs[topic] = []
373       end
374       @topic_subs[topic] << s
375       s
376     end
377
378     def unsubscribe(s)
379       if @topic_subs.has_key? s.topic
380         @topic_subs[s.topic].delete(s)
381       end
382       @subscriptions.delete s
383     end
384
385     # Find and return persisted messages by a query.
386     #
387     # This method will either return all messages or call the provided
388     # block for each message. It will filter the messages by the
389     # provided Query instance. Limit and offset might be used to
390     # constrain the result.
391     # The query might also be a hash or proc that is passed to
392     # Query.define first.
393     #
394     # @param query [Query] 
395     # @param limit [Integer] how many items to return
396     # @param offset [Integer] relative offset in results
397     def find(query, limit=100, offset=0, &block)
398       unless query.is_a? Query
399         query = Query.define(query)
400       end
401       if block_given?
402         @storage.find(query, limit, offset, &block)
403       else
404         @storage.find(query, limit, offset)
405       end
406     end
407
408     def count(query=nil)
409       unless query.is_a? Query
410         query = Query.define(query)
411       end
412       @storage.count(query)
413     end
414
415     def remove(query=nil)
416       unless query.is_a? Query
417         query = Query.define(query)
418       end
419       @storage.remove(query)
420     end
421
422   end
423
424 end # Journal
425 end # Bot
426 end # Irc
427