]> git.netwichtig.de Git - user/henk/code/ruby/rbot.git/commitdiff
Merge pull request #4 from ahpook/rename_karma
authorMatthias Hecker <36882671+mattzque@users.noreply.github.com>
Fri, 27 Mar 2020 19:58:27 +0000 (20:58 +0100)
committerGitHub <noreply@github.com>
Fri, 27 Mar 2020 19:58:27 +0000 (20:58 +0100)
Rename and improve karma plugin

lib/rbot/core/journal.rb [new file with mode: 0644]
lib/rbot/core/journal_irclog.rb [new file with mode: 0644]
lib/rbot/core/webservice.rb
lib/rbot/ircbot.rb
lib/rbot/journal.rb [new file with mode: 0644]
lib/rbot/journal/mongo.rb [new file with mode: 0644]
lib/rbot/journal/postgres.rb [new file with mode: 0644]
lib/rbot/plugins.rb
lib/rbot/registry.rb
rbot.gemspec
test/test_journal.rb [new file with mode: 0644]

diff --git a/lib/rbot/core/journal.rb b/lib/rbot/core/journal.rb
new file mode 100644 (file)
index 0000000..c4f5f0f
--- /dev/null
@@ -0,0 +1,69 @@
+#-- vim:sw=2:et
+#++
+#
+# :title: rbot journal management from IRC
+#
+# Author:: Matthias Hecker (apoc@geekosphere.org)
+
+require 'rbot/journal'
+
+module ::Irc
+class Bot
+  # this should return the journal if the managing plugin has been loaded.
+  def journal
+    if @plugins['journal']
+      @plugins['journal'].broker
+    end
+  end
+end
+end
+
+class JournalModule < CoreBotModule
+
+  attr_reader :broker
+
+  include Irc::Bot::Journal
+
+  Config.register Config::StringValue.new('journal.storage',
+    :default => nil,
+    :requires_rescan => true,
+    :desc => 'storage engine used by the journal')
+  Config.register Config::StringValue.new('journal.storage.uri',
+    :default => nil,
+    :requires_rescan => true,
+    :desc => 'storage database uri')
+
+  def initialize
+    super
+    storage = nil
+    name = @bot.config['journal.storage']
+    uri = @bot.config['journal.storage.uri']
+    if name
+      begin
+        storage = Storage.create(name, uri)
+      rescue
+        error 'journal storage initialization error!'
+        error $!
+        error $@.join("\n")
+      end
+    end
+    debug 'journal broker starting up...'
+    @broker = JournalBroker.new(storage: storage)
+  end
+
+  def cleanup
+    super
+    debug 'journal broker shutting down...'
+    @broker.shutdown
+    @broker = nil
+  end
+
+  def help(plugin, topic='')
+    'journal'
+  end
+
+end
+
+journal = JournalModule.new
+journal.priority = -2
+
diff --git a/lib/rbot/core/journal_irclog.rb b/lib/rbot/core/journal_irclog.rb
new file mode 100644 (file)
index 0000000..0617d99
--- /dev/null
@@ -0,0 +1,71 @@
+#-- vim:sw=2:et
+#++
+#
+# :title: irc logging into the journal
+#
+# Author:: Matthias Hecker (apoc@geekosphere.org)
+
+class JournalIrcLogModule < CoreBotModule
+
+  include Irc::Bot::Journal
+
+  Config.register Config::ArrayValue.new('journal.irclog.whitelist',
+    :default => [],
+    :desc => 'only perform journal irc logging for those channel/users')
+
+  Config.register Config::ArrayValue.new('journal.irclog.blacklist',
+    :default => [],
+    :desc => 'exclude journal irc logging for those channel/users')
+
+  def publish(payload)
+    if payload[:target]
+      target = payload[:target]
+      whitelist = @bot.config['journal.irclog.whitelist']
+      blacklist = @bot.config['journal.irclog.blacklist']
+      unless whitelist.empty?
+        return unless whitelist.include? target
+      end
+      unless blacklist.empty?
+        return if blacklist.include? target
+      end
+    end
+    @bot.journal.publish('irclog', payload)
+  end
+
+  def log_message(m)
+    unless m.kind_of? BasicUserMessage
+      warning 'journal irc logger can\'t log %s message' % [m.class.to_s]
+    else
+      payload = {
+        type: m.class.name.downcase.match(/(\w+)message/).captures.first,
+        addressed: m.address?,
+        replied: m.replied?,
+        identified: m.identified?,
+
+        source: m.source.to_s,
+        source_user: m.botuser.to_s,
+        source_address: m.sourceaddress,
+        target: m.target.to_s,
+        server: m.server.to_s,
+
+        message: m.logmessage,
+      }
+      publish(payload)
+    end
+  end
+
+  # messages sent
+  def sent(m)
+    log_message(m)
+  end
+
+  # messages received
+  def listen(m)
+    log_message(m)
+  end
+end
+
+plugin = JournalIrcLogModule.new
+# make sure the logger gets loaded after the journal
+plugin.priority = -1
+
index 0ddbc2d5941288ac1945de36c56f43d8ffcfb006..eb01226c3ec2378b97b28d9c73bac2973c3dc919 100644 (file)
@@ -17,6 +17,8 @@ require 'webrick/https'
 require 'openssl'
 require 'cgi'
 require 'json'
+require 'erb'
+require 'ostruct'
 
 module ::Irc
 class Bot
@@ -82,7 +84,10 @@ class Bot
         }
 
         @path = req.path
-        debug '@path = ' + @path.inspect
+
+        @load_path = [File.join(Config::datadir, 'web')]
+        @load_path += @bot.plugins.core_module_dirs
+        @load_path += @bot.plugins.plugin_dirs
       end
 
       def parse_query(query)
@@ -124,6 +129,27 @@ class Bot
       def send_html(body, status=200)
         send_response(body, status, 'text/html')
       end
+
+      # Expands a relative filename to absolute using a list of load paths.
+      def get_load_path(filename)
+        @load_path.each do |path|
+          file = File.join(path, filename) 
+          return file if File.exists?(file)
+        end
+      end
+
+      # Renders a erb template and responds it
+      def render(template, args={})
+        file = get_load_path template
+        if not file
+          raise 'template not found: ' + template
+        end
+
+        tmpl = ERB.new(IO.read(file))
+        ns = OpenStruct.new(args)
+        body = tmpl.result(ns.instance_eval { binding })
+        send_html(body, 200)
+      end
     end
 
     # works similar to a message mapper but for url paths
@@ -394,6 +420,10 @@ class WebServiceModule < CoreBotModule
     :requires_rescan => true,
     :desc => 'Host the web service will bind on')
 
+  Config.register Config::StringValue.new('webservice.url',
+    :default => 'http://127.0.0.1:7268',
+    :desc => 'The public URL of the web service.')
+
   Config.register Config::BooleanValue.new('webservice.ssl',
     :default => false,
     :requires_rescan => true,
index eb158c6329fcb1a55a169b50fc3f80d086b35b66..739aaadee49c7075f45180bf2cde707b7244d328 100644 (file)
@@ -201,6 +201,7 @@ class Bot
   # loads and opens new registry databases, used by the plugins
   attr_accessor :registry_factory
 
+  # web service
   attr_accessor :webservice
 
   # server we are connected to
diff --git a/lib/rbot/journal.rb b/lib/rbot/journal.rb
new file mode 100644 (file)
index 0000000..981ff6e
--- /dev/null
@@ -0,0 +1,430 @@
+# encoding: UTF-8
+#-- vim:sw=2:et
+#++
+#
+# :title: rbot's persistent message queue
+#
+# Author:: Matthias Hecker (apoc@geekosphere.org)
+
+require 'thread'
+require 'securerandom'
+
+module Irc
+class Bot
+module Journal
+
+=begin rdoc
+
+  The journal is a persistent message queue for rbot, its based on a basic
+  publish/subscribe model and persists messages into backend databases
+  that can be efficiently searched for past messages.
+
+  It is a addition to the key value storage already present in rbot
+  through its registry subsystem.
+
+=end
+
+  class InvalidJournalMessage < StandardError
+  end
+  class StorageError < StandardError
+  end
+
+  class JournalMessage
+    # a unique identification of this message
+    attr_reader :id
+
+    # describes a hierarchical queue into which this message belongs
+    attr_reader :topic
+
+    # when this message was published as a Time instance
+    attr_reader :timestamp
+
+    # contains the actual message as a Hash
+    attr_reader :payload
+
+    def initialize(message)
+      @id = message[:id]
+      @timestamp = message[:timestamp]
+      @topic = message[:topic]
+      @payload = message[:payload]
+      if @payload.class != Hash
+        raise InvalidJournalMessage.new('payload must be a hash!')
+      end
+    end
+
+    # Access payload value by key.
+    def get(pkey, default=:exception) # IDENTITY = Object.new instead of :ex..?
+      value = pkey.split('.').reduce(@payload) do |hash, key|
+        if hash.has_key?(key) or hash.has_key?(key.to_sym)
+          hash[key] || hash[key.to_sym]
+        else
+          if default == :exception
+            raise ArgumentError.new
+          else
+            default
+          end
+        end
+      end
+    end
+
+    # Access payload value by key alias for get(key, nil).
+    def [](key)
+      get(key, nil)
+    end
+
+    def ==(other)
+      (@id == other.id) rescue false
+    end
+
+    def self.create(topic, payload, opt={})
+      # cleanup payload to only contain strings
+      JournalMessage.new(
+        id: opt[:id] || SecureRandom.uuid,
+        timestamp: opt[:timestamp] || Time.now,
+        topic: topic,
+        payload: payload
+      )
+    end
+  end
+
+  module Storage
+    class AbstractStorage
+      # intializes/opens a new storage connection
+      def initialize(opts={})
+      end
+
+      # inserts a message in storage
+      def insert(message)
+      end
+
+      # creates/ensures a index exists on the payload specified by key
+      def ensure_payload_index(key)
+      end
+
+      # returns a array of message instances that match the query
+      def find(query=nil, limit=100, offset=0, &block)
+      end
+
+      # returns the number of messages that match the query
+      def count(query=nil)
+      end
+
+      # remove messages that match the query
+      def remove(query=nil)
+      end
+
+      # destroy the underlying table/collection
+      def drop
+      end
+
+      # Returns all classes from the namespace that implement this interface
+      def self.get_impl
+        ObjectSpace.each_object(Class).select { |klass| klass < self }
+      end
+    end
+
+    def self.create(name, uri)
+      log 'load journal storage adapter: ' + name
+      load File.join(File.dirname(__FILE__), 'journal', name + '.rb')
+      cls = AbstractStorage.get_impl.first
+      cls.new(uri: uri)
+    end
+  end
+
+  # Describes a query on journal entries, it is used both to describe
+  # a subscription aswell as to query persisted messages.
+  # There two ways to declare a Query instance, using
+  # the DSL like this:
+  #
+  #   Query.define do
+  #     id 'foo'
+  #     id 'bar'
+  #     topic 'log.irc.*'
+  #     topic 'log.core'
+  #     timestamp from: Time.now, to: Time.now + 60 * 10
+  #     payload 'action': :privmsg
+  #     payload 'channel': '#rbot'
+  #     payload 'foo.bar': 'baz'
+  #   end
+  #
+  # or using a hash: (NOTE: avoid using symbols in payload)
+  #
+  #   Query.define({
+  #     id: ['foo', 'bar'],
+  #     topic: ['log.irc.*', 'log.core'],
+  #     timestamp: {
+  #       from: Time.now
+  #       to: Time.now + 60 * 10
+  #     },
+  #     payload: {
+  #       'action' => 'privmsg'
+  #       'channel' => '#rbot',
+  #       'foo.bar' => 'baz'
+  #     }
+  #   })
+  #
+  class Query
+    # array of ids to match (OR)
+    attr_reader :id
+    # array of topics to match with wildcard support (OR)
+    attr_reader :topic
+    # hash with from: timestamp and to: timestamp
+    attr_reader :timestamp
+    # hash of key values to match
+    attr_reader :payload
+
+    def initialize(query)
+      @id = query[:id] || []
+      @id = [@id] if @id.is_a? String
+      @topic = query[:topic] || []
+      @topic = [@topic] if @topic.is_a? String
+      @timestamp = {
+        from: nil, to: nil
+      }
+      if query[:timestamp] and query[:timestamp][:from]
+        @timestamp[:from] = query[:timestamp][:from]
+      end
+      if query[:timestamp] and query[:timestamp][:to]
+        @timestamp[:to] = query[:timestamp][:to]
+      end
+      @payload = query[:payload] || {}
+    end
+
+    # returns true if the given message matches the query
+    def matches?(message)
+      return false if not @id.empty? and not @id.include? message.id
+      return false if not @topic.empty? and not topic_matches? message.topic
+      if @timestamp[:from]
+        return false unless message.timestamp >= @timestamp[:from]
+      end
+      if @timestamp[:to]
+        return false unless message.timestamp <= @timestamp[:to]
+      end
+      found = false
+      @payload.each_pair do |key, value|
+        begin
+          message.get(key.to_s)
+        rescue ArgumentError
+        end
+        found = true
+      end
+      return false if not found and not @payload.empty?
+      true
+    end
+
+    def topic_matches?(_topic)
+      @topic.each do |topic|
+        if topic.include? '*'
+          match = true
+          topic.split('.').zip(_topic.split('.')).each do |a, b|
+            if a == '*'
+              if not b or b.empty?
+                match = false
+              end
+            else
+              match = false unless a == b
+            end
+          end
+          return true if match
+        else
+          return true if topic == _topic
+        end
+      end
+      return false
+    end
+
+    # factory that constructs a query
+    class Factory
+      attr_reader :query
+      def initialize
+        @query = {
+          id: [],
+          topic: [],
+          timestamp: {
+            from: nil, to: nil
+          },
+          payload: {}
+        }
+      end
+
+      def id(*_id)
+        @query[:id] += _id
+      end
+
+      def topic(*_topic)
+          @query[:topic] += _topic
+      end
+
+      def timestamp(range)
+        @query[:timestamp] = range
+      end
+
+      def payload(query)
+        @query[:payload].merge!(query)
+      end
+    end
+
+    def self.define(query=nil, &block)
+      factory = Factory.new
+      if block_given?
+        factory.instance_eval(&block)
+        query = factory.query
+      end
+      Query.new query
+    end
+
+  end
+
+
+  class JournalBroker
+    attr_reader :storage
+    class Subscription
+      attr_reader :topic
+      attr_reader :block
+      def initialize(broker, topic, block)
+        @broker = broker
+        @topic = topic
+        @block = block
+      end
+      def cancel
+        @broker.unsubscribe(self)
+      end
+    end
+
+    def initialize(opts={})
+      # overrides the internal consumer with a block
+      @consumer = opts[:consumer]
+      # storage backend
+      @storage = opts[:storage]
+      unless @storage
+        warning 'journal broker: no storage set up, won\'t persist messages'
+      end
+      @queue = Queue.new
+      # consumer thread:
+      @thread = Thread.new do
+        while message = @queue.pop
+          begin
+            consume message
+          # pop(true) ... rescue ThreadError => e
+          rescue Exception => e
+            error 'journal broker: exception in consumer thread'
+            error $!
+          end
+        end
+      end
+      @subscriptions = []
+      # lookup-table for subscriptions by their topic
+      @topic_subs = {}
+    end
+
+    def consume(message)
+      return unless message
+      @consumer.call(message) if @consumer
+
+      # notify subscribers
+      if @topic_subs.has_key? message.topic
+        @topic_subs[message.topic].each do |s|
+          s.block.call(message)
+        end
+      end
+
+      @storage.insert(message) if @storage
+    end
+
+    def persists?
+      true if @storage
+    end
+
+    def shutdown
+      log 'journal shutdown'
+      @subscriptions.clear
+      @topic_subs.clear
+      @queue << nil
+      @thread.join
+      @thread = nil
+    end
+
+    def publish(topic, payload)
+      debug 'journal publish message in %s: %s' % [topic, payload.inspect]
+      @queue << JournalMessage::create(topic, payload)
+      nil
+    end
+
+    # Subscribe to receive messages from a topic.
+    #
+    # You can use this method to subscribe to messages that
+    # are published within a specified topic. You must provide
+    # a receiving block to receive messages one-by-one.
+    # The method returns an instance of Subscription that can
+    # be used to cancel the subscription by invoking cancel
+    # on it.
+    #
+    #   journal.subscribe('irclog') do |message|
+    #     # received irclog messages...
+    #   end
+    #
+    def subscribe(topic=nil, &block)
+      raise ArgumentError.new unless block_given?
+      s = Subscription.new(self, topic, block)
+      @subscriptions << s
+      unless @topic_subs.has_key? topic
+        @topic_subs[topic] = []
+      end
+      @topic_subs[topic] << s
+      s
+    end
+
+    def unsubscribe(s)
+      if @topic_subs.has_key? s.topic
+        @topic_subs[s.topic].delete(s)
+      end
+      @subscriptions.delete s
+    end
+
+    # Find and return persisted messages by a query.
+    #
+    # This method will either return all messages or call the provided
+    # block for each message. It will filter the messages by the
+    # provided Query instance. Limit and offset might be used to
+    # constrain the result.
+    # The query might also be a hash or proc that is passed to
+    # Query.define first.
+    #
+    # @param query [Query] 
+    # @param limit [Integer] how many items to return
+    # @param offset [Integer] relative offset in results
+    def find(query, limit=100, offset=0, &block)
+      unless query.is_a? Query
+        query = Query.define(query)
+      end
+      if block_given?
+        @storage.find(query, limit, offset, &block)
+      else
+        @storage.find(query, limit, offset)
+      end
+    end
+
+    def count(query=nil)
+      unless query.is_a? Query
+        query = Query.define(query)
+      end
+      @storage.count(query)
+    end
+
+    def remove(query=nil)
+      unless query.is_a? Query
+        query = Query.define(query)
+      end
+      @storage.remove(query)
+    end
+
+    def ensure_payload_index(key)
+      @storage.ensure_payload_index(key)
+    end
+
+  end
+
+end # Journal
+end # Bot
+end # Irc
+
diff --git a/lib/rbot/journal/mongo.rb b/lib/rbot/journal/mongo.rb
new file mode 100644 (file)
index 0000000..a03355f
--- /dev/null
@@ -0,0 +1,131 @@
+# encoding: UTF-8
+#-- vim:sw=2:et
+#++
+#
+# :title: journal backend for mongoDB
+
+require 'mongo'
+require 'json'
+
+module Irc
+class Bot
+module Journal
+
+  module Storage
+
+    class MongoStorage < AbstractStorage
+      attr_reader :client
+
+      def initialize(opts={})
+        Mongo::Logger.logger.level = Logger::WARN
+        @uri = opts[:uri] || 'mongodb://127.0.0.1:27017/rbot'
+        @client = Mongo::Client.new(@uri)
+        @collection = @client['journal']
+        log 'journal storage: mongodb connected to ' + @uri
+        
+        drop if opts[:drop]
+        @collection.indexes.create_one({topic: 1})
+        @collection.indexes.create_one({timestamp: 1})
+      end
+
+      def ensure_payload_index(key)
+        @collection.indexes.create_one({'payload.'+key => 1})
+      end
+
+      def insert(m)
+        @collection.insert_one({
+          '_id' => m.id,
+          'topic' => m.topic,
+          'timestamp' => m.timestamp,
+          'payload' => m.payload
+        })
+      end
+
+      def find(query=nil, limit=100, offset=0, &block)
+        def to_message(document)
+          JournalMessage.new(id: document['_id'],
+                             timestamp: document['timestamp'].localtime,
+                             topic: document['topic'],
+                             payload: document['payload'].to_h)
+        end
+
+        cursor = query_cursor(query).skip(offset).limit(limit)
+
+        if block_given?
+          cursor.each { |document| block.call(to_message(document)) }
+        else
+          cursor.map { |document| to_message(document) }
+        end
+      end
+
+      # returns the number of messages that match the query
+      def count(query=nil)
+        query_cursor(query).count
+      end
+
+      def remove(query=nil)
+        query_cursor(query).delete_many
+      end
+
+      def drop
+        @collection.drop
+      end
+
+      def query_cursor(query)
+        unless query
+          return @collection.find()
+        end
+
+        query_and = []
+
+        # ID query OR condition
+        unless query.id.empty?
+          query_and << {
+            '$or' => query.id.map { |_id| 
+              {'_id' => _id}
+            }
+          }
+        end
+
+        unless query.topic.empty?
+          query_and << {
+            '$or' => query.topic.map { |topic|
+              if topic.include?('*')
+                pattern = topic.gsub('.', '\.').gsub('*', '.*')
+                {'topic' => {'$regex' => pattern}}
+              else
+                {'topic' => topic}
+              end
+            }
+          }
+        end
+
+        if query.timestamp[:from] or query.timestamp[:to]
+          where = {}
+          if query.timestamp[:from]
+            where['$gte'] = query.timestamp[:from]
+          end
+          if query.timestamp[:to]
+            where['$lte'] = query.timestamp[:to]
+          end
+          query_and << {'timestamp' => where}
+        end
+
+        unless query.payload.empty?
+          query_and << {
+            '$or' => query.payload.map { |key, value|
+              key = 'payload.' + key
+              {key => value}
+            }
+          }
+        end
+
+        @collection.find({
+          '$and' => query_and
+        })
+      end
+    end
+  end
+end # Journal
+end # Bot
+end # Irc
diff --git a/lib/rbot/journal/postgres.rb b/lib/rbot/journal/postgres.rb
new file mode 100644 (file)
index 0000000..7b33315
--- /dev/null
@@ -0,0 +1,280 @@
+# encoding: UTF-8
+#-- vim:sw=2:et
+#++
+#
+# :title: journal backend for postgresql
+
+require 'pg'
+require 'json'
+
+# wraps the postgres driver in a single thread
+class PGWrapper
+  def initialize(uri)
+    @uri = uri
+    @queue = Queue.new
+    run_thread
+  end
+
+  def run_thread
+    Thread.new do
+      @conn = PG.connect(@uri)
+      while message = @queue.pop
+        return_queue = message.shift
+        begin
+          result = @conn.send(*message)
+          return_queue << [:result, result]
+        rescue Exception => e
+          return_queue << [:exception, e]
+        end
+      end
+      @conn.finish
+    end
+  end
+
+  def run_in_thread(*args)
+    rq = Queue.new
+    @queue << [rq, *args]
+    type, result = rq.pop
+    if type == :exception
+      raise result
+    else
+      result
+    end
+  end
+
+  public
+
+  def destroy
+    @queue << nil
+  end
+
+  def exec(query)
+    run_in_thread(:exec, query)
+  end
+
+  def exec_params(query, params)
+    run_in_thread(:exec_params, query, params)
+  end
+
+  def escape_string(string)
+    @conn.escape_string(string)
+  end
+end
+
+# as a replacement for CREATE INDEX IF NOT EXIST that is not in postgres.
+# define function to be able to create an index in case it doesnt exist:
+# source: http://stackoverflow.com/a/26012880
+CREATE_INDEX = <<-EOT
+CREATE OR REPLACE FUNCTION create_index(table_name text, index_name text, column_name text) RETURNS void AS $$ 
+declare 
+   l_count integer;
+begin
+  select count(*)
+     into l_count
+  from pg_indexes
+  where schemaname = 'public'
+    and tablename = lower(table_name)
+    and indexname = lower(index_name);
+
+  if l_count = 0 then 
+     execute 'create index ' || index_name || ' on ' || table_name || '(' || column_name || ')';
+  end if;
+end;
+$$ LANGUAGE plpgsql;
+EOT
+
+module Irc
+class Bot
+module Journal
+
+  module Storage
+
+    class PostgresStorage < AbstractStorage
+      attr_reader :conn
+
+      def initialize(opts={})
+        @uri = opts[:uri] || 'postgresql://localhost/rbot'
+        @conn = PGWrapper.new(@uri)
+        @conn.exec('set client_min_messages = warning')
+        @conn.exec(CREATE_INDEX)
+        @version = @conn.exec('SHOW server_version;')[0]['server_version']
+
+        @version.gsub!(/^(\d+\.\d+)$/, '\1.0')
+        log 'journal storage: postgresql connected to version: ' + @version
+        
+        version = @version.split('.')[0,3].join.to_i
+        if version < 930
+          raise StorageError.new(
+            'PostgreSQL Version too old: %s, supported: >= 9.3' % [@version])
+        end
+        @jsonb = (version >= 940)
+        log 'journal storage: no jsonb support, consider upgrading postgres' unless @jsonb
+        log 'journal storage: postgres backend is using JSONB :)' if @jsonb
+
+        drop if opts[:drop]
+        create_table
+        create_index('topic_index', 'topic')
+        create_index('timestamp_index', 'timestamp')
+      end
+
+      def create_table
+        @conn.exec('
+          CREATE TABLE IF NOT EXISTS journal
+            (id UUID PRIMARY KEY,
+             topic TEXT NOT NULL,
+             timestamp TIMESTAMP WITH TIME ZONE NOT NULL,
+             payload %s NOT NULL)' % [@jsonb ? 'JSONB' : 'JSON'])
+      end
+
+      def create_index(index_name, column_name)
+        debug 'journal postges backend: create index %s for %s' % [
+          index_name, column_name]
+        @conn.exec_params('SELECT create_index($1, $2, $3)', [
+          'journal', index_name, column_name])
+      end
+
+      def create_payload_index(key)
+        index_name = 'idx_payload_' + key.gsub('.', '_')
+        column = sql_payload_selector(key)
+        create_index(index_name, column)
+      end
+
+      def ensure_index(key)
+        create_payload_index(key)
+      end
+
+      def insert(m)
+        @conn.exec_params('INSERT INTO journal VALUES ($1, $2, $3, $4);',
+          [m.id, m.topic, m.timestamp, JSON.generate(m.payload)])
+      end
+
+      def find(query=nil, limit=100, offset=0, &block)
+        def to_message(row)
+          timestamp = DateTime.strptime(row['timestamp'], '%Y-%m-%d %H:%M:%S%z')
+          JournalMessage.new(id: row['id'], timestamp: timestamp,
+            topic: row['topic'], payload: JSON.parse(row['payload']))
+        end
+
+        if query
+          sql, params = query_to_sql(query)
+          sql = 'SELECT * FROM journal WHERE ' + sql + ' LIMIT %d OFFSET %d' % [limit.to_i, offset.to_i]
+        else
+          sql = 'SELECT * FROM journal LIMIT %d OFFSET %d' % [limit.to_i, offset.to_i]
+          params = []
+        end
+        res = @conn.exec_params(sql, params)
+        if block_given?
+          res.each { |row| block.call(to_message(row)) }
+        else
+          res.map { |row| to_message(row) }
+        end
+      end
+
+      # returns the number of messages that match the query
+      def count(query=nil)
+        if query
+          sql, params = query_to_sql(query)
+          sql = 'SELECT COUNT(*) FROM journal WHERE ' + sql
+        else
+          sql = 'SELECT COUNT(*) FROM journal'
+          params = []
+        end
+        res = @conn.exec_params(sql, params)
+        res[0]['count'].to_i
+      end
+
+      def remove(query=nil)
+        if query
+          sql, params = query_to_sql(query)
+          sql = 'DELETE FROM journal WHERE ' + sql
+        else
+          sql = 'DELETE FROM journal;'
+          params = []
+        end
+        res = @conn.exec_params(sql, params)
+      end
+
+      def drop
+        @conn.exec('DROP TABLE journal;') rescue nil
+      end
+
+      def sql_payload_selector(key)
+        selector = 'payload'
+        k = key.to_s.split('.')
+        k.each_index { |i|
+          if i >= k.length-1
+            selector += '->>\'%s\'' % [@conn.escape_string(k[i])]
+          else
+            selector += '->\'%s\'' % [@conn.escape_string(k[i])]
+          end
+        }
+        selector
+      end
+
+      def query_to_sql(query)
+        params = []
+        placeholder = Proc.new do |value|
+          params << value
+          '$%d' % [params.length]
+        end
+        sql = {op: 'AND', list: []}
+
+        # ID query OR condition
+        unless query.id.empty?
+          sql[:list] << {
+            op: 'OR',
+            list: query.id.map { |id| 
+              'id = ' + placeholder.call(id)
+            }
+          }
+        end
+
+        # Topic query OR condition
+        unless query.topic.empty?
+          sql[:list] << {
+            op: 'OR',
+            list: query.topic.map { |topic| 
+              'topic ILIKE ' + placeholder.call(topic.gsub('*', '%'))
+            }
+          }
+        end
+
+        # Timestamp range query AND condition
+        if query.timestamp[:from] or query.timestamp[:to]
+          list = []
+          if query.timestamp[:from]
+            list << 'timestamp >= ' + placeholder.call(query.timestamp[:from])
+          end
+          if query.timestamp[:to]
+            list << 'timestamp <= ' + placeholder.call(query.timestamp[:to])
+          end
+          sql[:list] << {
+            op: 'AND',
+            list: list
+          }
+        end
+
+        # Payload query
+        unless query.payload.empty?
+          list = []
+          query.payload.each_pair do |key, value|
+            selector = sql_payload_selector(key)
+            list << selector + ' = ' + placeholder.call(value)
+          end
+          sql[:list] << {
+            op: 'OR',
+            list: list
+          }
+        end
+
+        sql = sql[:list].map { |stmt|
+          '(' + stmt[:list].join(' %s ' % [stmt[:op]]) + ')'
+        }.join(' %s ' % [sql[:op]])
+
+        [sql, params]
+      end
+    end
+  end
+end # Journal
+end # Bot
+end # Irc
index 0e7e9d6f4a4163465aa6a2df810b913554d2198d..8621fe45341456e485894a4768c8d8c298ae8257 100644 (file)
@@ -410,6 +410,9 @@ module Plugins
     attr_reader :botmodules
     attr_reader :maps
 
+    attr_reader :core_module_dirs
+    attr_reader :plugin_dirs
+
     # This is the list of patterns commonly delegated to plugins.
     # A fast delegation lookup is enabled for them.
     DEFAULT_DELEGATE_PATTERNS = %r{^(?:
index 5e905ebb9408ba9774ce2ad3f43fddd925c59824..04892d73d69bd4dd3cfa3aba773df3f7fba55eee 100644 (file)
@@ -57,6 +57,15 @@ class Registry
     @libpath = File.join(File.dirname(__FILE__), 'registry')
     @format = format
     load File.join(@libpath, @format+'.rb') if format
+    # The get_impl method will return all implementations of the
+    # abstract accessor interface, since we only ever load one
+    # (the configured one) accessor implementation, we can just assume
+    # it to be the correct accessor to use.
+    accessors = AbstractAccessor.get_impl
+    if accessors.length > 1
+      warning 'multiple accessor implementations loaded!'
+    end
+    @accessor_class = accessors.first
   end
 
   # Returns a list of supported registry database formats.
@@ -68,12 +77,7 @@ class Registry
 
   # Creates a new Accessor object for the specified database filename.
   def create(path, filename)
-    # The get_impl method will return a list of all the classes that
-    # implement the accessor interface, since we only ever load one
-    # (the configured one) accessor implementation, we can just assume
-    # it to be the correct accessor to use.
-    cls = AbstractAccessor.get_impl.first
-    db = cls.new(File.join(path, 'registry_' + @format, filename.downcase))
+    db = @accessor_class.new(File.join(path, 'registry_' + @format, filename.downcase))
     db.optimize
     db
   end
@@ -330,7 +334,7 @@ class Registry
 
     # Returns all classes from the namespace that implement this interface
     def self.get_impl
-      ObjectSpace.each_object(Class).select { |klass| klass < self }
+      ObjectSpace.each_object(Class).select { |klass| klass.ancestors[1] == self }
     end
   end
 
index 29d88c1f60851834103ec56bfa250f1976d61618..d023609d2ed2c3881d7400a300000b096f28f570 100644 (file)
@@ -1,3 +1,5 @@
+require 'rake'
+
 Gem::Specification.new do |s|
   s.name = 'rbot'
   s.version = '0.9.15'
@@ -17,13 +19,13 @@ Gem::Specification.new do |s|
          'COPYING',
          'COPYING.rbot',
          'GPLv2',
-         'README.rdoc',
+         'README.md',
          'REQUIREMENTS',
          'TODO',
          'ChangeLog',
          'INSTALL',
          'Usage_en.txt',
-         'man/rbot.1',
+         'man/rbot.xml',
          'setup.rb',
          'launch_here.rb',
          'po/*.pot',
diff --git a/test/test_journal.rb b/test/test_journal.rb
new file mode 100644 (file)
index 0000000..f1653c1
--- /dev/null
@@ -0,0 +1,421 @@
+$:.unshift File.join(File.dirname(__FILE__), '../lib')
+
+require 'test/unit'
+require 'rbot/ircbot'
+require 'rbot/journal'
+require 'rbot/journal/postgres.rb'
+require 'rbot/journal/mongo.rb'
+
+require 'benchmark'
+
+DAY=60*60*24
+
+class JournalMessageTest < Test::Unit::TestCase
+
+  include Irc::Bot::Journal
+
+  def test_get
+    m = JournalMessage.create('foo', {'bar': 42, 'baz': nil, 'qux': {'quxx': 23}})
+    assert_equal(42, m.get('bar'))
+    assert_raise ArgumentError do
+      m.get('nope')
+    end
+    assert_nil(m.get('nope', nil))
+    assert_nil(m.get('baz'))
+    assert_equal(23, m['qux.quxx'])
+    assert_equal(nil, m['qux.nope'])
+    assert_raise(ArgumentError) { m.get('qux.nope') }
+  end
+
+end
+
+class QueryTest < Test::Unit::TestCase
+
+  include Irc::Bot::Journal
+
+  def test_define
+
+    q = Query.define do
+      id 'foo'
+      id 'bar', 'baz'
+      topic 'log.irc.*'
+      topic 'log.core', 'baz'
+      timestamp from: Time.now, to: Time.now + 60 * 10
+      payload 'action': :privmsg, 'alice': 'bob'
+      payload 'channel': '#rbot'
+      payload 'foo.bar': 'baz'
+    end
+    assert_equal(['foo', 'bar', 'baz'], q.id)
+    assert_equal(['log.irc.*', 'log.core', 'baz'], q.topic)
+    assert_equal([:from, :to], q.timestamp.keys)
+    assert_equal(Time, q.timestamp[:to].class)
+    assert_equal(Time, q.timestamp[:from].class)
+    assert_equal({
+      'action': :privmsg, 'alice': 'bob',
+      'channel': '#rbot',
+      'foo.bar': 'baz'
+    }, q.payload)
+
+  end
+
+  def test_topic_matches
+    q = Query.define do
+      topic 'foo'
+    end
+    assert_true(q.topic_matches?('foo'))
+    assert_false(q.topic_matches?('bar'))
+    assert_false(q.topic_matches?('foo.bar'))
+
+    q = Query.define do
+      topic 'foo.bar'
+    end
+    assert_false(q.topic_matches?('foo'))
+    assert_false(q.topic_matches?('bar'))
+    assert_true(q.topic_matches?('foo.bar'))
+
+    q = Query.define do
+      topic 'foo.*'
+    end
+    assert_false(q.topic_matches?('foo'))
+    assert_false(q.topic_matches?('bar'))
+    assert_true(q.topic_matches?('foo.bar'))
+    assert_true(q.topic_matches?('foo.baz'))
+
+    q = Query.define do
+      topic '*.bar'
+    end
+    assert_false(q.topic_matches?('foo'))
+    assert_false(q.topic_matches?('bar'))
+    assert_true(q.topic_matches?('foo.bar'))
+    assert_true(q.topic_matches?('bar.bar'))
+    assert_false(q.topic_matches?('foo.foo'))
+
+    q = Query.define do
+      topic '*.*'
+    end
+    assert_false(q.topic_matches?('foo'))
+    assert_true(q.topic_matches?('foo.bar'))
+
+    q = Query.define do
+      topic 'foo'
+      topic 'bar'
+      topic 'baz.alice.bob.*.foo'
+    end
+    assert_true(q.topic_matches?('foo'))
+    assert_true(q.topic_matches?('bar'))
+    assert_true(q.topic_matches?('baz.alice.bob.asdf.foo'))
+    assert_false(q.topic_matches?('baz.alice.bob..foo'))
+
+  end
+  def test_matches
+    q = Query.define do
+      #id 'foo', 'bar'
+      topic 'log.irc.*', 'log.core'
+      timestamp from: Time.now - DAY, to: Time.now + DAY
+      payload 'action': 'privmsg', 'foo.bar': 'baz'
+    end
+    assert_true(q.matches? JournalMessage.create('log.irc.raw', {'action' => 'privmsg'}))
+    assert_false(q.matches? JournalMessage.create('baz', {}))
+    assert_true(q.matches? JournalMessage.create('log.core', {foo: {bar: 'baz'}}))
+
+    # tests timestamp from/to:
+    assert_true(q.matches? JournalMessage.new(
+      id: 'foo',
+      topic: 'log.core',
+      timestamp: Time.now,
+      payload: {action: 'privmsg'}))
+    assert_false(q.matches? JournalMessage.new(
+      id: 'foo',
+      topic: 'log.core',
+      timestamp: Time.now - DAY*3,
+      payload: {action: 'privmsg'}))
+    assert_false(q.matches? JournalMessage.new(
+      id: 'foo',
+      topic: 'log.core',
+      timestamp: Time.now + DAY*3,
+      payload: {action: 'privmsg'}))
+  end
+
+end
+
+class JournalBrokerTest < Test::Unit::TestCase
+
+  include Irc::Bot::Journal
+
+  def test_publish
+    received = []
+    journal = JournalBroker.new(consumer: Proc.new { |message|
+      received << message
+    })
+
+    # publish some messages:
+    journal.publish 'log.irc',
+      source: 'alice', message: '<3 pg'
+    journal.publish 'log.irc',
+      source: 'bob', message: 'mysql > pg'
+    journal.publish 'log.irc',
+      source: 'alice', target: 'bob', action: :kick
+
+    # wait for messages to be consumed:
+    sleep 0.1
+    assert_equal(3, received.length)
+  end
+
+  def test_subscribe
+    received = []
+    journal = JournalBroker.new
+
+    # subscribe to messages for topic foo:
+    sub = journal.subscribe('foo') do |message|
+      received << message
+    end
+
+    # publish some messages:
+    journal.publish 'foo', {}
+    journal.publish 'bar', {}
+    journal.publish 'foo', {}
+
+    # wait for messages to be consumed:
+    sleep 0.1
+    assert_equal(2, received.length)
+
+    received.clear
+
+    journal.publish 'foo', {}
+    sleep 0.1
+    sub.cancel
+    journal.publish 'foo', {}
+    sleep 0.1
+    assert_equal(1, received.length)
+  end
+
+end
+
+module JournalStorageTestMixin
+
+  include Irc::Bot::Journal
+
+  def teardown
+    @storage.drop
+  end
+
+  def test_operations
+    # insertion
+    m = JournalMessage.create('log.core', {foo: {bar: 'baz', qux: 42}})
+    @storage.insert(m)
+
+    # query by id
+    res = @storage.find(Query.define { id m.id })
+    assert_equal(1, res.length)
+    assert_equal(m, res.first)
+
+    # check timestamp was returned correctly:
+    assert_equal(m.timestamp.strftime('%Y-%m-%d %H:%M:%S%z'),
+                 res.first.timestamp.strftime('%Y-%m-%d %H:%M:%S%z'))
+
+    # check if payload was returned correctly:
+    assert_equal({'foo' => {'bar' => 'baz', 'qux' => 42}}, res.first.payload)
+
+    # query by topic
+    assert_equal(m, @storage.find(Query.define { topic('log.core') }).first)
+    assert_equal(m, @storage.find(Query.define { topic('log.*') }).first)
+    assert_equal(m, @storage.find(Query.define { topic('*.*') }).first)
+
+    # query by timestamp range
+    assert_equal(1, @storage.find(Query.define {
+      timestamp(from: Time.now-DAY, to: Time.now+DAY) }).length)
+    assert_equal(0, @storage.find(Query.define {
+      timestamp(from: Time.now-DAY*2, to: Time.now-DAY) }).length)
+
+    # query by payload
+    res = @storage.find(Query.define { payload('foo.bar' => 'baz') })
+    assert_equal(m, res.first)
+    res = @storage.find(Query.define { payload('foo.bar' => 'x') })
+    assert_true(res.empty?)
+
+    # without arguments: find and count
+    assert_equal(1, @storage.count)
+    assert_equal(m, @storage.find.first)
+  end
+
+  def test_find
+    # tests limit/offset and block parameters of find()
+    @storage.insert(JournalMessage.create('irclogs', {message: 'foo'}))
+    @storage.insert(JournalMessage.create('irclogs', {message: 'bar'}))
+    @storage.insert(JournalMessage.create('irclogs', {message: 'baz'}))
+    @storage.insert(JournalMessage.create('irclogs', {message: 'qux'}))
+
+    msgs = []
+    @storage.find(Query.define({topic: 'irclogs'}), 2, 1) do |m|
+      msgs << m
+    end
+    assert_equal(2, msgs.length)
+    assert_equal('bar', msgs.first['message'])
+    assert_equal('baz', msgs.last['message'])
+
+    msgs = []
+    @storage.find(Query.define({topic: 'irclogs'})) do |m|
+      msgs << m
+    end
+    assert_equal(4, msgs.length)
+    assert_equal('foo', msgs.first['message'])
+    assert_equal('qux', msgs.last['message'])
+
+  end
+
+  def test_operations_multiple
+    # test operations on multiple messages
+    # insert a bunch:
+    @storage.insert(JournalMessage.create('test.topic', {name: 'one'}))
+    @storage.insert(JournalMessage.create('test.topic', {name: 'two'}))
+    @storage.insert(JournalMessage.create('test.topic', {name: 'three'}))
+    @storage.insert(JournalMessage.create('archived.topic', {name: 'four'},
+      timestamp: Time.now - DAY*100))
+    @storage.insert(JournalMessage.create('complex', {name: 'five', country: {
+      name: 'Italy'
+    }}))
+    @storage.insert(JournalMessage.create('complex', {name: 'six', country: {
+      name: 'Austria'
+    }}))
+
+    # query by topic
+    assert_equal(3, @storage.find(Query.define { topic 'test.*' }).length)
+    # query by payload
+    assert_equal(1, @storage.find(Query.define {
+      payload('country.name' => 'Austria') }).length)
+    # query by timestamp range
+    assert_equal(1, @storage.find(Query.define {
+      timestamp(from: Time.now - DAY*150, to: Time.now - DAY*50) }).length)
+
+    # count with query
+    assert_equal(2, @storage.count(Query.define { topic('complex') }))
+    assert_equal(6, @storage.count)
+    @storage.remove(Query.define { topic('archived.*') })
+    assert_equal(5, @storage.count)
+    @storage.remove
+    assert_equal(0, @storage.count)
+  end
+
+  def test_broker_interface
+    journal = JournalBroker.new(storage: @storage) 
+
+    journal.publish 'irclogs', message: 'foo'
+    journal.publish 'irclogs', message: 'bar'
+    journal.publish 'irclogs', message: 'baz'
+    journal.publish 'irclogs', message: 'qux'
+
+    # wait for messages to be consumed:
+    sleep 0.1
+
+    msgs = []
+    journal.find({topic: 'irclogs'}, 2, 1) do |m|
+      msgs << m
+    end
+    assert_equal(2, msgs.length)
+    assert_equal('bar', msgs.first['message'])
+    assert_equal('baz', msgs.last['message'])
+
+    journal.ensure_payload_index('foo.bar.baz')
+  end
+
+  NUM=100 # 1_000_000
+  def test_benchmark
+    puts
+
+    assert_equal(0, @storage.count)
+    # prepare messages to insert, we benchmark the storage backend not ruby
+    num = 0
+    messages = (0...NUM).map do
+      num += 1
+      JournalMessage.create(
+            'test.topic.num_'+num.to_s, {answer: {number: '42', word: 'forty-two'}})
+    end
+
+    # iter is the number of operations performed WITHIN block
+    def benchmark(label, iter, &block)
+      time = Benchmark.realtime do
+        yield
+      end
+      puts label + ' %d iterations, duration: %.3fms (%.3fms / iteration)' % [iter, time*1000, (time*1000) / iter]
+    end
+
+    benchmark(@storage.class.to_s+'~insert', messages.length) do
+      messages.each { |m|
+        @storage.insert(m)
+      }
+    end
+
+    benchmark(@storage.class.to_s+'~find_by_id', messages.length) do
+      messages.each { |m|
+        @storage.find(Query.define { id m.id })
+      }
+    end
+    benchmark(@storage.class.to_s+'~find_by_topic', messages.length) do
+      messages.each { |m|
+        @storage.find(Query.define { topic m.topic })
+      }
+    end
+    benchmark(@storage.class.to_s+'~find_by_topic_wildcard', messages.length) do
+      messages.each { |m|
+        @storage.find(Query.define { topic m.topic.gsub('topic', '*') })
+      }
+    end
+  end
+
+end
+
+if ENV['PG_URI']
+class JournalStoragePostgresTest < Test::Unit::TestCase
+
+  include JournalStorageTestMixin
+
+  def setup
+    @storage = Storage::PostgresStorage.new(
+      uri: ENV['PG_URI'] || 'postgresql://localhost/rbot_journal',
+      drop: true)
+  end
+
+  def test_query_to_sql
+    q = Query.define do
+      id 'foo'
+      id 'bar', 'baz'
+      topic 'log.irc.*'
+      topic 'log.core', 'baz'
+      timestamp from: Time.now, to: Time.now + 60 * 10
+      payload 'action': :privmsg, 'alice': 'bob'
+      payload 'channel': '#rbot'
+      payload 'foo.bar': 'baz'
+    end
+    sql = @storage.query_to_sql(q)
+    assert_equal("(id = $1 OR id = $2 OR id = $3) AND (topic ILIKE $4 OR topic ILIKE $5 OR topic ILIKE $6) AND (timestamp >= $7 AND timestamp <= $8) AND (payload->>'action' = $9 OR payload->>'alice' = $10 OR payload->>'channel' = $11 OR payload->'foo'->>'bar' = $12)", sql[0])
+    q = Query.define do
+      id 'foo'
+    end
+    assert_equal('(id = $1)', @storage.query_to_sql(q)[0])
+    q = Query.define do
+      topic 'foo.*.bar'
+    end
+    assert_equal('(topic ILIKE $1)', @storage.query_to_sql(q)[0])
+    assert_equal(['foo.%.bar'], @storage.query_to_sql(q)[1])
+  end
+
+end
+else
+  puts 'NOTE: Set PG_URI environment variable to test postgresql storage.'
+end
+
+if ENV['MONGO_URI']
+class JournalStorageMongoTest < Test::Unit::TestCase
+
+  include JournalStorageTestMixin
+
+  def setup
+    @storage = Storage::MongoStorage.new(
+      uri: ENV['MONGO_URI'] || 'mongodb://127.0.0.1:27017/rbot',
+      drop: true)
+  end
+end
+else
+  puts 'NOTE: Set MONGO_URI environment variable to test postgresql storage.'
+end
+