5 # :title: rbot's persistent message queue
7 # Author:: Matthias Hecker (apoc@geekosphere.org)
10 require 'securerandom'
18 The journal is a persistent message queue for rbot; it is based on a basic
19 publish/subscribe model and persists messages into backend databases
20 that can be efficiently searched for past messages.
22 It is an addition to the key-value storage already present in rbot
23 through its registry subsystem.
27 class InvalidJournalMessage < StandardError
29 class StorageError < StandardError
33 # a unique identification of this message
36 # describes a hierarchical queue into which this message belongs
39 # when this message was published as a Time instance
40 attr_reader :timestamp
42 # contains the actual message as a Hash
45 def initialize(message)
47 @timestamp = message[:timestamp]
48 @topic = message[:topic]
49 @payload = message[:payload]
50 if @payload.class != Hash
51 raise InvalidJournalMessage.new('payload must be a hash!')
55 # Access payload value by key.
56 def get(pkey, default=:exception) # IDENTITY = Object.new instead of :ex..?
57 value = pkey.split('.').reduce(@payload) do |hash, key|
58 if hash.has_key?(key) or hash.has_key?(key.to_sym)
59 hash[key] || hash[key.to_sym]
61 if default == :exception
62 raise ArgumentError.new
70 # Access payload value by key; alias for get(key, nil).
76 (@id == other.id) rescue false
79 def self.create(topic, payload, opt={})
80 # cleanup payload to only contain strings
82 id: opt[:id] || SecureRandom.uuid,
83 timestamp: opt[:timestamp] || Time.now,
92 # initializes/opens a new storage connection
93 def initialize(opts={})
96 # inserts a message in storage
100 # creates/ensures an index exists on the payload specified by key
101 def ensure_payload_index(key)
104 # returns an array of message instances that match the query
105 def find(query=nil, limit=100, offset=0, &block)
108 # returns the number of messages that match the query
112 # remove messages that match the query
113 def remove(query=nil)
116 # destroy the underlying table/collection
120 # Returns all classes from the namespace that implement this interface
122 ObjectSpace.each_object(Class).select { |klass| klass < self }
126 def self.create(name, uri)
127 log 'load journal storage adapter: ' + name
128 load File.join(File.dirname(__FILE__), 'journal', name + '.rb')
129 cls = AbstractStorage.get_impl.first
134 # Describes a query on journal entries; it is used both to describe
135 # a subscription as well as to query persisted messages.
136 # There are two ways to declare a Query instance, using
144 # timestamp from: Time.now, to: Time.now + 60 * 10
145 # payload 'action': :privmsg
146 # payload 'channel': '#rbot'
147 # payload 'foo.bar': 'baz'
150 # or using a hash: (NOTE: avoid using symbols in payload)
153 # id: ['foo', 'bar'],
154 # topic: ['log.irc.*', 'log.core'],
157 # to: Time.now + 60 * 10
160 # 'action' => 'privmsg'
161 # 'channel' => '#rbot',
167 # array of ids to match (OR)
169 # array of topics to match with wildcard support (OR)
171 # hash with from: timestamp and to: timestamp
172 attr_reader :timestamp
173 # hash of key values to match
176 def initialize(query)
177 @id = query[:id] || []
178 @id = [@id] if @id.is_a? String
179 @topic = query[:topic] || []
180 @topic = [@topic] if @topic.is_a? String
184 if query[:timestamp] and query[:timestamp][:from]
185 @timestamp[:from] = query[:timestamp][:from]
187 if query[:timestamp] and query[:timestamp][:to]
188 @timestamp[:to] = query[:timestamp][:to]
190 @payload = query[:payload] || {}
193 # returns true if the given message matches the query
194 def matches?(message)
195 return false if not @id.empty? and not @id.include? message.id
196 return false if not @topic.empty? and not topic_matches? message.topic
198 return false unless message.timestamp >= @timestamp[:from]
201 return false unless message.timestamp <= @timestamp[:to]
204 @payload.each_pair do |key, value|
206 message.get(key.to_s)
211 return false if not found and not @payload.empty?
215 def topic_matches?(_topic)
216 @topic.each do |topic|
217 if topic.include? '*'
219 topic.split('.').zip(_topic.split('.')).each do |a, b|
225 match = false unless a == b
230 return true if topic == _topic
236 # factory that constructs a query
255 @query[:topic] += _topic
259 @query[:timestamp] = range
263 @query[:payload].merge!(query)
267 def self.define(query=nil, &block)
268 factory = Factory.new
270 factory.instance_eval(&block)
271 query = factory.query
284 def initialize(broker, topic, block)
290 @broker.unsubscribe(self)
294 def initialize(opts={})
295 # overrides the internal consumer with a block
296 @consumer = opts[:consumer]
298 @storage = opts[:storage]
300 warning 'journal broker: no storage set up, won\'t persist messages'
304 @thread = Thread.new do
305 while message = @queue.pop
308 # pop(true) ... rescue ThreadError => e
309 rescue Exception => e
310 error 'journal broker: exception in consumer thread'
316 # lookup-table for subscriptions by their topic
321 return unless message
322 @consumer.call(message) if @consumer
325 if @topic_subs.has_key? message.topic
326 @topic_subs[message.topic].each do |s|
327 s.block.call(message)
331 @storage.insert(message) if @storage
339 log 'journal shutdown'
347 def publish(topic, payload)
348 debug 'journal publish message in %s: %s' % [topic, payload.inspect]
349 @queue << JournalMessage::create(topic, payload)
353 # Subscribe to receive messages from a topic.
355 # You can use this method to subscribe to messages that
356 # are published within a specified topic. You must provide
357 # a receiving block to receive messages one-by-one.
358 # The method returns an instance of Subscription that can
359 # be used to cancel the subscription by invoking cancel
362 # journal.subscribe('irclog') do |message|
363 # # received irclog messages...
366 def subscribe(topic=nil, &block)
367 raise ArgumentError.new unless block_given?
368 s = Subscription.new(self, topic, block)
370 unless @topic_subs.has_key? topic
371 @topic_subs[topic] = []
373 @topic_subs[topic] << s
378 if @topic_subs.has_key? s.topic
379 @topic_subs[s.topic].delete(s)
381 @subscriptions.delete s
384 # Find and return persisted messages by a query.
386 # This method will either return all messages or call the provided
387 # block for each message. It will filter the messages by the
388 # provided Query instance. Limit and offset might be used to
389 # constrain the result.
390 # The query might also be a hash or proc that is passed to
391 # Query.define first.
393 # @param query [Query]
394 # @param limit [Integer] how many items to return
395 # @param offset [Integer] relative offset in results
396 def find(query, limit=100, offset=0, &block)
397 unless query.is_a? Query
398 query = Query.define(query)
401 @storage.find(query, limit, offset, &block)
403 @storage.find(query, limit, offset)
408 unless query.is_a? Query
409 query = Query.define(query)
411 @storage.count(query)
414 def remove(query=nil)
415 unless query.is_a? Query
416 query = Query.define(query)
418 @storage.remove(query)
421 def ensure_payload_index(key)
422 @storage.ensure_payload_index(key)