5 # :title: rbot's persistent message queue
16 The journal is a persistent message queue for rbot, its based on a basic
17 publish/subscribe model and persists messages into backend databases
18 that can be efficiently searched for past messages.
20 It is a addition to the key value storage already present in rbot
21 through its registry subsystem.
25 Config.register Config::StringValue.new('journal.storage',
27 :requires_restart => true,
28 :desc => 'storage engine used by the journal')
29 Config.register Config::StringValue.new('journal.storage.uri',
31 :requires_restart => true,
32 :desc => 'storage database uri')
34 class InvalidJournalMessage < StandardError
36 class ConsumeInterrupt < StandardError
38 class StorageError < StandardError
42 # a unique identification of this message
45 # describes a hierarchical queue into which this message belongs
48 # when this message was published as a Time instance
49 attr_reader :timestamp
51 # contains the actual message as a Hash
54 def initialize(message)
56 @timestamp = message[:timestamp]
57 @topic = message[:topic]
58 @payload = message[:payload]
59 if @payload.class != Hash
60 raise InvalidJournalMessage.new('payload must be a hash!')
64 def get(pkey, default=:exception) # IDENTITY = Object.new instead of :ex..?
65 value = pkey.split('.').reduce(@payload) do |hash, key|
66 if hash.has_key?(key) or hash.has_key?(key.to_sym)
67 hash[key] || hash[key.to_sym]
69 if default == :exception
70 raise ArgumentError.new
79 (@id == other.id) rescue false
82 def self.create(topic, payload, opt={})
84 id: opt[:id] || SecureRandom.uuid,
85 timestamp: opt[:timestamp] || Time.now,
94 # intializes/opens a new storage connection
95 def initialize(opts={})
98 # inserts a message in storage
102 # creates/ensures a index exists on the payload specified by key
103 def ensure_index(key)
106 # returns a array of message instances that match the query
107 def find(query=nil, limit=100, offset=0)
110 # returns the number of messages that match the query
114 # remove messages that match the query
115 def remove(query=nil)
118 # destroy the underlying table/collection
122 # Returns all classes from the namespace that implement this interface
124 ObjectSpace.each_object(Class).select { |klass| klass < self }
128 def create(name, uri)
129 log 'load journal storage adapter: ' + name
130 load File.join(File.dirname(__FILE__), 'journal', name + '.rb')
131 cls = AbstractStorage.get_impl.first
136 # Describes a query on journal entries, it is used both to describe
137 # a subscription aswell as to query persisted messages.
138 # There two ways to declare a Query instance, using
146 # timestamp from: Time.now, to: Time.now + 60 * 10
147 # payload 'action': :privmsg
148 # payload 'channel': '#rbot'
149 # payload 'foo.bar': 'baz'
152 # or using a hash: (NOTE: avoid using symbols in payload)
155 # id: ['foo', 'bar'],
156 # topic: ['log.irc.*', 'log.core'],
159 # to: Time.now + 60 * 10
162 # 'action' => 'privmsg'
163 # 'channel' => '#rbot',
169 # array of ids to match (OR)
171 # array of topics to match with wildcard support (OR)
173 # hash with from: timestamp and to: timestamp
174 attr_reader :timestamp
175 # hash of key values to match
178 def initialize(query)
180 @topic = query[:topic]
181 @timestamp = query[:timestamp]
182 @payload = query[:payload]
185 # returns true if the given message matches the query
186 def matches?(message)
187 return false if not @id.empty? and not @id.include? message.id
188 return false if not @topic.empty? and not topic_matches? message.topic
190 return false unless message.timestamp >= @timestamp[:from]
193 return false unless message.timestamp <= @timestamp[:to]
196 @payload.each_pair do |key, value|
198 message.get(key.to_s)
203 return false if not found and not @payload.empty?
207 def topic_matches?(_topic)
208 @topic.each do |topic|
209 if topic.include? '*'
211 topic.split('.').zip(_topic.split('.')).each do |a, b|
217 match = false unless a == b
222 return true if topic == _topic
228 # factory that constructs a query
247 @query[:topic] += _topic
251 @query[:timestamp] = range
255 @query[:payload].merge!(query)
259 def self.define(query=nil, &block)
260 factory = Factory.new
262 factory.instance_eval(&block)
263 query = factory.query
275 def initialize(broker, query, block)
281 @broker.unsubscribe(self)
285 def initialize(opts={})
286 # overrides the internal consumer with a block
287 @consumer = opts[:consumer]
290 @storage = opts[:storage] || Storage.create(
291 @bot.config['journal.storage'], @bot.config['journal.storage.uri'])
293 @storage = opts[:storage]
297 @thread = Thread.new do
301 # pop(true) ... rescue ThreadError => e
302 rescue ConsumeInterrupt => e
303 error 'journal broker: stop thread, consume interrupt raised'
305 rescue Exception => e
306 error 'journal broker: exception in consumer thread'
311 # TODO: this is a first naive implementation, later we do the
312 # message/query matching for incoming messages more efficiently
317 return unless message
318 @consumer.call(message) if @consumer
321 @subscriptions.each do |s|
322 if s.query.matches? message
323 s.block.call(message)
327 @storage.insert(message) if @storage
339 @thread.raise ConsumeInterrupt.new
342 def publish(topic, payload)
343 @queue.push JournalMessage::create(topic, payload)
346 # subscribe to messages that match the given query
347 def subscribe(query, &block)
348 raise ArgumentError.new unless block_given?
349 s = Subscription.new(self, query, block)
354 def unsubscribe(subscription)
355 @subscriptions.delete subscription
358 def find(query=nil, limit=100, offset=0, &block)
361 res = @storage.find(query, limit, offset)
363 end until res.length > 0
365 @storage.find(query, limit, offset)
370 @storage.count(query)
373 def remove(query=nil)
374 @storage.remove(query)