#++
#
# :title: rbot's persistent message queue
+#
+# Author:: Matthias Hecker (apoc@geekosphere.org)
require 'thread'
require 'securerandom'
=end
- Config.register Config::StringValue.new('journal.storage',
- :default => nil,
- :requires_restart => true,
- :desc => 'storage engine used by the journal')
- Config.register Config::StringValue.new('journal.storage.uri',
- :default => nil,
- :requires_restart => true,
- :desc => 'storage database uri')
-
class InvalidJournalMessage < StandardError
end
- class ConsumeInterrupt < StandardError
- end
class StorageError < StandardError
end
end
end
+ # Access payload value by key.
def get(pkey, default=:exception) # IDENTITY = Object.new instead of :ex..?
value = pkey.split('.').reduce(@payload) do |hash, key|
if hash.has_key?(key) or hash.has_key?(key.to_sym)
end
end
+ # Access payload value by key alias for get(key, nil).
+ def [](key)
+ get(key, nil)
+ end
+
def ==(other)
(@id == other.id) rescue false
end
def self.create(topic, payload, opt={})
+ # cleanup payload to only contain strings
JournalMessage.new(
id: opt[:id] || SecureRandom.uuid,
timestamp: opt[:timestamp] || Time.now,
end
# creates/ensures a index exists on the payload specified by key
- def ensure_index(key)
+ def ensure_payload_index(key)
end
# returns a array of message instances that match the query
- def find(query=nil, limit=100, offset=0)
+ def find(query=nil, limit=100, offset=0, &block)
end
# returns the number of messages that match the query
end
end
- def create(name, uri)
+ def self.create(name, uri)
log 'load journal storage adapter: ' + name
load File.join(File.dirname(__FILE__), 'journal', name + '.rb')
cls = AbstractStorage.get_impl.first
attr_reader :payload
def initialize(query)
- @id = query[:id]
- @topic = query[:topic]
- @timestamp = query[:timestamp]
- @payload = query[:payload]
+ @id = query[:id] || []
+ @id = [@id] if @id.is_a? String
+ @topic = query[:topic] || []
+ @topic = [@topic] if @topic.is_a? String
+ @timestamp = {
+ from: nil, to: nil
+ }
+ if query[:timestamp] and query[:timestamp][:from]
+ @timestamp[:from] = query[:timestamp][:from]
+ end
+ if query[:timestamp] and query[:timestamp][:to]
+ @timestamp[:to] = query[:timestamp][:to]
+ end
+ @payload = query[:payload] || {}
end
# returns true if the given message matches the query
class JournalBroker
+ attr_reader :storage
class Subscription
- attr_reader :query
+ attr_reader :topic
attr_reader :block
- def initialize(broker, query, block)
+ def initialize(broker, topic, block)
@broker = broker
- @query = query
+ @topic = topic
@block = block
end
def cancel
# overrides the internal consumer with a block
@consumer = opts[:consumer]
# storage backend
- if @bot
- @storage = opts[:storage] || Storage.create(
- @bot.config['journal.storage'], @bot.config['journal.storage.uri'])
- else
- @storage = opts[:storage]
+ @storage = opts[:storage]
+ unless @storage
+ warning 'journal broker: no storage set up, won\'t persist messages'
end
@queue = Queue.new
# consumer thread:
@thread = Thread.new do
- loop do
+ while message = @queue.pop
begin
- consume @queue.pop
+ consume message
# pop(true) ... rescue ThreadError => e
- rescue ConsumeInterrupt => e
- error 'journal broker: stop thread, consume interrupt raised'
- break
rescue Exception => e
error 'journal broker: exception in consumer thread'
error $!
end
end
end
- # TODO: this is a first naive implementation, later we do the
- # message/query matching for incoming messages more efficiently
@subscriptions = []
+ # lookup-table for subscriptions by their topic
+ @topic_subs = {}
end
def consume(message)
@consumer.call(message) if @consumer
# notify subscribers
- @subscriptions.each do |s|
- if s.query.matches? message
+ if @topic_subs.has_key? message.topic
+ @topic_subs[message.topic].each do |s|
s.block.call(message)
end
end
true if @storage
end
- def join
- @thread.join
- end
-
def shutdown
- @thread.raise ConsumeInterrupt.new
+ log 'journal shutdown'
+ @subscriptions.clear
+ @topic_subs.clear
+ @queue << nil
+ @thread.join
+ @thread = nil
end
def publish(topic, payload)
- @queue.push JournalMessage::create(topic, payload)
+ debug 'journal publish message in %s: %s' % [topic, payload.inspect]
+ @queue << JournalMessage::create(topic, payload)
+ nil
end
- # subscribe to messages that match the given query
- def subscribe(query, &block)
+ # Subscribe to receive messages from a topic.
+ #
+ # You can use this method to subscribe to messages that
+ # are published within a specified topic. You must provide
+ # a receiving block to receive messages one-by-one.
+ # The method returns an instance of Subscription that can
+ # be used to cancel the subscription by invoking cancel
+ # on it.
+ #
+ # journal.subscribe('irclog') do |message|
+ # # received irclog messages...
+ # end
+ #
+ def subscribe(topic=nil, &block)
raise ArgumentError.new unless block_given?
- s = Subscription.new(self, query, block)
+ s = Subscription.new(self, topic, block)
@subscriptions << s
+ unless @topic_subs.has_key? topic
+ @topic_subs[topic] = []
+ end
+ @topic_subs[topic] << s
s
end
- def unsubscribe(subscription)
- @subscriptions.delete subscription
+ def unsubscribe(s)
+ if @topic_subs.has_key? s.topic
+ @topic_subs[s.topic].delete(s)
+ end
+ @subscriptions.delete s
end
- def find(query=nil, limit=100, offset=0, &block)
+ # Find and return persisted messages by a query.
+ #
+ # This method will either return all messages or call the provided
+ # block for each message. It will filter the messages by the
+ # provided Query instance. Limit and offset might be used to
+ # constrain the result.
+ # The query might also be a hash or proc that is passed to
+ # Query.define first.
+ #
+ # @param query [Query]
+ # @param limit [Integer] how many items to return
+ # @param offset [Integer] relative offset in results
+ def find(query, limit=100, offset=0, &block)
+ unless query.is_a? Query
+ query = Query.define(query)
+ end
if block_given?
- begin
- res = @storage.find(query, limit, offset)
- block.call(res)
- end until res.length > 0
+ @storage.find(query, limit, offset, &block)
else
@storage.find(query, limit, offset)
end
end
def count(query=nil)
+ unless query.is_a? Query
+ query = Query.define(query)
+ end
@storage.count(query)
end
def remove(query=nil)
+ unless query.is_a? Query
+ query = Query.define(query)
+ end
@storage.remove(query)
end
+ def ensure_payload_index(key)
+ @storage.ensure_payload_index(key)
+ end
+
end
end # Journal