]> git.netwichtig.de Git - user/henk/code/ruby/rbot.git/commitdiff
journal: start with core botmodule, api changes
authorMatthias Hecker <apoc@geekosphere.org>
Sat, 20 Jun 2015 17:25:39 +0000 (19:25 +0200)
committerMatthias Hecker <apoc@geekosphere.org>
Sat, 20 Jun 2015 17:25:39 +0000 (19:25 +0200)
lib/rbot/core/journal.rb [new file with mode: 0644]
lib/rbot/ircbot.rb
lib/rbot/journal.rb
lib/rbot/journal/mongo.rb
lib/rbot/journal/postgres.rb
test/test_journal.rb

diff --git a/lib/rbot/core/journal.rb b/lib/rbot/core/journal.rb
new file mode 100644 (file)
index 0000000..f8a8862
--- /dev/null
@@ -0,0 +1,52 @@
+#-- vim:sw=2:et
+#++
+#
+# :title: rbot journal management from IRC
+#
+# Author:: Matthias Hecker (apoc@geekosphere.org)
+
+require 'rbot/journal'
+
+class JournalModule < CoreBotModule
+
+  attr_reader :broker
+
+  include Irc::Bot::Journal
+
+  Config.register Config::StringValue.new('journal.storage',
+    :default => nil,
+    :requires_rescan => true,
+    :desc => 'storage engine used by the journal')
+  Config.register Config::StringValue.new('journal.storage.uri',
+    :default => nil,
+    :requires_rescan => true,
+    :desc => 'storage database uri')
+
+  def initialize
+    super
+    storage = nil
+    name = @bot.config['journal.storage']
+    uri = @bot.config['journal.storage.uri']
+    if name
+      storage = Storage.create(name, uri)
+    end
+    debug 'journal broker starting up...'
+    @broker = JournalBroker.new(storage: storage)
+  end
+
+  def cleanup
+    super
+    debug 'journal broker shutting down...'
+    @broker.shutdown
+    @broker = nil
+  end
+
+  def help(plugin, topic='')
+    'journal'
+  end
+
+end
+
+journal = JournalModule.new
+journal.priority = -2
+
index caabc15dc325f769ca7932cc4ba69b4de99980b5..46e4faaac7a6e0a90daa6dd43152c1eee5c7beb6 100644 (file)
@@ -156,7 +156,6 @@ require 'rbot/registry'
 require 'rbot/plugins'
 require 'rbot/message'
 require 'rbot/language'
-require 'rbot/journal'
 
 module Irc
 
@@ -205,9 +204,6 @@ class Bot
   # web service
   attr_accessor :webservice
 
-  # persistent message queue
-  attr_accessor :journal
-
   # server we are connected to
   # TODO multiserver
   def server
@@ -230,6 +226,13 @@ class Bot
     myself.channels
   end
 
+  # returns the journal
+  def journal
+    if @plugins['journal']
+      @plugins['journal'].broker
+    end
+  end
+
   # nick wanted by the bot. This defaults to the irc.nick config value,
   # but may be overridden by a manual !nick command
   def wanted_nick
@@ -550,8 +553,6 @@ class Bot
 
     log_session_start
 
-    @journal = Journal::JournalBroker.new(bot: self)
-
     if $daemonize
       log "Redirecting standard input/output/error"
       [$stdin, $stdout, $stderr].each do |fd|
index 0b4324fe6b10517f786149162b0bac0552ef4f89..c5bfcfea1e4f699f93ff4ee4b84f844c5bed5718 100644 (file)
@@ -3,6 +3,8 @@
 #++
 #
 # :title: rbot's persistent message queue
+#
+# Author:: Matthias Hecker (apoc@geekosphere.org)
 
 require 'thread'
 require 'securerandom'
@@ -22,19 +24,8 @@ module Journal
 
 =end
 
-  Config.register Config::StringValue.new('journal.storage',
-    :default => nil,
-    :requires_restart => true,
-    :desc => 'storage engine used by the journal')
-  Config.register Config::StringValue.new('journal.storage.uri',
-    :default => nil,
-    :requires_restart => true,
-    :desc => 'storage database uri')
-
   class InvalidJournalMessage < StandardError
   end
-  class ConsumeInterrupt < StandardError
-  end
   class StorageError < StandardError
   end
 
@@ -61,6 +52,7 @@ module Journal
       end
     end
 
+    # Access payload value by key.
     def get(pkey, default=:exception) # IDENTITY = Object.new instead of :ex..?
       value = pkey.split('.').reduce(@payload) do |hash, key|
         if hash.has_key?(key) or hash.has_key?(key.to_sym)
@@ -75,6 +67,11 @@ module Journal
       end
     end
 
+    # Access payload value by key alias for get(key, nil).
+    def [](key)
+      get(key, nil)
+    end
+
     def ==(other)
       (@id == other.id) rescue false
     end
@@ -104,7 +101,7 @@ module Journal
       end
 
       # returns a array of message instances that match the query
-      def find(query=nil, limit=100, offset=0)
+      def find(query=nil, limit=100, offset=0, &block)
       end
 
       # returns the number of messages that match the query
@@ -176,10 +173,20 @@ module Journal
     attr_reader :payload
 
     def initialize(query)
-      @id = query[:id]
-      @topic = query[:topic]
-      @timestamp = query[:timestamp]
-      @payload = query[:payload]
+      @id = query[:id] || []
+      @id = [@id] if @id.is_a? String
+      @topic = query[:topic] || []
+      @topic = [@topic] if @topic.is_a? String
+      @timestamp = {
+        from: nil, to: nil
+      }
+      if query[:timestamp] and query[:timestamp][:from]
+        @timestamp[:from] = query[:timestamp][:from]
+      end
+      if query[:timestamp] and query[:timestamp][:to]
+        @timestamp[:to] = query[:timestamp][:to]
+      end
+      @payload = query[:payload] || {}
     end
 
     # returns true if the given message matches the query
@@ -270,11 +277,11 @@ module Journal
 
   class JournalBroker
     class Subscription
-      attr_reader :query
+      attr_reader :topic
       attr_reader :block
-      def initialize(broker, query, block)
+      def initialize(broker, topic, block)
         @broker = broker
-        @query = query
+        @topic = topic
         @block = block
       end
       def cancel
@@ -285,36 +292,27 @@ module Journal
     def initialize(opts={})
       # overrides the internal consumer with a block
       @consumer = opts[:consumer]
-      @bot = opts[:bot]
       # storage backend
-      if @bot
-        @storage = opts[:storage] || Storage.create(
-            @bot.config['journal.storage'], @bot.config['journal.storage.uri'])
-      else
-        @storage = opts[:storage]
-      end
+      @storage = opts[:storage]
       unless @storage
         warning 'journal broker: no storage set up, won\'t persist messages'
       end
       @queue = Queue.new
       # consumer thread:
       @thread = Thread.new do
-        loop do
+        while message = @queue.pop
           begin
-            consume @queue.pop
+            consume message
           # pop(true) ... rescue ThreadError => e
-          rescue ConsumeInterrupt => e
-            error 'journal broker: stop thread, consume interrupt raised'
-            break
           rescue Exception => e
             error 'journal broker: exception in consumer thread'
             error $!
           end
         end
       end
-      # TODO: this is a first naive implementation, later we do the
-      #       message/query matching for incoming messages more efficiently
       @subscriptions = []
+      # lookup-table for subscriptions by their topic
+      @topic_subs = {}
     end
 
     def consume(message)
@@ -322,8 +320,8 @@ module Journal
       @consumer.call(message) if @consumer
 
       # notify subscribers
-      @subscriptions.each do |s|
-        if s.query.matches? message
+      if @topic_subs.has_key? message.topic
+        @topic_subs[message.topic].each do |s|
           s.block.call(message)
         end
       end
@@ -335,36 +333,68 @@ module Journal
       true if @storage
     end
 
-    def join
-      @thread.join
-    end
-
     def shutdown
-      @thread.raise ConsumeInterrupt.new
+      log 'journal shutdown'
+      @subscriptions.clear
+      @topic_subs.clear
+      @queue << nil
+      @thread.join
+      @thread = nil
     end
 
     def publish(topic, payload)
-      @queue.push JournalMessage::create(topic, payload)
+      @queue << JournalMessage::create(topic, payload)
     end
 
-    # subscribe to messages that match the given query
-    def subscribe(query, &block)
+    # Subscribe to receive messages from a topic.
+    #
+    # You can use this method to subscribe to messages that
+    # are published within a specified topic. You must provide
+    # a receiving block to receive messages one-by-one.
+    # The method returns an instance of Subscription that can
+    # be used to cancel the subscription by invoking cancel
+    # on it.
+    #
+    #   journal.subscribe('irclog') do |message|
+    #     # received irclog messages...
+    #   end
+    #
+    def subscribe(topic=nil, &block)
       raise ArgumentError.new unless block_given?
-      s = Subscription.new(self, query, block)
+      s = Subscription.new(self, topic, block)
       @subscriptions << s
+      unless @topic_subs.has_key? topic
+        @topic_subs[topic] = []
+      end
+      @topic_subs[topic] << s
       s
     end
 
-    def unsubscribe(subscription)
-      @subscriptions.delete subscription
+    def unsubscribe(s)
+      if @topic_subs.has_key? s.topic
+        @topic_subs[s.topic].delete(s)
+      end
+      @subscriptions.delete s
     end
 
-    def find(query=nil, limit=100, offset=0, &block)
+    # Find and return persisted messages by a query.
+    #
+    # This method will either return all messages or call the provided
+    # block for each message. It will filter the messages by the
+    # provided Query instance. Limit and offset might be used to
+    # constrain the result.
+    # The query might also be a hash or proc that is passed to
+    # Query.define first.
+    #
+    # @param query [Query] 
+    # @param limit [Integer] how many items to return
+    # @param offset [Integer] relative offset in results
+    def find(query, limit=100, offset=0, &block)
+      unless query.is_a? Query
+        query = Query.define(query)
+      end
       if block_given?
-        begin
-          res = @storage.find(query, limit, offset)
-          block.call(res)
-        end until res.length > 0
+        @storage.find(query, limit, offset, &block)
       else
         @storage.find(query, limit, offset)
       end
index 24e9cfcccf7ffed868fe12e7625a5b158ed6b7e6..2e735587acca72aafb25ba0fa3312e470ca3802c 100644 (file)
@@ -25,6 +25,7 @@ module Journal
         
         drop if opts[:drop]
         @collection.indexes.create_one({topic: 1})
+        @collection.indexes.create_one({timestamp: 1})
       end
 
       def ensure_index(key)
@@ -40,10 +41,20 @@ module Journal
         })
       end
 
-      def find(query=nil, limit=100, offset=0)
-        query_cursor(query).skip(offset).limit(limit).map do |document|
-          JournalMessage.new(id: document['_id'], timestamp: document['timestamp'].localtime,
-            topic: document['topic'], payload: document['payload'].to_h)
+      def find(query=nil, limit=100, offset=0, &block)
+        def to_message(document)
+          JournalMessage.new(id: document['_id'],
+                             timestamp: document['timestamp'].localtime,
+                             topic: document['topic'],
+                             payload: document['payload'].to_h)
+        end
+
+        cursor = query_cursor(query).skip(offset).limit(limit)
+
+        if block_given?
+          cursor.each { |document| block.call(to_message(document)) }
+        else
+          cursor.map { |document| to_message(document) }
         end
       end
 
index e63aefee983bf8907d6dd5c678919f8cab202d3b..625901104624c590662511d53bf9742ed3907225 100644 (file)
@@ -59,6 +59,7 @@ module Journal
         drop if opts[:drop]
         create_table
         create_index('topic_index', 'topic')
+        create_index('timestamp_index', 'timestamp')
       end
 
       def create_table
@@ -92,7 +93,13 @@ module Journal
           [m.id, m.topic, m.timestamp, JSON.generate(m.payload)])
       end
 
-      def find(query=nil, limit=100, offset=0)
+      def find(query=nil, limit=100, offset=0, &block)
+        def to_message(row)
+          timestamp = DateTime.strptime(row['timestamp'], '%Y-%m-%d %H:%M:%S%z')
+          JournalMessage.new(id: row['id'], timestamp: timestamp,
+            topic: row['topic'], payload: JSON.parse(row['payload']))
+        end
+
         if query
           sql, params = query_to_sql(query)
           sql = 'SELECT * FROM journal WHERE ' + sql + ' LIMIT %d OFFSET %d' % [limit.to_i, offset.to_i]
@@ -101,10 +108,10 @@ module Journal
           params = []
         end
         res = @conn.exec_params(sql, params)
-        res.map do |row|
-          timestamp = DateTime.strptime(row['timestamp'], '%Y-%m-%d %H:%M:%S%z')
-          JournalMessage.new(id: row['id'], timestamp: timestamp,
-            topic: row['topic'], payload: JSON.parse(row['payload']))
+        if block_given?
+          res.each { |row| block.call(to_message(row)) }
+        else
+          res.map { |row| to_message(row) }
         end
       end
 
index b9f5c61219009f28d4ee4643f20263d99fa7b90a..374973519300226f9e32f8705124013969d14a48 100644 (file)
@@ -22,7 +22,9 @@ class JournalMessageTest < Test::Unit::TestCase
     end
     assert_nil(m.get('nope', nil))
     assert_nil(m.get('baz'))
-    assert_equal(23, m.get('qux.quxx'))
+    assert_equal(23, m['qux.quxx'])
+    assert_equal(nil, m['qux.nope'])
+    assert_raise(ArgumentError) { m.get('qux.nope') }
   end
 
 end
@@ -163,8 +165,8 @@ class JournalBrokerTest < Test::Unit::TestCase
     received = []
     journal = JournalBroker.new
 
-    # subscribe to messages:
-    sub = journal.subscribe(Query.define { topic 'foo' }) do |message|
+    # subscribe to messages for topic foo:
+    sub = journal.subscribe('foo') do |message|
       received << message
     end
 
@@ -236,6 +238,31 @@ module JournalStorageTestMixin
     assert_equal(m, @storage.find.first)
   end
 
+  def test_find
+    # tests limit/offset and block parameters of find()
+    @storage.insert(JournalMessage.create('irclogs', {message: 'foo'}))
+    @storage.insert(JournalMessage.create('irclogs', {message: 'bar'}))
+    @storage.insert(JournalMessage.create('irclogs', {message: 'baz'}))
+    @storage.insert(JournalMessage.create('irclogs', {message: 'qux'}))
+
+    msgs = []
+    @storage.find(Query.define({topic: 'irclogs'}), 2, 1) do |m|
+      msgs << m
+    end
+    assert_equal(2, msgs.length)
+    assert_equal('bar', msgs.first['message'])
+    assert_equal('baz', msgs.last['message'])
+
+    msgs = []
+    @storage.find(Query.define({topic: 'irclogs'})) do |m|
+      msgs << m
+    end
+    assert_equal(4, msgs.length)
+    assert_equal('foo', msgs.first['message'])
+    assert_equal('qux', msgs.last['message'])
+
+  end
+
   def test_operations_multiple
     # test operations on multiple messages
     # insert a bunch:
@@ -269,15 +296,27 @@ module JournalStorageTestMixin
     assert_equal(0, @storage.count)
   end
 
-  def test_journal
-    # this journal persists messages in the test storage:
-    journal = JournalBroker.new(storage: @storage)
-    journal.publish 'log.irc', action: 'message'
+  def test_broker_interface
+    journal = JournalBroker.new(storage: @storage) 
+
+    journal.publish 'irclogs', message: 'foo'
+    journal.publish 'irclogs', message: 'bar'
+    journal.publish 'irclogs', message: 'baz'
+    journal.publish 'irclogs', message: 'qux'
+
+    # wait for messages to be consumed:
     sleep 0.1
-    assert_equal(1, journal.count)
+
+    msgs = []
+    journal.find({topic: 'irclogs'}, 2, 1) do |m|
+      msgs << m
+    end
+    assert_equal(2, msgs.length)
+    assert_equal('bar', msgs.first['message'])
+    assert_equal('baz', msgs.last['message'])
   end
 
-  NUM=150_000
+  NUM=100 # 1_000_000
   def test_benchmark
     puts
 
@@ -323,13 +362,14 @@ module JournalStorageTestMixin
 
 end
 
+if ENV['PG_URI']
 class JournalStoragePostgresTest < Test::Unit::TestCase
 
   include JournalStorageTestMixin
 
   def setup
     @storage = Storage::PostgresStorage.new(
-      uri: ENV['DB_URI'] || 'postgresql://localhost/rbot_journal',
+      uri: ENV['PG_URI'] || 'postgresql://localhost/rbot_journal',
       drop: true)
   end
 
@@ -358,15 +398,22 @@ class JournalStoragePostgresTest < Test::Unit::TestCase
   end
 
 end
+else
+  puts 'NOTE: Set PG_URI environment variable to test postgresql storage.'
+end
 
+if ENV['MONGO_URI']
 class JournalStorageMongoTest < Test::Unit::TestCase
 
   include JournalStorageTestMixin
 
   def setup
     @storage = Storage::MongoStorage.new(
+      uri: ENV['MONGO_URI'] || 'mongodb://127.0.0.1:27017/rbot',
       drop: true)
   end
-
+end
+else
+  puts 'NOTE: Set MONGO_URI environment variable to test postgresql storage.'
 end