diff options
Diffstat (limited to 'lib/rbot/journal.rb')
-rw-r--r-- | lib/rbot/journal.rb | 430 |
1 files changed, 430 insertions, 0 deletions
diff --git a/lib/rbot/journal.rb b/lib/rbot/journal.rb new file mode 100644 index 00000000..981ff6e4 --- /dev/null +++ b/lib/rbot/journal.rb @@ -0,0 +1,430 @@ +# encoding: UTF-8 +#-- vim:sw=2:et +#++ +# +# :title: rbot's persistent message queue +# +# Author:: Matthias Hecker (apoc@geekosphere.org) + +require 'thread' +require 'securerandom' + +module Irc +class Bot +module Journal + +=begin rdoc + + The journal is a persistent message queue for rbot, its based on a basic + publish/subscribe model and persists messages into backend databases + that can be efficiently searched for past messages. + + It is a addition to the key value storage already present in rbot + through its registry subsystem. + +=end + + class InvalidJournalMessage < StandardError + end + class StorageError < StandardError + end + + class JournalMessage + # a unique identification of this message + attr_reader :id + + # describes a hierarchical queue into which this message belongs + attr_reader :topic + + # when this message was published as a Time instance + attr_reader :timestamp + + # contains the actual message as a Hash + attr_reader :payload + + def initialize(message) + @id = message[:id] + @timestamp = message[:timestamp] + @topic = message[:topic] + @payload = message[:payload] + if @payload.class != Hash + raise InvalidJournalMessage.new('payload must be a hash!') + end + end + + # Access payload value by key. + def get(pkey, default=:exception) # IDENTITY = Object.new instead of :ex..? + value = pkey.split('.').reduce(@payload) do |hash, key| + if hash.has_key?(key) or hash.has_key?(key.to_sym) + hash[key] || hash[key.to_sym] + else + if default == :exception + raise ArgumentError.new + else + default + end + end + end + end + + # Access payload value by key alias for get(key, nil). + def [](key) + get(key, nil) + end + + def ==(other) + (@id == other.id) rescue false + end + + def self.create(topic, payload, opt={}) + # cleanup payload to only contain strings + JournalMessage.new( + id: opt[:id] || SecureRandom.uuid, + timestamp: opt[:timestamp] || Time.now, + topic: topic, + payload: payload + ) + end + end + + module Storage + class AbstractStorage + # intializes/opens a new storage connection + def initialize(opts={}) + end + + # inserts a message in storage + def insert(message) + end + + # creates/ensures a index exists on the payload specified by key + def ensure_payload_index(key) + end + + # returns a array of message instances that match the query + def find(query=nil, limit=100, offset=0, &block) + end + + # returns the number of messages that match the query + def count(query=nil) + end + + # remove messages that match the query + def remove(query=nil) + end + + # destroy the underlying table/collection + def drop + end + + # Returns all classes from the namespace that implement this interface + def self.get_impl + ObjectSpace.each_object(Class).select { |klass| klass < self } + end + end + + def self.create(name, uri) + log 'load journal storage adapter: ' + name + load File.join(File.dirname(__FILE__), 'journal', name + '.rb') + cls = AbstractStorage.get_impl.first + cls.new(uri: uri) + end + end + + # Describes a query on journal entries, it is used both to describe + # a subscription aswell as to query persisted messages. + # There two ways to declare a Query instance, using + # the DSL like this: + # + # Query.define do + # id 'foo' + # id 'bar' + # topic 'log.irc.*' + # topic 'log.core' + # timestamp from: Time.now, to: Time.now + 60 * 10 + # payload 'action': :privmsg + # payload 'channel': '#rbot' + # payload 'foo.bar': 'baz' + # end + # + # or using a hash: (NOTE: avoid using symbols in payload) + # + # Query.define({ + # id: ['foo', 'bar'], + # topic: ['log.irc.*', 'log.core'], + # timestamp: { + # from: Time.now + # to: Time.now + 60 * 10 + # }, + # payload: { + # 'action' => 'privmsg' + # 'channel' => '#rbot', + # 'foo.bar' => 'baz' + # } + # }) + # + class Query + # array of ids to match (OR) + attr_reader :id + # array of topics to match with wildcard support (OR) + attr_reader :topic + # hash with from: timestamp and to: timestamp + attr_reader :timestamp + # hash of key values to match + attr_reader :payload + + def initialize(query) + @id = query[:id] || [] + @id = [@id] if @id.is_a? String + @topic = query[:topic] || [] + @topic = [@topic] if @topic.is_a? String + @timestamp = { + from: nil, to: nil + } + if query[:timestamp] and query[:timestamp][:from] + @timestamp[:from] = query[:timestamp][:from] + end + if query[:timestamp] and query[:timestamp][:to] + @timestamp[:to] = query[:timestamp][:to] + end + @payload = query[:payload] || {} + end + + # returns true if the given message matches the query + def matches?(message) + return false if not @id.empty? and not @id.include? message.id + return false if not @topic.empty? and not topic_matches? message.topic + if @timestamp[:from] + return false unless message.timestamp >= @timestamp[:from] + end + if @timestamp[:to] + return false unless message.timestamp <= @timestamp[:to] + end + found = false + @payload.each_pair do |key, value| + begin + message.get(key.to_s) + rescue ArgumentError + end + found = true + end + return false if not found and not @payload.empty? + true + end + + def topic_matches?(_topic) + @topic.each do |topic| + if topic.include? '*' + match = true + topic.split('.').zip(_topic.split('.')).each do |a, b| + if a == '*' + if not b or b.empty? + match = false + end + else + match = false unless a == b + end + end + return true if match + else + return true if topic == _topic + end + end + return false + end + + # factory that constructs a query + class Factory + attr_reader :query + def initialize + @query = { + id: [], + topic: [], + timestamp: { + from: nil, to: nil + }, + payload: {} + } + end + + def id(*_id) + @query[:id] += _id + end + + def topic(*_topic) + @query[:topic] += _topic + end + + def timestamp(range) + @query[:timestamp] = range + end + + def payload(query) + @query[:payload].merge!(query) + end + end + + def self.define(query=nil, &block) + factory = Factory.new + if block_given? + factory.instance_eval(&block) + query = factory.query + end + Query.new query + end + + end + + + class JournalBroker + attr_reader :storage + class Subscription + attr_reader :topic + attr_reader :block + def initialize(broker, topic, block) + @broker = broker + @topic = topic + @block = block + end + def cancel + @broker.unsubscribe(self) + end + end + + def initialize(opts={}) + # overrides the internal consumer with a block + @consumer = opts[:consumer] + # storage backend + @storage = opts[:storage] + unless @storage + warning 'journal broker: no storage set up, won\'t persist messages' + end + @queue = Queue.new + # consumer thread: + @thread = Thread.new do + while message = @queue.pop + begin + consume message + # pop(true) ... rescue ThreadError => e + rescue Exception => e + error 'journal broker: exception in consumer thread' + error $! + end + end + end + @subscriptions = [] + # lookup-table for subscriptions by their topic + @topic_subs = {} + end + + def consume(message) + return unless message + @consumer.call(message) if @consumer + + # notify subscribers + if @topic_subs.has_key? message.topic + @topic_subs[message.topic].each do |s| + s.block.call(message) + end + end + + @storage.insert(message) if @storage + end + + def persists? + true if @storage + end + + def shutdown + log 'journal shutdown' + @subscriptions.clear + @topic_subs.clear + @queue << nil + @thread.join + @thread = nil + end + + def publish(topic, payload) + debug 'journal publish message in %s: %s' % [topic, payload.inspect] + @queue << JournalMessage::create(topic, payload) + nil + end + + # Subscribe to receive messages from a topic. + # + # You can use this method to subscribe to messages that + # are published within a specified topic. You must provide + # a receiving block to receive messages one-by-one. + # The method returns an instance of Subscription that can + # be used to cancel the subscription by invoking cancel + # on it. + # + # journal.subscribe('irclog') do |message| + # # received irclog messages... + # end + # + def subscribe(topic=nil, &block) + raise ArgumentError.new unless block_given? + s = Subscription.new(self, topic, block) + @subscriptions << s + unless @topic_subs.has_key? topic + @topic_subs[topic] = [] + end + @topic_subs[topic] << s + s + end + + def unsubscribe(s) + if @topic_subs.has_key? s.topic + @topic_subs[s.topic].delete(s) + end + @subscriptions.delete s + end + + # Find and return persisted messages by a query. + # + # This method will either return all messages or call the provided + # block for each message. It will filter the messages by the + # provided Query instance. Limit and offset might be used to + # constrain the result. + # The query might also be a hash or proc that is passed to + # Query.define first. + # + # @param query [Query] + # @param limit [Integer] how many items to return + # @param offset [Integer] relative offset in results + def find(query, limit=100, offset=0, &block) + unless query.is_a? Query + query = Query.define(query) + end + if block_given? + @storage.find(query, limit, offset, &block) + else + @storage.find(query, limit, offset) + end + end + + def count(query=nil) + unless query.is_a? Query + query = Query.define(query) + end + @storage.count(query) + end + + def remove(query=nil) + unless query.is_a? Query + query = Query.define(query) + end + @storage.remove(query) + end + + def ensure_payload_index(key) + @storage.ensure_payload_index(key) + end + + end + +end # Journal +end # Bot +end # Irc + |