From 613750983b43546d8bd2732c9159ff766a1c42bd Mon Sep 17 00:00:00 2001 From: Matthias Hecker Date: Sun, 14 Jun 2015 00:18:35 +0200 Subject: [PATCH] introducing a persistent message queue, the "journal" --- lib/rbot/journal.rb | 274 +++++++++++++++++++++++++++++++++++++++++++ test/test_journal.rb | 178 ++++++++++++++++++++++++++++ 2 files changed, 452 insertions(+) create mode 100644 lib/rbot/journal.rb create mode 100644 test/test_journal.rb diff --git a/lib/rbot/journal.rb b/lib/rbot/journal.rb new file mode 100644 index 00000000..4cab11c8 --- /dev/null +++ b/lib/rbot/journal.rb @@ -0,0 +1,274 @@ +# encoding: UTF-8 +#-- vim:sw=2:et +#++ +# +# :title: rbot's persistent message queue + +require 'thread' +require 'securerandom' + +module Irc +class Bot +module Journal + +=begin rdoc + + The journal is a persistent message queue for rbot, its based on a basic + publish/subscribe model and persists messages into backend databases + that can be efficiently searched for past messages. + + It is a addition to the key value storage already present in rbot + through its registry subsystem. + +=end + + class InvalidJournalMessage < StandardError + end + class ConsumeInterrupt < StandardError + end + + class JournalMessage + # a unique identification of this message + attr_reader :id + + # describes a hierarchical queue into which this message belongs + attr_reader :topic + + # when this message was published as a Time instance + attr_reader :timestamp + + # contains the actual message as a Hash + attr_reader :payload + + def initialize(message) + @id = message[:id] + @timestamp = message[:timestamp] + @topic = message[:topic] + @payload = message[:payload] + if @payload.class != Hash + raise InvalidJournalMessage.new('payload must be a hash!') + end + end + + def get(pkey, default=:exception) # IDENTITY = Object.new instead of :ex..? + value = pkey.split('.').reduce(@payload) do |hash, key| + if hash.has_key?(key) or hash.has_key?(key.to_sym) + hash[key] || hash[key.to_sym] + else + if default == :exception + raise ArgumentError.new + else + default + end + end + end + end + + def self.create(topic, payload) + JournalMessage.new( + id: SecureRandom.uuid, + timestamp: Time.now, + topic: topic, + payload: payload + ) + end + end + + # Describes a query on journal entries, it is used both to describe + # a subscription aswell as to query persisted messages. + # There two ways to declare a Query instance, using + # the DSL like this: + # + # Query.define do + # id 'foo' + # id 'bar' + # topic 'log.irc.*' + # topic 'log.core' + # timestamp from: Time.now, to: Time.now + 60 * 10 + # payload 'action': :privmsg + # payload 'channel': '#rbot' + # payload 'foo.bar': 'baz' + # end + # + # or using a hash: (NOTE: avoid using symbols in payload) + # + # Query.define({ + # id: ['foo', 'bar'], + # topic: ['log.irc.*', 'log.core'], + # timestamp: { + # from: Time.now + # to: Time.now + 60 * 10 + # }, + # payload: { + # 'action' => 'privmsg' + # 'channel' => '#rbot', + # 'foo.bar' => 'baz' + # } + # }) + # + class Query + # array of ids to match (OR) + attr_reader :id + # array of topics to match with wildcard support (OR) + attr_reader :topic + # hash with from: timestamp and to: timestamp + attr_reader :timestamp + # hash of key values to match + attr_reader :payload + + def initialize(query) + @id = query[:id] + @topic = query[:topic] + @timestamp = query[:timestamp] + @payload = query[:payload] + end + + # returns true if the given message matches the query + def matches?(message) + return false if not @id.empty? and not @id.include? message.id + return false if not @topic.empty? and not topic_matches? message.topic + if @timestamp[:from] + return false unless message.timestamp >= @timestamp[:from] + end + if @timestamp[:to] + return false unless message.timestamp <= @timestamp[:to] + end + found = false + @payload.each_pair do |key, value| + begin + message.get(key.to_s) + rescue ArgumentError + end + found = true + end + return false if not found and not @payload.empty? + true + end + + def topic_matches?(_topic) + @topic.each do |topic| + if topic.include? '*' + match = true + topic.split('.').zip(_topic.split('.')).each do |a, b| + if a == '*' + if not b or b.empty? + match = false + end + else + match = false unless a == b + end + end + return true if match + else + return true if topic == _topic + end + end + return false + end + + # factory that constructs a query + class Factory + attr_reader :query + def initialize + @query = { + id: [], + topic: [], + timestamp: { + from: nil, to: nil + }, + payload: {} + } + end + + def id(*_id) + @query[:id] += _id + end + + def topic(*_topic) + @query[:topic] += _topic + end + + def timestamp(range) + @query[:timestamp] = range + end + + def payload(query) + @query[:payload].merge!(query) + end + end + + def self.define(query=nil, &block) + factory = Factory.new + if block_given? + factory.instance_eval(&block) + query = factory.query + end + Query.new query + end + + end + + + + class JournalBroker + def initialize(opts={}) + # overrides the internal consumer with a block + @consumer = opts[:consumer] + @queue = Queue.new + # consumer thread: + @thread = Thread.new do + loop do + begin + consume @queue.pop + # pop(true) ... rescue ThreadError => e + rescue ConsumeInterrupt => e + error 'journal broker: stop thread, consume interrupt raised' + break + rescue Exception => e + error 'journal broker: exception in consumer thread' + error $! + end + end + end + # TODO: this is a first naive implementation, later we do the + # message/query matching for incoming messages more efficiently + @subscriptions = [] + end + + def consume(message) + return unless message + @consumer.call(message) if @consumer + + # notify subscribers + @subscriptions.each do |query, block| + if query.matches? message + block.call(message) + end + end + end + + def join + @thread.join + end + + def shutdown + @thread.raise ConsumeInterrupt.new + end + + + def publish(topic, payload) + @queue.push JournalMessage::create(topic, payload) + end + + # subscribe to messages that match the given query + def subscribe(query, &block) + raise ArgumentError.new unless block_given? + @subscriptions << [query, block] + end + + end + +end # Journal +end # Bot +end # Irc + diff --git a/test/test_journal.rb b/test/test_journal.rb new file mode 100644 index 00000000..d7a70a7c --- /dev/null +++ b/test/test_journal.rb @@ -0,0 +1,178 @@ +$:.unshift File.join(File.dirname(__FILE__), '../lib') + +require 'test/unit' +require 'rbot/ircbot' +require 'rbot/journal' + +class JournalMessageTest < Test::Unit::TestCase + + include Irc::Bot::Journal + + def test_get + m = JournalMessage.create('foo', {'bar': 42, 'baz': nil, 'qux': {'quxx': 23}}) + assert_equal(42, m.get('bar')) + assert_raise ArgumentError do + m.get('nope') + end + assert_nil(m.get('nope', nil)) + assert_nil(m.get('baz')) + assert_equal(23, m.get('qux.quxx')) + end + +end + +class QueryTest < Test::Unit::TestCase + + include Irc::Bot::Journal + + def test_define + + q = Query.define do + id 'foo' + id 'bar', 'baz' + topic 'log.irc.*' + topic 'log.core', 'baz' + timestamp from: Time.now, to: Time.now + 60 * 10 + payload 'action': :privmsg, 'alice': 'bob' + payload 'channel': '#rbot' + payload 'foo.bar': 'baz' + end + assert_equal(['foo', 'bar', 'baz'], q.id) + assert_equal(['log.irc.*', 'log.core', 'baz'], q.topic) + assert_equal([:from, :to], q.timestamp.keys) + assert_equal(Time, q.timestamp[:to].class) + assert_equal(Time, q.timestamp[:from].class) + assert_equal({ + 'action': :privmsg, 'alice': 'bob', + 'channel': '#rbot', + 'foo.bar': 'baz' + }, q.payload) + + end + + def test_topic_matches + q = Query.define do + topic 'foo' + end + assert_true(q.topic_matches?('foo')) + assert_false(q.topic_matches?('bar')) + assert_false(q.topic_matches?('foo.bar')) + + q = Query.define do + topic 'foo.bar' + end + assert_false(q.topic_matches?('foo')) + assert_false(q.topic_matches?('bar')) + assert_true(q.topic_matches?('foo.bar')) + + q = Query.define do + topic 'foo.*' + end + assert_false(q.topic_matches?('foo')) + assert_false(q.topic_matches?('bar')) + assert_true(q.topic_matches?('foo.bar')) + assert_true(q.topic_matches?('foo.baz')) + + q = Query.define do + topic '*.bar' + end + assert_false(q.topic_matches?('foo')) + assert_false(q.topic_matches?('bar')) + assert_true(q.topic_matches?('foo.bar')) + assert_true(q.topic_matches?('bar.bar')) + assert_false(q.topic_matches?('foo.foo')) + + q = Query.define do + topic '*.*' + end + assert_false(q.topic_matches?('foo')) + assert_true(q.topic_matches?('foo.bar')) + + q = Query.define do + topic 'foo' + topic 'bar' + topic 'baz.alice.bob.*.foo' + end + assert_true(q.topic_matches?('foo')) + assert_true(q.topic_matches?('bar')) + assert_true(q.topic_matches?('baz.alice.bob.asdf.foo')) + assert_false(q.topic_matches?('baz.alice.bob..foo')) + + end + + DAY=60*60*24 + def test_matches + q = Query.define do + #id 'foo', 'bar' + topic 'log.irc.*', 'log.core' + timestamp from: Time.now - DAY, to: Time.now + DAY + payload 'action': 'privmsg', 'foo.bar': 'baz' + end + assert_true(q.matches? JournalMessage.create('log.irc.raw', {'action' => 'privmsg'})) + assert_false(q.matches? JournalMessage.create('baz', {})) + assert_true(q.matches? JournalMessage.create('log.core', {foo: {bar: 'baz'}})) + + # tests timestamp from/to: + assert_true(q.matches? JournalMessage.new( + id: 'foo', + topic: 'log.core', + timestamp: Time.now, + payload: {action: 'privmsg'})) + assert_false(q.matches? JournalMessage.new( + id: 'foo', + topic: 'log.core', + timestamp: Time.now - DAY*3, + payload: {action: 'privmsg'})) + assert_false(q.matches? JournalMessage.new( + id: 'foo', + topic: 'log.core', + timestamp: Time.now + DAY*3, + payload: {action: 'privmsg'})) + end + +end + +class JournalBrokerTest < Test::Unit::TestCase + + include Irc::Bot::Journal + + def test_publish + received = [] + journal = JournalBroker.new(consumer: Proc.new { |message| + received << message + }) + + # publish some messages: + journal.publish 'log.irc', + source: 'alice', message: '<3 pg' + journal.publish 'log.irc', + source: 'bob', message: 'mysql > pg' + journal.publish 'log.irc', + source: 'alice', target: 'bob', action: :kick + + # wait for messages to be consumed: + sleep 0.1 + assert_equal(3, received.length) + end + + def test_subscribe + received = [] + journal = JournalBroker.new + + # subscribe to messages: + journal.subscribe(Query.define { topic 'foo' }) do |message| + received << message + end + + # publish some messages: + journal.publish 'foo', {} + journal.publish 'bar', {} + journal.publish 'foo', {} + + # wait for messages to be consumed: + sleep 0.1 + assert_equal(2, received.length) + end + +end + -- 2.39.2