]> git.netwichtig.de Git - user/henk/code/ruby/rbot.git/commitdiff
introducing a persistent message queue, the "journal"
authorMatthias Hecker <apoc@geekosphere.org>
Sat, 13 Jun 2015 22:18:35 +0000 (00:18 +0200)
committerMatthias Hecker <apoc@geekosphere.org>
Sat, 13 Jun 2015 22:18:35 +0000 (00:18 +0200)
lib/rbot/journal.rb [new file with mode: 0644]
test/test_journal.rb [new file with mode: 0644]

diff --git a/lib/rbot/journal.rb b/lib/rbot/journal.rb
new file mode 100644 (file)
index 0000000..4cab11c
--- /dev/null
@@ -0,0 +1,274 @@
+# encoding: UTF-8
+#-- vim:sw=2:et
+#++
+#
+# :title: rbot's persistent message queue
+
+require 'thread'
+require 'securerandom'
+
+module Irc
+class Bot
+module Journal
+
+=begin rdoc
+
+  The journal is a persistent message queue for rbot, its based on a basic
+  publish/subscribe model and persists messages into backend databases
+  that can be efficiently searched for past messages.
+
+  It is a addition to the key value storage already present in rbot
+  through its registry subsystem.
+
+=end
+
+  class InvalidJournalMessage < StandardError
+  end
+  class ConsumeInterrupt < StandardError
+  end
+
+  class JournalMessage
+    # a unique identification of this message
+    attr_reader :id
+
+    # describes a hierarchical queue into which this message belongs
+    attr_reader :topic
+
+    # when this message was published as a Time instance
+    attr_reader :timestamp
+
+    # contains the actual message as a Hash
+    attr_reader :payload
+
+    def initialize(message)
+      @id = message[:id]
+      @timestamp = message[:timestamp]
+      @topic = message[:topic]
+      @payload = message[:payload]
+      if @payload.class != Hash
+        raise InvalidJournalMessage.new('payload must be a hash!')
+      end
+    end
+
+    def get(pkey, default=:exception) # IDENTITY = Object.new instead of :ex..?
+      value = pkey.split('.').reduce(@payload) do |hash, key|
+        if hash.has_key?(key) or hash.has_key?(key.to_sym)
+          hash[key] || hash[key.to_sym]
+        else
+          if default == :exception
+            raise ArgumentError.new
+          else
+            default
+          end
+        end
+      end
+    end
+
+    def self.create(topic, payload)
+      JournalMessage.new(
+        id: SecureRandom.uuid,
+        timestamp: Time.now,
+        topic: topic,
+        payload: payload
+      )
+    end
+  end
+
+  # Describes a query on journal entries, it is used both to describe
+  # a subscription aswell as to query persisted messages.
+  # There two ways to declare a Query instance, using
+  # the DSL like this:
+  #
+  #   Query.define do
+  #     id 'foo'
+  #     id 'bar'
+  #     topic 'log.irc.*'
+  #     topic 'log.core'
+  #     timestamp from: Time.now, to: Time.now + 60 * 10
+  #     payload 'action': :privmsg
+  #     payload 'channel': '#rbot'
+  #     payload 'foo.bar': 'baz'
+  #   end
+  #
+  # or using a hash: (NOTE: avoid using symbols in payload)
+  #
+  #   Query.define({
+  #     id: ['foo', 'bar'],
+  #     topic: ['log.irc.*', 'log.core'],
+  #     timestamp: {
+  #       from: Time.now
+  #       to: Time.now + 60 * 10
+  #     },
+  #     payload: {
+  #       'action' => 'privmsg'
+  #       'channel' => '#rbot',
+  #       'foo.bar' => 'baz'
+  #     }
+  #   })
+  #
+  class Query
+    # array of ids to match (OR)
+    attr_reader :id
+    # array of topics to match with wildcard support (OR)
+    attr_reader :topic
+    # hash with from: timestamp and to: timestamp
+    attr_reader :timestamp
+    # hash of key values to match
+    attr_reader :payload
+
+    def initialize(query)
+      @id = query[:id]
+      @topic = query[:topic]
+      @timestamp = query[:timestamp]
+      @payload = query[:payload]
+    end
+
+    # returns true if the given message matches the query
+    def matches?(message)
+      return false if not @id.empty? and not @id.include? message.id
+      return false if not @topic.empty? and not topic_matches? message.topic
+      if @timestamp[:from]
+        return false unless message.timestamp >= @timestamp[:from]
+      end
+      if @timestamp[:to]
+        return false unless message.timestamp <= @timestamp[:to]
+      end
+      found = false
+      @payload.each_pair do |key, value|
+        begin
+          message.get(key.to_s)
+        rescue ArgumentError
+        end
+        found = true
+      end
+      return false if not found and not @payload.empty?
+      true
+    end
+
+    def topic_matches?(_topic)
+      @topic.each do |topic|
+        if topic.include? '*'
+          match = true
+          topic.split('.').zip(_topic.split('.')).each do |a, b|
+            if a == '*'
+              if not b or b.empty?
+                match = false
+              end
+            else
+              match = false unless a == b
+            end
+          end
+          return true if match
+        else
+          return true if topic == _topic
+        end
+      end
+      return false
+    end
+
+    # factory that constructs a query
+    class Factory
+      attr_reader :query
+      def initialize
+        @query = {
+          id: [],
+          topic: [],
+          timestamp: {
+            from: nil, to: nil
+          },
+          payload: {}
+        }
+      end
+
+      def id(*_id)
+        @query[:id] += _id
+      end
+
+      def topic(*_topic)
+          @query[:topic] += _topic
+      end
+
+      def timestamp(range)
+        @query[:timestamp] = range
+      end
+
+      def payload(query)
+        @query[:payload].merge!(query)
+      end
+    end
+
+    def self.define(query=nil, &block)
+      factory = Factory.new
+      if block_given?
+        factory.instance_eval(&block)
+        query = factory.query
+      end
+      Query.new query
+    end
+
+  end
+
+
+
+  class JournalBroker
+    def initialize(opts={})
+      # overrides the internal consumer with a block
+      @consumer = opts[:consumer]
+      @queue = Queue.new
+      # consumer thread:
+      @thread = Thread.new do
+        loop do
+          begin
+            consume @queue.pop
+          # pop(true) ... rescue ThreadError => e
+          rescue ConsumeInterrupt => e
+            error 'journal broker: stop thread, consume interrupt raised'
+            break
+          rescue Exception => e
+            error 'journal broker: exception in consumer thread'
+            error $!
+          end
+        end
+      end
+      # TODO: this is a first naive implementation, later we do the
+      #       message/query matching for incoming messages more efficiently
+      @subscriptions = []
+    end
+
+    def consume(message)
+      return unless message
+      @consumer.call(message) if @consumer
+
+      # notify subscribers
+      @subscriptions.each do |query, block|
+        if query.matches? message
+          block.call(message)
+        end
+      end
+    end
+
+    def join
+      @thread.join
+    end
+
+    def shutdown
+      @thread.raise ConsumeInterrupt.new
+    end
+
+
+    def publish(topic, payload)
+      @queue.push JournalMessage::create(topic, payload)
+    end
+
+    # subscribe to messages that match the given query
+    def subscribe(query, &block)
+      raise ArgumentError.new unless block_given?
+      @subscriptions << [query, block]
+    end
+
+  end
+
+end # Journal
+end # Bot
+end # Irc
+
diff --git a/test/test_journal.rb b/test/test_journal.rb
new file mode 100644 (file)
index 0000000..d7a70a7
--- /dev/null
@@ -0,0 +1,178 @@
+$:.unshift File.join(File.dirname(__FILE__), '../lib')
+
+require 'test/unit'
+require 'rbot/ircbot'
+require 'rbot/journal'
+
+class JournalMessageTest < Test::Unit::TestCase
+
+  include Irc::Bot::Journal
+
+  def test_get
+    m = JournalMessage.create('foo', {'bar': 42, 'baz': nil, 'qux': {'quxx': 23}})
+    assert_equal(42, m.get('bar'))
+    assert_raise ArgumentError do
+      m.get('nope')
+    end
+    assert_nil(m.get('nope', nil))
+    assert_nil(m.get('baz'))
+    assert_equal(23, m.get('qux.quxx'))
+  end
+
+end
+
+class QueryTest < Test::Unit::TestCase
+
+  include Irc::Bot::Journal
+
+  def test_define
+
+    q = Query.define do
+      id 'foo'
+      id 'bar', 'baz'
+      topic 'log.irc.*'
+      topic 'log.core', 'baz'
+      timestamp from: Time.now, to: Time.now + 60 * 10
+      payload 'action': :privmsg, 'alice': 'bob'
+      payload 'channel': '#rbot'
+      payload 'foo.bar': 'baz'
+    end
+    assert_equal(['foo', 'bar', 'baz'], q.id)
+    assert_equal(['log.irc.*', 'log.core', 'baz'], q.topic)
+    assert_equal([:from, :to], q.timestamp.keys)
+    assert_equal(Time, q.timestamp[:to].class)
+    assert_equal(Time, q.timestamp[:from].class)
+    assert_equal({
+      'action': :privmsg, 'alice': 'bob',
+      'channel': '#rbot',
+      'foo.bar': 'baz'
+    }, q.payload)
+
+  end
+
+  def test_topic_matches
+    q = Query.define do
+      topic 'foo'
+    end
+    assert_true(q.topic_matches?('foo'))
+    assert_false(q.topic_matches?('bar'))
+    assert_false(q.topic_matches?('foo.bar'))
+
+    q = Query.define do
+      topic 'foo.bar'
+    end
+    assert_false(q.topic_matches?('foo'))
+    assert_false(q.topic_matches?('bar'))
+    assert_true(q.topic_matches?('foo.bar'))
+
+    q = Query.define do
+      topic 'foo.*'
+    end
+    assert_false(q.topic_matches?('foo'))
+    assert_false(q.topic_matches?('bar'))
+    assert_true(q.topic_matches?('foo.bar'))
+    assert_true(q.topic_matches?('foo.baz'))
+
+    q = Query.define do
+      topic '*.bar'
+    end
+    assert_false(q.topic_matches?('foo'))
+    assert_false(q.topic_matches?('bar'))
+    assert_true(q.topic_matches?('foo.bar'))
+    assert_true(q.topic_matches?('bar.bar'))
+    assert_false(q.topic_matches?('foo.foo'))
+
+    q = Query.define do
+      topic '*.*'
+    end
+    assert_false(q.topic_matches?('foo'))
+    assert_true(q.topic_matches?('foo.bar'))
+
+    q = Query.define do
+      topic 'foo'
+      topic 'bar'
+      topic 'baz.alice.bob.*.foo'
+    end
+    assert_true(q.topic_matches?('foo'))
+    assert_true(q.topic_matches?('bar'))
+    assert_true(q.topic_matches?('baz.alice.bob.asdf.foo'))
+    assert_false(q.topic_matches?('baz.alice.bob..foo'))
+
+  end
+
+  DAY=60*60*24
+  def test_matches
+    q = Query.define do
+      #id 'foo', 'bar'
+      topic 'log.irc.*', 'log.core'
+      timestamp from: Time.now - DAY, to: Time.now + DAY
+      payload 'action': 'privmsg', 'foo.bar': 'baz'
+    end
+    assert_true(q.matches? JournalMessage.create('log.irc.raw', {'action' => 'privmsg'}))
+    assert_false(q.matches? JournalMessage.create('baz', {}))
+    assert_true(q.matches? JournalMessage.create('log.core', {foo: {bar: 'baz'}}))
+
+    # tests timestamp from/to:
+    assert_true(q.matches? JournalMessage.new(
+      id: 'foo',
+      topic: 'log.core',
+      timestamp: Time.now,
+      payload: {action: 'privmsg'}))
+    assert_false(q.matches? JournalMessage.new(
+      id: 'foo',
+      topic: 'log.core',
+      timestamp: Time.now - DAY*3,
+      payload: {action: 'privmsg'}))
+    assert_false(q.matches? JournalMessage.new(
+      id: 'foo',
+      topic: 'log.core',
+      timestamp: Time.now + DAY*3,
+      payload: {action: 'privmsg'}))
+  end
+
+end
+
+class JournalBrokerTest < Test::Unit::TestCase
+
+  include Irc::Bot::Journal
+
+  def test_publish
+    received = []
+    journal = JournalBroker.new(consumer: Proc.new { |message|
+      received << message
+    })
+
+    # publish some messages:
+    journal.publish 'log.irc',
+      source: 'alice', message: '<3 pg'
+    journal.publish 'log.irc',
+      source: 'bob', message: 'mysql > pg'
+    journal.publish 'log.irc',
+      source: 'alice', target: 'bob', action: :kick
+
+    # wait for messages to be consumed:
+    sleep 0.1
+    assert_equal(3, received.length)
+  end
+
+  def test_subscribe
+    received = []
+    journal = JournalBroker.new
+
+    # subscribe to messages:
+    journal.subscribe(Query.define { topic 'foo' }) do |message|
+      received << message
+    end
+
+    # publish some messages:
+    journal.publish 'foo', {}
+    journal.publish 'bar', {}
+    journal.publish 'foo', {}
+
+    # wait for messages to be consumed:
+    sleep 0.1
+    assert_equal(2, received.length)
+  end
+
+end
+