summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Hecker <apoc@geekosphere.org>2015-06-20 19:25:39 +0200
committerMatthias Hecker <apoc@geekosphere.org>2015-06-20 19:25:39 +0200
commit85bfb8405528b2da203291b5671f9077d5b62742 (patch)
tree1bb24a408ecf4b3cd383ced9818a91438884c651
parent257ba78e8564964b78f839cce45350b824c2f410 (diff)
journal: start with core botmodule, api changes
-rw-r--r--lib/rbot/core/journal.rb52
-rw-r--r--lib/rbot/ircbot.rb13
-rw-r--r--lib/rbot/journal.rb132
-rw-r--r--lib/rbot/journal/mongo.rb19
-rw-r--r--lib/rbot/journal/postgres.rb17
-rw-r--r--test/test_journal.rb69
6 files changed, 225 insertions, 77 deletions
diff --git a/lib/rbot/core/journal.rb b/lib/rbot/core/journal.rb
new file mode 100644
index 00000000..f8a88620
--- /dev/null
+++ b/lib/rbot/core/journal.rb
@@ -0,0 +1,52 @@
+#-- vim:sw=2:et
+#++
+#
+# :title: rbot journal management from IRC
+#
+# Author:: Matthias Hecker (apoc@geekosphere.org)
+
+require 'rbot/journal'
+
+class JournalModule < CoreBotModule
+
+ attr_reader :broker
+
+ include Irc::Bot::Journal
+
+ Config.register Config::StringValue.new('journal.storage',
+ :default => nil,
+ :requires_rescan => true,
+ :desc => 'storage engine used by the journal')
+ Config.register Config::StringValue.new('journal.storage.uri',
+ :default => nil,
+ :requires_rescan => true,
+ :desc => 'storage database uri')
+
+ def initialize
+ super
+ storage = nil
+ name = @bot.config['journal.storage']
+ uri = @bot.config['journal.storage.uri']
+ if name
+ storage = Storage.create(name, uri)
+ end
+ debug 'journal broker starting up...'
+ @broker = JournalBroker.new(storage: storage)
+ end
+
+ def cleanup
+ super
+ debug 'journal broker shutting down...'
+ @broker.shutdown
+ @broker = nil
+ end
+
+ def help(plugin, topic='')
+ 'journal'
+ end
+
+end
+
+journal = JournalModule.new
+journal.priority = -2
+
diff --git a/lib/rbot/ircbot.rb b/lib/rbot/ircbot.rb
index caabc15d..46e4faaa 100644
--- a/lib/rbot/ircbot.rb
+++ b/lib/rbot/ircbot.rb
@@ -156,7 +156,6 @@ require 'rbot/registry'
require 'rbot/plugins'
require 'rbot/message'
require 'rbot/language'
-require 'rbot/journal'
module Irc
@@ -205,9 +204,6 @@ class Bot
# web service
attr_accessor :webservice
- # persistent message queue
- attr_accessor :journal
-
# server we are connected to
# TODO multiserver
def server
@@ -230,6 +226,13 @@ class Bot
myself.channels
end
+ # returns the journal
+ def journal
+ if @plugins['journal']
+ @plugins['journal'].broker
+ end
+ end
+
# nick wanted by the bot. This defaults to the irc.nick config value,
# but may be overridden by a manual !nick command
def wanted_nick
@@ -550,8 +553,6 @@ class Bot
log_session_start
- @journal = Journal::JournalBroker.new(bot: self)
-
if $daemonize
log "Redirecting standard input/output/error"
[$stdin, $stdout, $stderr].each do |fd|
diff --git a/lib/rbot/journal.rb b/lib/rbot/journal.rb
index 0b4324fe..c5bfcfea 100644
--- a/lib/rbot/journal.rb
+++ b/lib/rbot/journal.rb
@@ -3,6 +3,8 @@
#++
#
# :title: rbot's persistent message queue
+#
+# Author:: Matthias Hecker (apoc@geekosphere.org)
require 'thread'
require 'securerandom'
@@ -22,19 +24,8 @@ module Journal
=end
- Config.register Config::StringValue.new('journal.storage',
- :default => nil,
- :requires_restart => true,
- :desc => 'storage engine used by the journal')
- Config.register Config::StringValue.new('journal.storage.uri',
- :default => nil,
- :requires_restart => true,
- :desc => 'storage database uri')
-
class InvalidJournalMessage < StandardError
end
- class ConsumeInterrupt < StandardError
- end
class StorageError < StandardError
end
@@ -61,6 +52,7 @@ module Journal
end
end
+ # Access payload value by key.
def get(pkey, default=:exception) # IDENTITY = Object.new instead of :ex..?
value = pkey.split('.').reduce(@payload) do |hash, key|
if hash.has_key?(key) or hash.has_key?(key.to_sym)
@@ -75,6 +67,11 @@ module Journal
end
end
+ # Access payload value by key alias for get(key, nil).
+ def [](key)
+ get(key, nil)
+ end
+
def ==(other)
(@id == other.id) rescue false
end
@@ -104,7 +101,7 @@ module Journal
end
# returns a array of message instances that match the query
- def find(query=nil, limit=100, offset=0)
+ def find(query=nil, limit=100, offset=0, &block)
end
# returns the number of messages that match the query
@@ -176,10 +173,20 @@ module Journal
attr_reader :payload
def initialize(query)
- @id = query[:id]
- @topic = query[:topic]
- @timestamp = query[:timestamp]
- @payload = query[:payload]
+ @id = query[:id] || []
+ @id = [@id] if @id.is_a? String
+ @topic = query[:topic] || []
+ @topic = [@topic] if @topic.is_a? String
+ @timestamp = {
+ from: nil, to: nil
+ }
+ if query[:timestamp] and query[:timestamp][:from]
+ @timestamp[:from] = query[:timestamp][:from]
+ end
+ if query[:timestamp] and query[:timestamp][:to]
+ @timestamp[:to] = query[:timestamp][:to]
+ end
+ @payload = query[:payload] || {}
end
# returns true if the given message matches the query
@@ -270,11 +277,11 @@ module Journal
class JournalBroker
class Subscription
- attr_reader :query
+ attr_reader :topic
attr_reader :block
- def initialize(broker, query, block)
+ def initialize(broker, topic, block)
@broker = broker
- @query = query
+ @topic = topic
@block = block
end
def cancel
@@ -285,36 +292,27 @@ module Journal
def initialize(opts={})
# overrides the internal consumer with a block
@consumer = opts[:consumer]
- @bot = opts[:bot]
# storage backend
- if @bot
- @storage = opts[:storage] || Storage.create(
- @bot.config['journal.storage'], @bot.config['journal.storage.uri'])
- else
- @storage = opts[:storage]
- end
+ @storage = opts[:storage]
unless @storage
warning 'journal broker: no storage set up, won\'t persist messages'
end
@queue = Queue.new
# consumer thread:
@thread = Thread.new do
- loop do
+ while message = @queue.pop
begin
- consume @queue.pop
+ consume message
# pop(true) ... rescue ThreadError => e
- rescue ConsumeInterrupt => e
- error 'journal broker: stop thread, consume interrupt raised'
- break
rescue Exception => e
error 'journal broker: exception in consumer thread'
error $!
end
end
end
- # TODO: this is a first naive implementation, later we do the
- # message/query matching for incoming messages more efficiently
@subscriptions = []
+ # lookup-table for subscriptions by their topic
+ @topic_subs = {}
end
def consume(message)
@@ -322,8 +320,8 @@ module Journal
@consumer.call(message) if @consumer
# notify subscribers
- @subscriptions.each do |s|
- if s.query.matches? message
+ if @topic_subs.has_key? message.topic
+ @topic_subs[message.topic].each do |s|
s.block.call(message)
end
end
@@ -335,36 +333,68 @@ module Journal
true if @storage
end
- def join
- @thread.join
- end
-
def shutdown
- @thread.raise ConsumeInterrupt.new
+ log 'journal shutdown'
+ @subscriptions.clear
+ @topic_subs.clear
+ @queue << nil
+ @thread.join
+ @thread = nil
end
def publish(topic, payload)
- @queue.push JournalMessage::create(topic, payload)
+ @queue << JournalMessage::create(topic, payload)
end
- # subscribe to messages that match the given query
- def subscribe(query, &block)
+ # Subscribe to receive messages from a topic.
+ #
+ # You can use this method to subscribe to messages that
+ # are published within a specified topic. You must provide
+ # a receiving block to receive messages one-by-one.
+ # The method returns an instance of Subscription that can
+ # be used to cancel the subscription by invoking cancel
+ # on it.
+ #
+ # journal.subscribe('irclog') do |message|
+ # # received irclog messages...
+ # end
+ #
+ def subscribe(topic=nil, &block)
raise ArgumentError.new unless block_given?
- s = Subscription.new(self, query, block)
+ s = Subscription.new(self, topic, block)
@subscriptions << s
+ unless @topic_subs.has_key? topic
+ @topic_subs[topic] = []
+ end
+ @topic_subs[topic] << s
s
end
- def unsubscribe(subscription)
- @subscriptions.delete subscription
+ def unsubscribe(s)
+ if @topic_subs.has_key? s.topic
+ @topic_subs[s.topic].delete(s)
+ end
+ @subscriptions.delete s
end
- def find(query=nil, limit=100, offset=0, &block)
+ # Find and return persisted messages by a query.
+ #
+ # This method will either return all messages or call the provided
+ # block for each message. It will filter the messages by the
+ # provided Query instance. Limit and offset might be used to
+ # constrain the result.
+ # The query might also be a hash or proc that is passed to
+ # Query.define first.
+ #
+ # @param query [Query]
+ # @param limit [Integer] how many items to return
+ # @param offset [Integer] relative offset in results
+ def find(query, limit=100, offset=0, &block)
+ unless query.is_a? Query
+ query = Query.define(query)
+ end
if block_given?
- begin
- res = @storage.find(query, limit, offset)
- block.call(res)
- end until res.length > 0
+ @storage.find(query, limit, offset, &block)
else
@storage.find(query, limit, offset)
end
diff --git a/lib/rbot/journal/mongo.rb b/lib/rbot/journal/mongo.rb
index 24e9cfcc..2e735587 100644
--- a/lib/rbot/journal/mongo.rb
+++ b/lib/rbot/journal/mongo.rb
@@ -25,6 +25,7 @@ module Journal
drop if opts[:drop]
@collection.indexes.create_one({topic: 1})
+ @collection.indexes.create_one({timestamp: 1})
end
def ensure_index(key)
@@ -40,10 +41,20 @@ module Journal
})
end
- def find(query=nil, limit=100, offset=0)
- query_cursor(query).skip(offset).limit(limit).map do |document|
- JournalMessage.new(id: document['_id'], timestamp: document['timestamp'].localtime,
- topic: document['topic'], payload: document['payload'].to_h)
+ def find(query=nil, limit=100, offset=0, &block)
+ def to_message(document)
+ JournalMessage.new(id: document['_id'],
+ timestamp: document['timestamp'].localtime,
+ topic: document['topic'],
+ payload: document['payload'].to_h)
+ end
+
+ cursor = query_cursor(query).skip(offset).limit(limit)
+
+ if block_given?
+ cursor.each { |document| block.call(to_message(document)) }
+ else
+ cursor.map { |document| to_message(document) }
end
end
diff --git a/lib/rbot/journal/postgres.rb b/lib/rbot/journal/postgres.rb
index e63aefee..62590110 100644
--- a/lib/rbot/journal/postgres.rb
+++ b/lib/rbot/journal/postgres.rb
@@ -59,6 +59,7 @@ module Journal
drop if opts[:drop]
create_table
create_index('topic_index', 'topic')
+ create_index('timestamp_index', 'timestamp')
end
def create_table
@@ -92,7 +93,13 @@ module Journal
[m.id, m.topic, m.timestamp, JSON.generate(m.payload)])
end
- def find(query=nil, limit=100, offset=0)
+ def find(query=nil, limit=100, offset=0, &block)
+ def to_message(row)
+ timestamp = DateTime.strptime(row['timestamp'], '%Y-%m-%d %H:%M:%S%z')
+ JournalMessage.new(id: row['id'], timestamp: timestamp,
+ topic: row['topic'], payload: JSON.parse(row['payload']))
+ end
+
if query
sql, params = query_to_sql(query)
sql = 'SELECT * FROM journal WHERE ' + sql + ' LIMIT %d OFFSET %d' % [limit.to_i, offset.to_i]
@@ -101,10 +108,10 @@ module Journal
params = []
end
res = @conn.exec_params(sql, params)
- res.map do |row|
- timestamp = DateTime.strptime(row['timestamp'], '%Y-%m-%d %H:%M:%S%z')
- JournalMessage.new(id: row['id'], timestamp: timestamp,
- topic: row['topic'], payload: JSON.parse(row['payload']))
+ if block_given?
+ res.each { |row| block.call(to_message(row)) }
+ else
+ res.map { |row| to_message(row) }
end
end
diff --git a/test/test_journal.rb b/test/test_journal.rb
index b9f5c612..37497351 100644
--- a/test/test_journal.rb
+++ b/test/test_journal.rb
@@ -22,7 +22,9 @@ class JournalMessageTest < Test::Unit::TestCase
end
assert_nil(m.get('nope', nil))
assert_nil(m.get('baz'))
- assert_equal(23, m.get('qux.quxx'))
+ assert_equal(23, m['qux.quxx'])
+ assert_equal(nil, m['qux.nope'])
+ assert_raise(ArgumentError) { m.get('qux.nope') }
end
end
@@ -163,8 +165,8 @@ class JournalBrokerTest < Test::Unit::TestCase
received = []
journal = JournalBroker.new
- # subscribe to messages:
- sub = journal.subscribe(Query.define { topic 'foo' }) do |message|
+ # subscribe to messages for topic foo:
+ sub = journal.subscribe('foo') do |message|
received << message
end
@@ -236,6 +238,31 @@ module JournalStorageTestMixin
assert_equal(m, @storage.find.first)
end
+ def test_find
+ # tests limit/offset and block parameters of find()
+ @storage.insert(JournalMessage.create('irclogs', {message: 'foo'}))
+ @storage.insert(JournalMessage.create('irclogs', {message: 'bar'}))
+ @storage.insert(JournalMessage.create('irclogs', {message: 'baz'}))
+ @storage.insert(JournalMessage.create('irclogs', {message: 'qux'}))
+
+ msgs = []
+ @storage.find(Query.define({topic: 'irclogs'}), 2, 1) do |m|
+ msgs << m
+ end
+ assert_equal(2, msgs.length)
+ assert_equal('bar', msgs.first['message'])
+ assert_equal('baz', msgs.last['message'])
+
+ msgs = []
+ @storage.find(Query.define({topic: 'irclogs'})) do |m|
+ msgs << m
+ end
+ assert_equal(4, msgs.length)
+ assert_equal('foo', msgs.first['message'])
+ assert_equal('qux', msgs.last['message'])
+
+ end
+
def test_operations_multiple
# test operations on multiple messages
# insert a bunch:
@@ -269,15 +296,27 @@ module JournalStorageTestMixin
assert_equal(0, @storage.count)
end
- def test_journal
- # this journal persists messages in the test storage:
- journal = JournalBroker.new(storage: @storage)
- journal.publish 'log.irc', action: 'message'
+ def test_broker_interface
+ journal = JournalBroker.new(storage: @storage)
+
+ journal.publish 'irclogs', message: 'foo'
+ journal.publish 'irclogs', message: 'bar'
+ journal.publish 'irclogs', message: 'baz'
+ journal.publish 'irclogs', message: 'qux'
+
+ # wait for messages to be consumed:
sleep 0.1
- assert_equal(1, journal.count)
+
+ msgs = []
+ journal.find({topic: 'irclogs'}, 2, 1) do |m|
+ msgs << m
+ end
+ assert_equal(2, msgs.length)
+ assert_equal('bar', msgs.first['message'])
+ assert_equal('baz', msgs.last['message'])
end
- NUM=150_000
+ NUM=100 # 1_000_000
def test_benchmark
puts
@@ -323,13 +362,14 @@ module JournalStorageTestMixin
end
+if ENV['PG_URI']
class JournalStoragePostgresTest < Test::Unit::TestCase
include JournalStorageTestMixin
def setup
@storage = Storage::PostgresStorage.new(
- uri: ENV['DB_URI'] || 'postgresql://localhost/rbot_journal',
+ uri: ENV['PG_URI'] || 'postgresql://localhost/rbot_journal',
drop: true)
end
@@ -358,15 +398,22 @@ class JournalStoragePostgresTest < Test::Unit::TestCase
end
end
+else
+ puts 'NOTE: Set PG_URI environment variable to test postgresql storage.'
+end
+if ENV['MONGO_URI']
class JournalStorageMongoTest < Test::Unit::TestCase
include JournalStorageTestMixin
def setup
@storage = Storage::MongoStorage.new(
+ uri: ENV['MONGO_URI'] || 'mongodb://127.0.0.1:27017/rbot',
drop: true)
end
-
+end
+else
+ puts 'NOTE: Set MONGO_URI environment variable to test postgresql storage.'
end