summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r-- lib/rbot/journal.rb 274
-rw-r--r-- test/test_journal.rb 178
2 files changed, 452 insertions, 0 deletions
diff --git a/lib/rbot/journal.rb b/lib/rbot/journal.rb
new file mode 100644
index 00000000..4cab11c8
--- /dev/null
+++ b/lib/rbot/journal.rb
@@ -0,0 +1,274 @@
+# encoding: UTF-8
+#-- vim:sw=2:et
+#++
+#
+# :title: rbot's persistent message queue
+
+require 'thread'
+require 'securerandom'
+
+module Irc
+class Bot
+module Journal
+
+=begin rdoc
+
+ The journal is a persistent message queue for rbot. It is based on a basic
+ publish/subscribe model and persists messages into backend databases
+ that can be efficiently searched for past messages.
+
+ It is an addition to the key-value storage already present in rbot
+ through its registry subsystem.
+
+=end
+
+ class InvalidJournalMessage < StandardError
+ end
+ class ConsumeInterrupt < StandardError
+ end
+
+ class JournalMessage
+ # a unique identification of this message
+ attr_reader :id
+
+ # describes the hierarchical queue to which this message belongs
+ attr_reader :topic
+
+ # when this message was published as a Time instance
+ attr_reader :timestamp
+
+ # contains the actual message as a Hash
+ attr_reader :payload
+
+ def initialize(message)
+ @id = message[:id]
+ @timestamp = message[:timestamp]
+ @topic = message[:topic]
+ @payload = message[:payload]
+ if @payload.class != Hash
+ raise InvalidJournalMessage.new('payload must be a hash!')
+ end
+ end
+
+ def get(pkey, default=:exception) # IDENTITY = Object.new instead of :ex..?
+ value = pkey.split('.').reduce(@payload) do |hash, key|
+ if hash.has_key?(key) or hash.has_key?(key.to_sym)
+ hash[key] || hash[key.to_sym]
+ else
+ if default == :exception
+ raise ArgumentError.new
+ else
+ default
+ end
+ end
+ end
+ end
+
+ def self.create(topic, payload)
+ JournalMessage.new(
+ id: SecureRandom.uuid,
+ timestamp: Time.now,
+ topic: topic,
+ payload: payload
+ )
+ end
+ end
+
+ # Describes a query on journal entries. It is used both to describe
+ # a subscription and to query persisted messages.
+ # There are two ways to declare a Query instance: using
+ # the DSL like this:
+ #
+ # Query.define do
+ # id 'foo'
+ # id 'bar'
+ # topic 'log.irc.*'
+ # topic 'log.core'
+ # timestamp from: Time.now, to: Time.now + 60 * 10
+ # payload 'action': :privmsg
+ # payload 'channel': '#rbot'
+ # payload 'foo.bar': 'baz'
+ # end
+ #
+ # or using a hash: (NOTE: avoid using symbols in payload)
+ #
+ # Query.define({
+ # id: ['foo', 'bar'],
+ # topic: ['log.irc.*', 'log.core'],
+ # timestamp: {
+ # from: Time.now,
+ # to: Time.now + 60 * 10
+ # },
+ # payload: {
+ # 'action' => 'privmsg',
+ # 'channel' => '#rbot',
+ # 'foo.bar' => 'baz'
+ # }
+ # })
+ #
+ class Query
+ # array of ids to match (OR)
+ attr_reader :id
+ # array of topics to match with wildcard support (OR)
+ attr_reader :topic
+ # hash with from: timestamp and to: timestamp
+ attr_reader :timestamp
+ # hash of key values to match
+ attr_reader :payload
+
+ def initialize(query)
+ @id = query[:id]
+ @topic = query[:topic]
+ @timestamp = query[:timestamp]
+ @payload = query[:payload]
+ end
+
+ # returns true if the given message matches the query
+ def matches?(message)
+ return false if not @id.empty? and not @id.include? message.id
+ return false if not @topic.empty? and not topic_matches? message.topic
+ if @timestamp[:from]
+ return false unless message.timestamp >= @timestamp[:from]
+ end
+ if @timestamp[:to]
+ return false unless message.timestamp <= @timestamp[:to]
+ end
+ found = false
+ @payload.each_pair do |key, value|
+ begin
+ message.get(key.to_s)
+ rescue ArgumentError
+ end
+ found = true
+ end
+ return false if not found and not @payload.empty?
+ true
+ end
+
+ def topic_matches?(_topic)
+ @topic.each do |topic|
+ if topic.include? '*'
+ match = true
+ topic.split('.').zip(_topic.split('.')).each do |a, b|
+ if a == '*'
+ if not b or b.empty?
+ match = false
+ end
+ else
+ match = false unless a == b
+ end
+ end
+ return true if match
+ else
+ return true if topic == _topic
+ end
+ end
+ return false
+ end
+
+ # factory that constructs a query
+ class Factory
+ attr_reader :query
+ def initialize
+ @query = {
+ id: [],
+ topic: [],
+ timestamp: {
+ from: nil, to: nil
+ },
+ payload: {}
+ }
+ end
+
+ def id(*_id)
+ @query[:id] += _id
+ end
+
+ def topic(*_topic)
+ @query[:topic] += _topic
+ end
+
+ def timestamp(range)
+ @query[:timestamp] = range
+ end
+
+ def payload(query)
+ @query[:payload].merge!(query)
+ end
+ end
+
+ def self.define(query=nil, &block)
+ factory = Factory.new
+ if block_given?
+ factory.instance_eval(&block)
+ query = factory.query
+ end
+ Query.new query
+ end
+
+ end
+
+
+
+ class JournalBroker
+ def initialize(opts={})
+ # overrides the internal consumer with a block
+ @consumer = opts[:consumer]
+ @queue = Queue.new
+ # consumer thread:
+ @thread = Thread.new do
+ loop do
+ begin
+ consume @queue.pop
+ # pop(true) ... rescue ThreadError => e
+ rescue ConsumeInterrupt => e
+ error 'journal broker: stop thread, consume interrupt raised'
+ break
+ rescue Exception => e
+ error 'journal broker: exception in consumer thread'
+ error $!
+ end
+ end
+ end
+ # TODO: this is a first naive implementation, later we do the
+ # message/query matching for incoming messages more efficiently
+ @subscriptions = []
+ end
+
+ def consume(message)
+ return unless message
+ @consumer.call(message) if @consumer
+
+ # notify subscribers
+ @subscriptions.each do |query, block|
+ if query.matches? message
+ block.call(message)
+ end
+ end
+ end
+
+ def join
+ @thread.join
+ end
+
+ def shutdown
+ @thread.raise ConsumeInterrupt.new
+ end
+
+
+ def publish(topic, payload)
+ @queue.push JournalMessage::create(topic, payload)
+ end
+
+ # subscribe to messages that match the given query
+ def subscribe(query, &block)
+ raise ArgumentError.new unless block_given?
+ @subscriptions << [query, block]
+ end
+
+ end
+
+end # Journal
+end # Bot
+end # Irc
+
diff --git a/test/test_journal.rb b/test/test_journal.rb
new file mode 100644
index 00000000..d7a70a7c
--- /dev/null
+++ b/test/test_journal.rb
@@ -0,0 +1,178 @@
+$:.unshift File.join(File.dirname(__FILE__), '../lib')
+
+require 'test/unit'
+require 'rbot/ircbot'
+require 'rbot/journal'
+
+class JournalMessageTest < Test::Unit::TestCase
+
+ include Irc::Bot::Journal
+
+ def test_get
+ m = JournalMessage.create('foo', {'bar': 42, 'baz': nil, 'qux': {'quxx': 23}})
+ assert_equal(42, m.get('bar'))
+ assert_raise ArgumentError do
+ m.get('nope')
+ end
+ assert_nil(m.get('nope', nil))
+ assert_nil(m.get('baz'))
+ assert_equal(23, m.get('qux.quxx'))
+ end
+
+end
+
+class QueryTest < Test::Unit::TestCase
+
+ include Irc::Bot::Journal
+
+ def test_define
+
+ q = Query.define do
+ id 'foo'
+ id 'bar', 'baz'
+ topic 'log.irc.*'
+ topic 'log.core', 'baz'
+ timestamp from: Time.now, to: Time.now + 60 * 10
+ payload 'action': :privmsg, 'alice': 'bob'
+ payload 'channel': '#rbot'
+ payload 'foo.bar': 'baz'
+ end
+ assert_equal(['foo', 'bar', 'baz'], q.id)
+ assert_equal(['log.irc.*', 'log.core', 'baz'], q.topic)
+ assert_equal([:from, :to], q.timestamp.keys)
+ assert_equal(Time, q.timestamp[:to].class)
+ assert_equal(Time, q.timestamp[:from].class)
+ assert_equal({
+ 'action': :privmsg, 'alice': 'bob',
+ 'channel': '#rbot',
+ 'foo.bar': 'baz'
+ }, q.payload)
+
+ end
+
+ def test_topic_matches
+ q = Query.define do
+ topic 'foo'
+ end
+ assert_true(q.topic_matches?('foo'))
+ assert_false(q.topic_matches?('bar'))
+ assert_false(q.topic_matches?('foo.bar'))
+
+ q = Query.define do
+ topic 'foo.bar'
+ end
+ assert_false(q.topic_matches?('foo'))
+ assert_false(q.topic_matches?('bar'))
+ assert_true(q.topic_matches?('foo.bar'))
+
+ q = Query.define do
+ topic 'foo.*'
+ end
+ assert_false(q.topic_matches?('foo'))
+ assert_false(q.topic_matches?('bar'))
+ assert_true(q.topic_matches?('foo.bar'))
+ assert_true(q.topic_matches?('foo.baz'))
+
+ q = Query.define do
+ topic '*.bar'
+ end
+ assert_false(q.topic_matches?('foo'))
+ assert_false(q.topic_matches?('bar'))
+ assert_true(q.topic_matches?('foo.bar'))
+ assert_true(q.topic_matches?('bar.bar'))
+ assert_false(q.topic_matches?('foo.foo'))
+
+ q = Query.define do
+ topic '*.*'
+ end
+ assert_false(q.topic_matches?('foo'))
+ assert_true(q.topic_matches?('foo.bar'))
+
+ q = Query.define do
+ topic 'foo'
+ topic 'bar'
+ topic 'baz.alice.bob.*.foo'
+ end
+ assert_true(q.topic_matches?('foo'))
+ assert_true(q.topic_matches?('bar'))
+ assert_true(q.topic_matches?('baz.alice.bob.asdf.foo'))
+ assert_false(q.topic_matches?('baz.alice.bob..foo'))
+
+ end
+
+ DAY=60*60*24
+ def test_matches
+ q = Query.define do
+ #id 'foo', 'bar'
+ topic 'log.irc.*', 'log.core'
+ timestamp from: Time.now - DAY, to: Time.now + DAY
+ payload 'action': 'privmsg', 'foo.bar': 'baz'
+ end
+ assert_true(q.matches? JournalMessage.create('log.irc.raw', {'action' => 'privmsg'}))
+ assert_false(q.matches? JournalMessage.create('baz', {}))
+ assert_true(q.matches? JournalMessage.create('log.core', {foo: {bar: 'baz'}}))
+
+ # tests timestamp from/to:
+ assert_true(q.matches? JournalMessage.new(
+ id: 'foo',
+ topic: 'log.core',
+ timestamp: Time.now,
+ payload: {action: 'privmsg'}))
+ assert_false(q.matches? JournalMessage.new(
+ id: 'foo',
+ topic: 'log.core',
+ timestamp: Time.now - DAY*3,
+ payload: {action: 'privmsg'}))
+ assert_false(q.matches? JournalMessage.new(
+ id: 'foo',
+ topic: 'log.core',
+ timestamp: Time.now + DAY*3,
+ payload: {action: 'privmsg'}))
+ end
+
+end
+
+class JournalBrokerTest < Test::Unit::TestCase
+
+ include Irc::Bot::Journal
+
+ def test_publish
+ received = []
+ journal = JournalBroker.new(consumer: Proc.new { |message|
+ received << message
+ })
+
+ # publish some messages:
+ journal.publish 'log.irc',
+ source: 'alice', message: '<3 pg'
+ journal.publish 'log.irc',
+ source: 'bob', message: 'mysql > pg'
+ journal.publish 'log.irc',
+ source: 'alice', target: 'bob', action: :kick
+
+ # wait for messages to be consumed:
+ sleep 0.1
+ assert_equal(3, received.length)
+ end
+
+ def test_subscribe
+ received = []
+ journal = JournalBroker.new
+
+ # subscribe to messages:
+ journal.subscribe(Query.define { topic 'foo' }) do |message|
+ received << message
+ end
+
+ # publish some messages:
+ journal.publish 'foo', {}
+ journal.publish 'bar', {}
+ journal.publish 'foo', {}
+
+ # wait for messages to be consumed:
+ sleep 0.1
+ assert_equal(2, received.length)
+ end
+
+end
+