5 # :title: rbot's persistent message queue
16 The journal is a persistent message queue for rbot; it is based on a basic
17 publish/subscribe model and persists messages into backend databases
18 that can be efficiently searched for past messages.
20 It is an addition to the key value storage already present in rbot
21 through its registry subsystem.
25 Config.register Config::StringValue.new('journal.storage',
27 :requires_restart => true,
28 :desc => 'storage engine used by the journal')
29 Config.register Config::StringValue.new('journal.storage.uri',
31 :requires_restart => true,
32 :desc => 'storage database uri')
34 class InvalidJournalMessage < StandardError
36 class ConsumeInterrupt < StandardError
38 class StorageError < StandardError
42 # a unique identification of this message
45 # describes a hierarchical queue into which this message belongs
48 # when this message was published as a Time instance
49 attr_reader :timestamp
51 # contains the actual message as a Hash
54 def initialize(message)
56 @timestamp = message[:timestamp]
57 @topic = message[:topic]
58 @payload = message[:payload]
59 if @payload.class != Hash
60 raise InvalidJournalMessage.new('payload must be a hash!')
64 def get(pkey, default=:exception) # IDENTITY = Object.new instead of :ex..?
65 value = pkey.split('.').reduce(@payload) do |hash, key|
66 if hash.has_key?(key) or hash.has_key?(key.to_sym)
67 hash[key] || hash[key.to_sym]
69 if default == :exception
70 raise ArgumentError.new
82 def self.create(topic, payload, opt={})
84 id: opt[:id] || SecureRandom.uuid,
85 timestamp: opt[:timestamp] || Time.now,
94 # initializes/opens a new storage connection
95 def initialize(opts={})
98 # inserts a message in storage
102 # creates/ensures an index exists on the payload specified by key
103 def ensure_index(key)
106 # returns an array of message instances that match the query
107 def find(query, limit=10, offset=0)
110 # returns the number of messages that match the query
114 # delete messages that match the query
120 # Describes a query on journal entries; it is used both to describe
121 # a subscription as well as to query persisted messages.
122 # There are two ways to declare a Query instance, using
130 # timestamp from: Time.now, to: Time.now + 60 * 10
131 # payload 'action': :privmsg
132 # payload 'channel': '#rbot'
133 # payload 'foo.bar': 'baz'
136 # or using a hash: (NOTE: avoid using symbols in payload)
139 # id: ['foo', 'bar'],
140 # topic: ['log.irc.*', 'log.core'],
143 # to: Time.now + 60 * 10
146 # 'action' => 'privmsg'
147 # 'channel' => '#rbot',
153 # array of ids to match (OR)
155 # array of topics to match with wildcard support (OR)
157 # hash with from: timestamp and to: timestamp
158 attr_reader :timestamp
159 # hash of key values to match
162 def initialize(query)
164 @topic = query[:topic]
165 @timestamp = query[:timestamp]
166 @payload = query[:payload]
169 # returns true if the given message matches the query
170 def matches?(message)
171 return false if not @id.empty? and not @id.include? message.id
172 return false if not @topic.empty? and not topic_matches? message.topic
174 return false unless message.timestamp >= @timestamp[:from]
177 return false unless message.timestamp <= @timestamp[:to]
180 @payload.each_pair do |key, value|
182 message.get(key.to_s)
187 return false if not found and not @payload.empty?
191 def topic_matches?(_topic)
192 @topic.each do |topic|
193 if topic.include? '*'
195 topic.split('.').zip(_topic.split('.')).each do |a, b|
201 match = false unless a == b
206 return true if topic == _topic
212 # factory that constructs a query
231 @query[:topic] += _topic
235 @query[:timestamp] = range
239 @query[:payload].merge!(query)
243 def self.define(query=nil, &block)
244 factory = Factory.new
246 factory.instance_eval(&block)
247 query = factory.query
259 def initialize(broker, query, block)
265 @broker.unsubscribe(self)
269 def initialize(opts={})
270 # overrides the internal consumer with a block
271 @consumer = opts[:consumer]
273 @storage = opts[:storage]
276 @thread = Thread.new do
280 # pop(true) ... rescue ThreadError => e
281 rescue ConsumeInterrupt => e
282 error 'journal broker: stop thread, consume interrupt raised'
284 rescue Exception => e
285 error 'journal broker: exception in consumer thread'
290 # TODO: this is a first naive implementation, later we do the
291 # message/query matching for incoming messages more efficiently
296 return unless message
297 @consumer.call(message) if @consumer
300 @subscriptions.each do |s|
301 if s.query.matches? message
302 s.block.call(message)
306 @storage.insert(message) if @storage
318 @thread.raise ConsumeInterrupt.new
321 def publish(topic, payload)
322 @queue.push JournalMessage::create(topic, payload)
325 # subscribe to messages that match the given query
326 def subscribe(query, &block)
327 raise ArgumentError.new unless block_given?
328 s = Subscription.new(self, query, block)
333 def unsubscribe(subscription)
334 @subscriptions.delete subscription