5 # :title: rbot's persistent message queue
16 The journal is a persistent message queue for rbot; it is based on a basic
17 publish/subscribe model and persists messages into backend databases
18 that can be efficiently searched for past messages.
20 It is an addition to the key-value storage already present in rbot
21 through its registry subsystem.
25 class InvalidJournalMessage < StandardError
27 class ConsumeInterrupt < StandardError
31 # a unique identification of this message
34 # describes a hierarchical queue into which this message belongs
37 # when this message was published as a Time instance
38 attr_reader :timestamp
40 # contains the actual message as a Hash
43 def initialize(message)
45 @timestamp = message[:timestamp]
46 @topic = message[:topic]
47 @payload = message[:payload]
48 if @payload.class != Hash
49 raise InvalidJournalMessage.new('payload must be a hash!')
53 def get(pkey, default=:exception) # IDENTITY = Object.new instead of :ex..?
54 value = pkey.split('.').reduce(@payload) do |hash, key|
55 if hash.has_key?(key) or hash.has_key?(key.to_sym)
56 hash[key] || hash[key.to_sym]
58 if default == :exception
59 raise ArgumentError.new
67 def self.create(topic, payload)
69 id: SecureRandom.uuid,
77 # Describes a query on journal entries; it is used both to describe
78 # a subscription and to query persisted messages.
79 # There are two ways to declare a Query instance, using
87 # timestamp from: Time.now, to: Time.now + 60 * 10
88 # payload 'action': :privmsg
89 # payload 'channel': '#rbot'
90 # payload 'foo.bar': 'baz'
93 # or using a hash: (NOTE: avoid using symbols in payload)
97 # topic: ['log.irc.*', 'log.core'],
100 # to: Time.now + 60 * 10
103 # 'action' => 'privmsg',
104 # 'channel' => '#rbot',
110 # array of ids to match (OR)
112 # array of topics to match with wildcard support (OR)
114 # hash with from: timestamp and to: timestamp
115 attr_reader :timestamp
116 # hash of key values to match
119 def initialize(query)
121 @topic = query[:topic]
122 @timestamp = query[:timestamp]
123 @payload = query[:payload]
126 # returns true if the given message matches the query
127 def matches?(message)
128 return false if not @id.empty? and not @id.include? message.id
129 return false if not @topic.empty? and not topic_matches? message.topic
131 return false unless message.timestamp >= @timestamp[:from]
134 return false unless message.timestamp <= @timestamp[:to]
137 @payload.each_pair do |key, value|
139 message.get(key.to_s)
144 return false if not found and not @payload.empty?
148 def topic_matches?(_topic)
149 @topic.each do |topic|
150 if topic.include? '*'
152 topic.split('.').zip(_topic.split('.')).each do |a, b|
158 match = false unless a == b
163 return true if topic == _topic
169 # factory that constructs a query
188 @query[:topic] += _topic
192 @query[:timestamp] = range
196 @query[:payload].merge!(query)
200 def self.define(query=nil, &block)
201 factory = Factory.new
203 factory.instance_eval(&block)
204 query = factory.query
214 def initialize(opts={})
215 # overrides the internal consumer with a block
216 @consumer = opts[:consumer]
219 @thread = Thread.new do
223 # pop(true) ... rescue ThreadError => e
224 rescue ConsumeInterrupt => e
225 error 'journal broker: stop thread, consume interrupt raised'
227 rescue Exception => e
228 error 'journal broker: exception in consumer thread'
233 # TODO: this is a first naive implementation, later we do the
234 # message/query matching for incoming messages more efficiently
239 return unless message
240 @consumer.call(message) if @consumer
243 @subscriptions.each do |query, block|
244 if query.matches? message
255 @thread.raise ConsumeInterrupt.new
259 def publish(topic, payload)
260 @queue.push JournalMessage::create(topic, payload)
263 # subscribe to messages that match the given query
264 def subscribe(query, &block)
265 raise ArgumentError.new unless block_given?
266 @subscriptions << [query, block]