5 # :title: rbot's persistent message queue
7 # Author:: Matthias Hecker (apoc@geekosphere.org)
10 require 'securerandom'
18 The journal is a persistent message queue for rbot, its based on a basic
19 publish/subscribe model and persists messages into backend databases
20 that can be efficiently searched for past messages.
22 It is a addition to the key value storage already present in rbot
23 through its registry subsystem.
27 class InvalidJournalMessage < StandardError
29 class StorageError < StandardError
33 # a unique identification of this message
36 # describes a hierarchical queue into which this message belongs
39 # when this message was published as a Time instance
40 attr_reader :timestamp
42 # contains the actual message as a Hash
45 def initialize(message)
47 @timestamp = message[:timestamp]
48 @topic = message[:topic]
49 @payload = message[:payload]
50 if @payload.class != Hash
51 raise InvalidJournalMessage.new('payload must be a hash!')
55 # Access payload value by key.
56 def get(pkey, default=:exception) # IDENTITY = Object.new instead of :ex..?
57 value = pkey.split('.').reduce(@payload) do |hash, key|
58 if hash.has_key?(key) or hash.has_key?(key.to_sym)
59 hash[key] || hash[key.to_sym]
61 if default == :exception
62 raise ArgumentError.new
70 # Access payload value by key alias for get(key, nil).
76 (@id == other.id) rescue false
79 def self.create(topic, payload, opt={})
80 # cleanup payload to only contain strings
81 payload = payload.map { |k, v| [k.to_s, v.to_s] }.to_h
83 id: opt[:id] || SecureRandom.uuid,
84 timestamp: opt[:timestamp] || Time.now,
93 # intializes/opens a new storage connection
94 def initialize(opts={})
97 # inserts a message in storage
101 # creates/ensures a index exists on the payload specified by key
102 def ensure_index(key)
105 # returns a array of message instances that match the query
106 def find(query=nil, limit=100, offset=0, &block)
109 # returns the number of messages that match the query
113 # remove messages that match the query
114 def remove(query=nil)
117 # destroy the underlying table/collection
121 # Returns all classes from the namespace that implement this interface
123 ObjectSpace.each_object(Class).select { |klass| klass < self }
127 def self.create(name, uri)
128 log 'load journal storage adapter: ' + name
129 load File.join(File.dirname(__FILE__), 'journal', name + '.rb')
130 cls = AbstractStorage.get_impl.first
135 # Describes a query on journal entries, it is used both to describe
136 # a subscription aswell as to query persisted messages.
137 # There two ways to declare a Query instance, using
145 # timestamp from: Time.now, to: Time.now + 60 * 10
146 # payload 'action': :privmsg
147 # payload 'channel': '#rbot'
148 # payload 'foo.bar': 'baz'
151 # or using a hash: (NOTE: avoid using symbols in payload)
154 # id: ['foo', 'bar'],
155 # topic: ['log.irc.*', 'log.core'],
158 # to: Time.now + 60 * 10
161 # 'action' => 'privmsg'
162 # 'channel' => '#rbot',
168 # array of ids to match (OR)
170 # array of topics to match with wildcard support (OR)
172 # hash with from: timestamp and to: timestamp
173 attr_reader :timestamp
174 # hash of key values to match
177 def initialize(query)
178 @id = query[:id] || []
179 @id = [@id] if @id.is_a? String
180 @topic = query[:topic] || []
181 @topic = [@topic] if @topic.is_a? String
185 if query[:timestamp] and query[:timestamp][:from]
186 @timestamp[:from] = query[:timestamp][:from]
188 if query[:timestamp] and query[:timestamp][:to]
189 @timestamp[:to] = query[:timestamp][:to]
191 @payload = query[:payload] || {}
194 # returns true if the given message matches the query
195 def matches?(message)
196 return false if not @id.empty? and not @id.include? message.id
197 return false if not @topic.empty? and not topic_matches? message.topic
199 return false unless message.timestamp >= @timestamp[:from]
202 return false unless message.timestamp <= @timestamp[:to]
205 @payload.each_pair do |key, value|
207 message.get(key.to_s)
212 return false if not found and not @payload.empty?
216 def topic_matches?(_topic)
217 @topic.each do |topic|
218 if topic.include? '*'
220 topic.split('.').zip(_topic.split('.')).each do |a, b|
226 match = false unless a == b
231 return true if topic == _topic
237 # factory that constructs a query
256 @query[:topic] += _topic
260 @query[:timestamp] = range
264 @query[:payload].merge!(query)
268 def self.define(query=nil, &block)
269 factory = Factory.new
271 factory.instance_eval(&block)
272 query = factory.query
284 def initialize(broker, topic, block)
290 @broker.unsubscribe(self)
294 def initialize(opts={})
295 # overrides the internal consumer with a block
296 @consumer = opts[:consumer]
298 @storage = opts[:storage]
300 warning 'journal broker: no storage set up, won\'t persist messages'
304 @thread = Thread.new do
305 while message = @queue.pop
308 # pop(true) ... rescue ThreadError => e
309 rescue Exception => e
310 error 'journal broker: exception in consumer thread'
316 # lookup-table for subscriptions by their topic
321 return unless message
322 @consumer.call(message) if @consumer
325 if @topic_subs.has_key? message.topic
326 @topic_subs[message.topic].each do |s|
327 s.block.call(message)
331 @storage.insert(message) if @storage
339 log 'journal shutdown'
347 def publish(topic, payload)
348 debug 'journal publish message in %s: %s' % [topic, payload.inspect]
349 @queue << JournalMessage::create(topic, payload)
353 # Subscribe to receive messages from a topic.
355 # You can use this method to subscribe to messages that
356 # are published within a specified topic. You must provide
357 # a receiving block to receive messages one-by-one.
358 # The method returns an instance of Subscription that can
359 # be used to cancel the subscription by invoking cancel
362 # journal.subscribe('irclog') do |message|
363 # # received irclog messages...
366 def subscribe(topic=nil, &block)
367 raise ArgumentError.new unless block_given?
368 s = Subscription.new(self, topic, block)
370 unless @topic_subs.has_key? topic
371 @topic_subs[topic] = []
373 @topic_subs[topic] << s
378 if @topic_subs.has_key? s.topic
379 @topic_subs[s.topic].delete(s)
381 @subscriptions.delete s
384 # Find and return persisted messages by a query.
386 # This method will either return all messages or call the provided
387 # block for each message. It will filter the messages by the
388 # provided Query instance. Limit and offset might be used to
389 # constrain the result.
390 # The query might also be a hash or proc that is passed to
391 # Query.define first.
393 # @param query [Query]
394 # @param limit [Integer] how many items to return
395 # @param offset [Integer] relative offset in results
396 def find(query, limit=100, offset=0, &block)
397 unless query.is_a? Query
398 query = Query.define(query)
401 @storage.find(query, limit, offset, &block)
403 @storage.find(query, limit, offset)
408 unless query.is_a? Query
409 query = Query.define(query)
411 @storage.count(query)
414 def remove(query=nil)
415 unless query.is_a? Query
416 query = Query.define(query)
418 @storage.remove(query)