From 85bfb8405528b2da203291b5671f9077d5b62742 Mon Sep 17 00:00:00 2001 From: Matthias Hecker Date: Sat, 20 Jun 2015 19:25:39 +0200 Subject: journal: start with core botmodule, api changes --- lib/rbot/core/journal.rb | 52 +++++++++++++++++ lib/rbot/ircbot.rb | 13 +++-- lib/rbot/journal.rb | 132 ++++++++++++++++++++++++++----------------- lib/rbot/journal/mongo.rb | 19 +++++-- lib/rbot/journal/postgres.rb | 17 ++++-- test/test_journal.rb | 69 ++++++++++++++++++---- 6 files changed, 225 insertions(+), 77 deletions(-) create mode 100644 lib/rbot/core/journal.rb diff --git a/lib/rbot/core/journal.rb b/lib/rbot/core/journal.rb new file mode 100644 index 00000000..f8a88620 --- /dev/null +++ b/lib/rbot/core/journal.rb @@ -0,0 +1,52 @@ +#-- vim:sw=2:et +#++ +# +# :title: rbot journal management from IRC +# +# Author:: Matthias Hecker (apoc@geekosphere.org) + +require 'rbot/journal' + +class JournalModule < CoreBotModule + + attr_reader :broker + + include Irc::Bot::Journal + + Config.register Config::StringValue.new('journal.storage', + :default => nil, + :requires_rescan => true, + :desc => 'storage engine used by the journal') + Config.register Config::StringValue.new('journal.storage.uri', + :default => nil, + :requires_rescan => true, + :desc => 'storage database uri') + + def initialize + super + storage = nil + name = @bot.config['journal.storage'] + uri = @bot.config['journal.storage.uri'] + if name + storage = Storage.create(name, uri) + end + debug 'journal broker starting up...' + @broker = JournalBroker.new(storage: storage) + end + + def cleanup + super + debug 'journal broker shutting down...' + @broker.shutdown + @broker = nil + end + + def help(plugin, topic='') + 'journal' + end + +end + +journal = JournalModule.new +journal.priority = -2 + diff --git a/lib/rbot/ircbot.rb b/lib/rbot/ircbot.rb index caabc15d..46e4faaa 100644 --- a/lib/rbot/ircbot.rb +++ b/lib/rbot/ircbot.rb @@ -156,7 +156,6 @@ require 'rbot/registry' require 'rbot/plugins' require 'rbot/message' require 'rbot/language' -require 'rbot/journal' module Irc @@ -205,9 +204,6 @@ class Bot # web service attr_accessor :webservice - # persistent message queue - attr_accessor :journal - # server we are connected to # TODO multiserver def server @@ -230,6 +226,13 @@ class Bot myself.channels end + # returns the journal + def journal + if @plugins['journal'] + @plugins['journal'].broker + end + end + # nick wanted by the bot. This defaults to the irc.nick config value, # but may be overridden by a manual !nick command def wanted_nick @@ -550,8 +553,6 @@ class Bot log_session_start - @journal = Journal::JournalBroker.new(bot: self) - if $daemonize log "Redirecting standard input/output/error" [$stdin, $stdout, $stderr].each do |fd| diff --git a/lib/rbot/journal.rb b/lib/rbot/journal.rb index 0b4324fe..c5bfcfea 100644 --- a/lib/rbot/journal.rb +++ b/lib/rbot/journal.rb @@ -3,6 +3,8 @@ #++ # # :title: rbot's persistent message queue +# +# Author:: Matthias Hecker (apoc@geekosphere.org) require 'thread' require 'securerandom' @@ -22,19 +24,8 @@ module Journal =end - Config.register Config::StringValue.new('journal.storage', - :default => nil, - :requires_restart => true, - :desc => 'storage engine used by the journal') - Config.register Config::StringValue.new('journal.storage.uri', - :default => nil, - :requires_restart => true, - :desc => 'storage database uri') - class InvalidJournalMessage < StandardError end - class ConsumeInterrupt < StandardError - end class StorageError < StandardError end @@ -61,6 +52,7 @@ module Journal end end + # Access payload value by key. def get(pkey, default=:exception) # IDENTITY = Object.new instead of :ex..? value = pkey.split('.').reduce(@payload) do |hash, key| if hash.has_key?(key) or hash.has_key?(key.to_sym) @@ -75,6 +67,11 @@ module Journal end end + # Access payload value by key alias for get(key, nil). + def [](key) + get(key, nil) + end + def ==(other) (@id == other.id) rescue false end @@ -104,7 +101,7 @@ module Journal end # returns a array of message instances that match the query - def find(query=nil, limit=100, offset=0) + def find(query=nil, limit=100, offset=0, &block) end # returns the number of messages that match the query @@ -176,10 +173,20 @@ module Journal attr_reader :payload def initialize(query) - @id = query[:id] - @topic = query[:topic] - @timestamp = query[:timestamp] - @payload = query[:payload] + @id = query[:id] || [] + @id = [@id] if @id.is_a? String + @topic = query[:topic] || [] + @topic = [@topic] if @topic.is_a? String + @timestamp = { + from: nil, to: nil + } + if query[:timestamp] and query[:timestamp][:from] + @timestamp[:from] = query[:timestamp][:from] + end + if query[:timestamp] and query[:timestamp][:to] + @timestamp[:to] = query[:timestamp][:to] + end + @payload = query[:payload] || {} end # returns true if the given message matches the query @@ -270,11 +277,11 @@ module Journal class JournalBroker class Subscription - attr_reader :query + attr_reader :topic attr_reader :block - def initialize(broker, query, block) + def initialize(broker, topic, block) @broker = broker - @query = query + @topic = topic @block = block end def cancel @@ -285,36 +292,27 @@ module Journal def initialize(opts={}) # overrides the internal consumer with a block @consumer = opts[:consumer] - @bot = opts[:bot] # storage backend - if @bot - @storage = opts[:storage] || Storage.create( - @bot.config['journal.storage'], @bot.config['journal.storage.uri']) - else - @storage = opts[:storage] - end + @storage = opts[:storage] unless @storage warning 'journal broker: no storage set up, won\'t persist messages' end @queue = Queue.new # consumer thread: @thread = Thread.new do - loop do + while message = @queue.pop begin - consume @queue.pop + consume message # pop(true) ... rescue ThreadError => e - rescue ConsumeInterrupt => e - error 'journal broker: stop thread, consume interrupt raised' - break rescue Exception => e error 'journal broker: exception in consumer thread' error $! end end end - # TODO: this is a first naive implementation, later we do the - # message/query matching for incoming messages more efficiently @subscriptions = [] + # lookup-table for subscriptions by their topic + @topic_subs = {} end def consume(message) @@ -322,8 +320,8 @@ module Journal @consumer.call(message) if @consumer # notify subscribers - @subscriptions.each do |s| - if s.query.matches? message + if @topic_subs.has_key? message.topic + @topic_subs[message.topic].each do |s| s.block.call(message) end end @@ -335,36 +333,68 @@ module Journal true if @storage end - def join - @thread.join - end - def shutdown - @thread.raise ConsumeInterrupt.new + log 'journal shutdown' + @subscriptions.clear + @topic_subs.clear + @queue << nil + @thread.join + @thread = nil end def publish(topic, payload) - @queue.push JournalMessage::create(topic, payload) + @queue << JournalMessage::create(topic, payload) end - # subscribe to messages that match the given query - def subscribe(query, &block) + # Subscribe to receive messages from a topic. + # + # You can use this method to subscribe to messages that + # are published within a specified topic. You must provide + # a receiving block to receive messages one-by-one. + # The method returns an instance of Subscription that can + # be used to cancel the subscription by invoking cancel + # on it. + # + # journal.subscribe('irclog') do |message| + # # received irclog messages... + # end + # + def subscribe(topic=nil, &block) raise ArgumentError.new unless block_given? - s = Subscription.new(self, query, block) + s = Subscription.new(self, topic, block) @subscriptions << s + unless @topic_subs.has_key? topic + @topic_subs[topic] = [] + end + @topic_subs[topic] << s s end - def unsubscribe(subscription) - @subscriptions.delete subscription + def unsubscribe(s) + if @topic_subs.has_key? s.topic + @topic_subs[s.topic].delete(s) + end + @subscriptions.delete s end - def find(query=nil, limit=100, offset=0, &block) + # Find and return persisted messages by a query. + # + # This method will either return all messages or call the provided + # block for each message. It will filter the messages by the + # provided Query instance. Limit and offset might be used to + # constrain the result. + # The query might also be a hash or proc that is passed to + # Query.define first. + # + # @param query [Query] + # @param limit [Integer] how many items to return + # @param offset [Integer] relative offset in results + def find(query, limit=100, offset=0, &block) + unless query.is_a? Query + query = Query.define(query) + end if block_given? - begin - res = @storage.find(query, limit, offset) - block.call(res) - end until res.length > 0 + @storage.find(query, limit, offset, &block) else @storage.find(query, limit, offset) end diff --git a/lib/rbot/journal/mongo.rb b/lib/rbot/journal/mongo.rb index 24e9cfcc..2e735587 100644 --- a/lib/rbot/journal/mongo.rb +++ b/lib/rbot/journal/mongo.rb @@ -25,6 +25,7 @@ module Journal drop if opts[:drop] @collection.indexes.create_one({topic: 1}) + @collection.indexes.create_one({timestamp: 1}) end def ensure_index(key) @@ -40,10 +41,20 @@ module Journal }) end - def find(query=nil, limit=100, offset=0) - query_cursor(query).skip(offset).limit(limit).map do |document| - JournalMessage.new(id: document['_id'], timestamp: document['timestamp'].localtime, - topic: document['topic'], payload: document['payload'].to_h) + def find(query=nil, limit=100, offset=0, &block) + def to_message(document) + JournalMessage.new(id: document['_id'], + timestamp: document['timestamp'].localtime, + topic: document['topic'], + payload: document['payload'].to_h) + end + + cursor = query_cursor(query).skip(offset).limit(limit) + + if block_given? + cursor.each { |document| block.call(to_message(document)) } + else + cursor.map { |document| to_message(document) } end end diff --git a/lib/rbot/journal/postgres.rb b/lib/rbot/journal/postgres.rb index e63aefee..62590110 100644 --- a/lib/rbot/journal/postgres.rb +++ b/lib/rbot/journal/postgres.rb @@ -59,6 +59,7 @@ module Journal drop if opts[:drop] create_table create_index('topic_index', 'topic') + create_index('timestamp_index', 'timestamp') end def create_table @@ -92,7 +93,13 @@ module Journal [m.id, m.topic, m.timestamp, JSON.generate(m.payload)]) end - def find(query=nil, limit=100, offset=0) + def find(query=nil, limit=100, offset=0, &block) + def to_message(row) + timestamp = DateTime.strptime(row['timestamp'], '%Y-%m-%d %H:%M:%S%z') + JournalMessage.new(id: row['id'], timestamp: timestamp, + topic: row['topic'], payload: JSON.parse(row['payload'])) + end + if query sql, params = query_to_sql(query) sql = 'SELECT * FROM journal WHERE ' + sql + ' LIMIT %d OFFSET %d' % [limit.to_i, offset.to_i] @@ -101,10 +108,10 @@ module Journal params = [] end res = @conn.exec_params(sql, params) - res.map do |row| - timestamp = DateTime.strptime(row['timestamp'], '%Y-%m-%d %H:%M:%S%z') - JournalMessage.new(id: row['id'], timestamp: timestamp, - topic: row['topic'], payload: JSON.parse(row['payload'])) + if block_given? + res.each { |row| block.call(to_message(row)) } + else + res.map { |row| to_message(row) } end end diff --git a/test/test_journal.rb b/test/test_journal.rb index b9f5c612..37497351 100644 --- a/test/test_journal.rb +++ b/test/test_journal.rb @@ -22,7 +22,9 @@ class JournalMessageTest < Test::Unit::TestCase end assert_nil(m.get('nope', nil)) assert_nil(m.get('baz')) - assert_equal(23, m.get('qux.quxx')) + assert_equal(23, m['qux.quxx']) + assert_equal(nil, m['qux.nope']) + assert_raise(ArgumentError) { m.get('qux.nope') } end end @@ -163,8 +165,8 @@ class JournalBrokerTest < Test::Unit::TestCase received = [] journal = JournalBroker.new - # subscribe to messages: - sub = journal.subscribe(Query.define { topic 'foo' }) do |message| + # subscribe to messages for topic foo: + sub = journal.subscribe('foo') do |message| received << message end @@ -236,6 +238,31 @@ module JournalStorageTestMixin assert_equal(m, @storage.find.first) end + def test_find + # tests limit/offset and block parameters of find() + @storage.insert(JournalMessage.create('irclogs', {message: 'foo'})) + @storage.insert(JournalMessage.create('irclogs', {message: 'bar'})) + @storage.insert(JournalMessage.create('irclogs', {message: 'baz'})) + @storage.insert(JournalMessage.create('irclogs', {message: 'qux'})) + + msgs = [] + @storage.find(Query.define({topic: 'irclogs'}), 2, 1) do |m| + msgs << m + end + assert_equal(2, msgs.length) + assert_equal('bar', msgs.first['message']) + assert_equal('baz', msgs.last['message']) + + msgs = [] + @storage.find(Query.define({topic: 'irclogs'})) do |m| + msgs << m + end + assert_equal(4, msgs.length) + assert_equal('foo', msgs.first['message']) + assert_equal('qux', msgs.last['message']) + + end + def test_operations_multiple # test operations on multiple messages # insert a bunch: @@ -269,15 +296,27 @@ module JournalStorageTestMixin assert_equal(0, @storage.count) end - def test_journal - # this journal persists messages in the test storage: - journal = JournalBroker.new(storage: @storage) - journal.publish 'log.irc', action: 'message' + def test_broker_interface + journal = JournalBroker.new(storage: @storage) + + journal.publish 'irclogs', message: 'foo' + journal.publish 'irclogs', message: 'bar' + journal.publish 'irclogs', message: 'baz' + journal.publish 'irclogs', message: 'qux' + + # wait for messages to be consumed: sleep 0.1 - assert_equal(1, journal.count) + + msgs = [] + journal.find({topic: 'irclogs'}, 2, 1) do |m| + msgs << m + end + assert_equal(2, msgs.length) + assert_equal('bar', msgs.first['message']) + assert_equal('baz', msgs.last['message']) end - NUM=150_000 + NUM=100 # 1_000_000 def test_benchmark puts @@ -323,13 +362,14 @@ module JournalStorageTestMixin end +if ENV['PG_URI'] class JournalStoragePostgresTest < Test::Unit::TestCase include JournalStorageTestMixin def setup @storage = Storage::PostgresStorage.new( - uri: ENV['DB_URI'] || 'postgresql://localhost/rbot_journal', + uri: ENV['PG_URI'] || 'postgresql://localhost/rbot_journal', drop: true) end @@ -358,15 +398,22 @@ class JournalStoragePostgresTest < Test::Unit::TestCase end end +else + puts 'NOTE: Set PG_URI environment variable to test postgresql storage.' +end +if ENV['MONGO_URI'] class JournalStorageMongoTest < Test::Unit::TestCase include JournalStorageTestMixin def setup @storage = Storage::MongoStorage.new( + uri: ENV['MONGO_URI'] || 'mongodb://127.0.0.1:27017/rbot', drop: true) end - +end +else + puts 'NOTE: Set MONGO_URI environment variable to test postgresql storage.' end -- cgit v1.2.3