From: Matthias Hecker <36882671+mattzque@users.noreply.github.com> Date: Fri, 27 Mar 2020 19:58:27 +0000 (+0100) Subject: Merge pull request #4 from ahpook/rename_karma X-Git-Url: https://git.netwichtig.de/gitweb/?a=commitdiff_plain;h=b6db18c5467c1a161e3fcc39d82ad1b38e213c87;hp=bc7efe2d4b360da0276287e6cc7f6a401609c162;p=user%2Fhenk%2Fcode%2Fruby%2Frbot.git Merge pull request #4 from ahpook/rename_karma Rename and improve karma plugin --- diff --git a/lib/rbot/core/journal.rb b/lib/rbot/core/journal.rb new file mode 100644 index 00000000..c4f5f0f4 --- /dev/null +++ b/lib/rbot/core/journal.rb @@ -0,0 +1,69 @@ +#-- vim:sw=2:et +#++ +# +# :title: rbot journal management from IRC +# +# Author:: Matthias Hecker (apoc@geekosphere.org) + +require 'rbot/journal' + +module ::Irc +class Bot + # this should return the journal if the managing plugin has been loaded. + def journal + if @plugins['journal'] + @plugins['journal'].broker + end + end +end +end + +class JournalModule < CoreBotModule + + attr_reader :broker + + include Irc::Bot::Journal + + Config.register Config::StringValue.new('journal.storage', + :default => nil, + :requires_rescan => true, + :desc => 'storage engine used by the journal') + Config.register Config::StringValue.new('journal.storage.uri', + :default => nil, + :requires_rescan => true, + :desc => 'storage database uri') + + def initialize + super + storage = nil + name = @bot.config['journal.storage'] + uri = @bot.config['journal.storage.uri'] + if name + begin + storage = Storage.create(name, uri) + rescue + error 'journal storage initialization error!' + error $! + error $@.join("\n") + end + end + debug 'journal broker starting up...' + @broker = JournalBroker.new(storage: storage) + end + + def cleanup + super + debug 'journal broker shutting down...' + @broker.shutdown + @broker = nil + end + + def help(plugin, topic='') + 'journal' + end + +end + +journal = JournalModule.new +journal.priority = -2 + diff --git a/lib/rbot/core/journal_irclog.rb b/lib/rbot/core/journal_irclog.rb new file mode 100644 index 00000000..0617d991 --- /dev/null +++ b/lib/rbot/core/journal_irclog.rb @@ -0,0 +1,71 @@ +#-- vim:sw=2:et +#++ +# +# :title: irc logging into the journal +# +# Author:: Matthias Hecker (apoc@geekosphere.org) + +class JournalIrcLogModule < CoreBotModule + + include Irc::Bot::Journal + + Config.register Config::ArrayValue.new('journal.irclog.whitelist', + :default => [], + :desc => 'only perform journal irc logging for those channel/users') + + Config.register Config::ArrayValue.new('journal.irclog.blacklist', + :default => [], + :desc => 'exclude journal irc logging for those channel/users') + + def publish(payload) + if payload[:target] + target = payload[:target] + whitelist = @bot.config['journal.irclog.whitelist'] + blacklist = @bot.config['journal.irclog.blacklist'] + unless whitelist.empty? + return unless whitelist.include? target + end + unless blacklist.empty? + return if blacklist.include? target + end + end + @bot.journal.publish('irclog', payload) + end + + def log_message(m) + unless m.kind_of? 
BasicUserMessage + warning 'journal irc logger can\'t log %s message' % [m.class.to_s] + else + payload = { + type: m.class.name.downcase.match(/(\w+)message/).captures.first, + addressed: m.address?, + replied: m.replied?, + identified: m.identified?, + + source: m.source.to_s, + source_user: m.botuser.to_s, + source_address: m.sourceaddress, + target: m.target.to_s, + server: m.server.to_s, + + message: m.logmessage, + } + publish(payload) + end + end + + # messages sent + def sent(m) + log_message(m) + end + + # messages received + def listen(m) + log_message(m) + end +end + +plugin = JournalIrcLogModule.new +# make sure the logger gets loaded after the journal +plugin.priority = -1 + diff --git a/lib/rbot/core/webservice.rb b/lib/rbot/core/webservice.rb index 0ddbc2d5..eb01226c 100644 --- a/lib/rbot/core/webservice.rb +++ b/lib/rbot/core/webservice.rb @@ -17,6 +17,8 @@ require 'webrick/https' require 'openssl' require 'cgi' require 'json' +require 'erb' +require 'ostruct' module ::Irc class Bot @@ -82,7 +84,10 @@ class Bot } @path = req.path - debug '@path = ' + @path.inspect + + @load_path = [File.join(Config::datadir, 'web')] + @load_path += @bot.plugins.core_module_dirs + @load_path += @bot.plugins.plugin_dirs end def parse_query(query) @@ -124,6 +129,27 @@ class Bot def send_html(body, status=200) send_response(body, status, 'text/html') end + + # Expands a relative filename to absolute using a list of load paths. + def get_load_path(filename) + @load_path.each do |path| + file = File.join(path, filename) + return file if File.exists?(file) + end + end + + # Renders a erb template and responds it + def render(template, args={}) + file = get_load_path template + if not file + raise 'template not found: ' + template + end + + tmpl = ERB.new(IO.read(file)) + ns = OpenStruct.new(args) + body = tmpl.result(ns.instance_eval { binding }) + send_html(body, 200) + end end # works similar to a message mapper but for url paths @@ -394,6 +420,10 @@ class WebServiceModule < CoreBotModule :requires_rescan => true, :desc => 'Host the web service will bind on') + Config.register Config::StringValue.new('webservice.url', + :default => 'http://127.0.0.1:7268', + :desc => 'The public URL of the web service.') + Config.register Config::BooleanValue.new('webservice.ssl', :default => false, :requires_rescan => true, diff --git a/lib/rbot/ircbot.rb b/lib/rbot/ircbot.rb index eb158c63..739aaade 100644 --- a/lib/rbot/ircbot.rb +++ b/lib/rbot/ircbot.rb @@ -201,6 +201,7 @@ class Bot # loads and opens new registry databases, used by the plugins attr_accessor :registry_factory + # web service attr_accessor :webservice # server we are connected to diff --git a/lib/rbot/journal.rb b/lib/rbot/journal.rb new file mode 100644 index 00000000..981ff6e4 --- /dev/null +++ b/lib/rbot/journal.rb @@ -0,0 +1,430 @@ +# encoding: UTF-8 +#-- vim:sw=2:et +#++ +# +# :title: rbot's persistent message queue +# +# Author:: Matthias Hecker (apoc@geekosphere.org) + +require 'thread' +require 'securerandom' + +module Irc +class Bot +module Journal + +=begin rdoc + + The journal is a persistent message queue for rbot, its based on a basic + publish/subscribe model and persists messages into backend databases + that can be efficiently searched for past messages. + + It is a addition to the key value storage already present in rbot + through its registry subsystem. 
+
+=end
+
+  class InvalidJournalMessage < StandardError
+  end
+  class StorageError < StandardError
+  end
+
+  class JournalMessage
+    # a unique identification of this message
+    attr_reader :id
+
+    # describes a hierarchical queue into which this message belongs
+    attr_reader :topic
+
+    # when this message was published, as a Time instance
+    attr_reader :timestamp
+
+    # contains the actual message as a Hash
+    attr_reader :payload
+
+    def initialize(message)
+      @id = message[:id]
+      @timestamp = message[:timestamp]
+      @topic = message[:topic]
+      @payload = message[:payload]
+      if @payload.class != Hash
+        raise InvalidJournalMessage.new('payload must be a hash!')
+      end
+    end
+
+    # Access payload value by key.
+    def get(pkey, default=:exception) # IDENTITY = Object.new instead of :ex..?
+      value = pkey.split('.').reduce(@payload) do |hash, key|
+        if hash.has_key?(key) or hash.has_key?(key.to_sym)
+          hash[key] || hash[key.to_sym]
+        else
+          if default == :exception
+            raise ArgumentError.new
+          else
+            default
+          end
+        end
+      end
+    end
+
+    # Access payload value by key; alias for get(key, nil).
+    def [](key)
+      get(key, nil)
+    end
+
+    def ==(other)
+      (@id == other.id) rescue false
+    end
+
+    def self.create(topic, payload, opt={})
+      # cleanup payload to only contain strings
+      JournalMessage.new(
+        id: opt[:id] || SecureRandom.uuid,
+        timestamp: opt[:timestamp] || Time.now,
+        topic: topic,
+        payload: payload
+      )
+    end
+  end
+
+  module Storage
+    class AbstractStorage
+      # initializes/opens a new storage connection
+      def initialize(opts={})
+      end
+
+      # inserts a message in storage
+      def insert(message)
+      end
+
+      # creates/ensures an index exists on the payload field specified by key
+      def ensure_payload_index(key)
+      end
+
+      # returns an array of message instances that match the query
+      def find(query=nil, limit=100, offset=0, &block)
+      end
+
+      # returns the number of messages that match the query
+      def count(query=nil)
+      end
+
+      # removes messages that match the query
+      def remove(query=nil)
+      end
+
+      # destroys the underlying table/collection
+      def drop
+      end
+
+      # Returns all classes from the namespace that implement this interface
+      def self.get_impl
+        ObjectSpace.each_object(Class).select { |klass| klass < self }
+      end
+    end
+
+    def self.create(name, uri)
+      log 'load journal storage adapter: ' + name
+      load File.join(File.dirname(__FILE__), 'journal', name + '.rb')
+      cls = AbstractStorage.get_impl.first
+      cls.new(uri: uri)
+    end
+  end
+
+  # Describes a query on journal entries; it is used both to describe
+  # a subscription as well as to query persisted messages.
+ # There two ways to declare a Query instance, using + # the DSL like this: + # + # Query.define do + # id 'foo' + # id 'bar' + # topic 'log.irc.*' + # topic 'log.core' + # timestamp from: Time.now, to: Time.now + 60 * 10 + # payload 'action': :privmsg + # payload 'channel': '#rbot' + # payload 'foo.bar': 'baz' + # end + # + # or using a hash: (NOTE: avoid using symbols in payload) + # + # Query.define({ + # id: ['foo', 'bar'], + # topic: ['log.irc.*', 'log.core'], + # timestamp: { + # from: Time.now + # to: Time.now + 60 * 10 + # }, + # payload: { + # 'action' => 'privmsg' + # 'channel' => '#rbot', + # 'foo.bar' => 'baz' + # } + # }) + # + class Query + # array of ids to match (OR) + attr_reader :id + # array of topics to match with wildcard support (OR) + attr_reader :topic + # hash with from: timestamp and to: timestamp + attr_reader :timestamp + # hash of key values to match + attr_reader :payload + + def initialize(query) + @id = query[:id] || [] + @id = [@id] if @id.is_a? String + @topic = query[:topic] || [] + @topic = [@topic] if @topic.is_a? String + @timestamp = { + from: nil, to: nil + } + if query[:timestamp] and query[:timestamp][:from] + @timestamp[:from] = query[:timestamp][:from] + end + if query[:timestamp] and query[:timestamp][:to] + @timestamp[:to] = query[:timestamp][:to] + end + @payload = query[:payload] || {} + end + + # returns true if the given message matches the query + def matches?(message) + return false if not @id.empty? and not @id.include? message.id + return false if not @topic.empty? and not topic_matches? message.topic + if @timestamp[:from] + return false unless message.timestamp >= @timestamp[:from] + end + if @timestamp[:to] + return false unless message.timestamp <= @timestamp[:to] + end + found = false + @payload.each_pair do |key, value| + begin + message.get(key.to_s) + rescue ArgumentError + end + found = true + end + return false if not found and not @payload.empty? + true + end + + def topic_matches?(_topic) + @topic.each do |topic| + if topic.include? '*' + match = true + topic.split('.').zip(_topic.split('.')).each do |a, b| + if a == '*' + if not b or b.empty? + match = false + end + else + match = false unless a == b + end + end + return true if match + else + return true if topic == _topic + end + end + return false + end + + # factory that constructs a query + class Factory + attr_reader :query + def initialize + @query = { + id: [], + topic: [], + timestamp: { + from: nil, to: nil + }, + payload: {} + } + end + + def id(*_id) + @query[:id] += _id + end + + def topic(*_topic) + @query[:topic] += _topic + end + + def timestamp(range) + @query[:timestamp] = range + end + + def payload(query) + @query[:payload].merge!(query) + end + end + + def self.define(query=nil, &block) + factory = Factory.new + if block_given? 
+ factory.instance_eval(&block) + query = factory.query + end + Query.new query + end + + end + + + class JournalBroker + attr_reader :storage + class Subscription + attr_reader :topic + attr_reader :block + def initialize(broker, topic, block) + @broker = broker + @topic = topic + @block = block + end + def cancel + @broker.unsubscribe(self) + end + end + + def initialize(opts={}) + # overrides the internal consumer with a block + @consumer = opts[:consumer] + # storage backend + @storage = opts[:storage] + unless @storage + warning 'journal broker: no storage set up, won\'t persist messages' + end + @queue = Queue.new + # consumer thread: + @thread = Thread.new do + while message = @queue.pop + begin + consume message + # pop(true) ... rescue ThreadError => e + rescue Exception => e + error 'journal broker: exception in consumer thread' + error $! + end + end + end + @subscriptions = [] + # lookup-table for subscriptions by their topic + @topic_subs = {} + end + + def consume(message) + return unless message + @consumer.call(message) if @consumer + + # notify subscribers + if @topic_subs.has_key? message.topic + @topic_subs[message.topic].each do |s| + s.block.call(message) + end + end + + @storage.insert(message) if @storage + end + + def persists? + true if @storage + end + + def shutdown + log 'journal shutdown' + @subscriptions.clear + @topic_subs.clear + @queue << nil + @thread.join + @thread = nil + end + + def publish(topic, payload) + debug 'journal publish message in %s: %s' % [topic, payload.inspect] + @queue << JournalMessage::create(topic, payload) + nil + end + + # Subscribe to receive messages from a topic. + # + # You can use this method to subscribe to messages that + # are published within a specified topic. You must provide + # a receiving block to receive messages one-by-one. + # The method returns an instance of Subscription that can + # be used to cancel the subscription by invoking cancel + # on it. + # + # journal.subscribe('irclog') do |message| + # # received irclog messages... + # end + # + def subscribe(topic=nil, &block) + raise ArgumentError.new unless block_given? + s = Subscription.new(self, topic, block) + @subscriptions << s + unless @topic_subs.has_key? topic + @topic_subs[topic] = [] + end + @topic_subs[topic] << s + s + end + + def unsubscribe(s) + if @topic_subs.has_key? s.topic + @topic_subs[s.topic].delete(s) + end + @subscriptions.delete s + end + + # Find and return persisted messages by a query. + # + # This method will either return all messages or call the provided + # block for each message. It will filter the messages by the + # provided Query instance. Limit and offset might be used to + # constrain the result. + # The query might also be a hash or proc that is passed to + # Query.define first. + # + # @param query [Query] + # @param limit [Integer] how many items to return + # @param offset [Integer] relative offset in results + def find(query, limit=100, offset=0, &block) + unless query.is_a? Query + query = Query.define(query) + end + if block_given? + @storage.find(query, limit, offset, &block) + else + @storage.find(query, limit, offset) + end + end + + def count(query=nil) + unless query.is_a? Query + query = Query.define(query) + end + @storage.count(query) + end + + def remove(query=nil) + unless query.is_a? 
Query + query = Query.define(query) + end + @storage.remove(query) + end + + def ensure_payload_index(key) + @storage.ensure_payload_index(key) + end + + end + +end # Journal +end # Bot +end # Irc + diff --git a/lib/rbot/journal/mongo.rb b/lib/rbot/journal/mongo.rb new file mode 100644 index 00000000..a03355f9 --- /dev/null +++ b/lib/rbot/journal/mongo.rb @@ -0,0 +1,131 @@ +# encoding: UTF-8 +#-- vim:sw=2:et +#++ +# +# :title: journal backend for mongoDB + +require 'mongo' +require 'json' + +module Irc +class Bot +module Journal + + module Storage + + class MongoStorage < AbstractStorage + attr_reader :client + + def initialize(opts={}) + Mongo::Logger.logger.level = Logger::WARN + @uri = opts[:uri] || 'mongodb://127.0.0.1:27017/rbot' + @client = Mongo::Client.new(@uri) + @collection = @client['journal'] + log 'journal storage: mongodb connected to ' + @uri + + drop if opts[:drop] + @collection.indexes.create_one({topic: 1}) + @collection.indexes.create_one({timestamp: 1}) + end + + def ensure_payload_index(key) + @collection.indexes.create_one({'payload.'+key => 1}) + end + + def insert(m) + @collection.insert_one({ + '_id' => m.id, + 'topic' => m.topic, + 'timestamp' => m.timestamp, + 'payload' => m.payload + }) + end + + def find(query=nil, limit=100, offset=0, &block) + def to_message(document) + JournalMessage.new(id: document['_id'], + timestamp: document['timestamp'].localtime, + topic: document['topic'], + payload: document['payload'].to_h) + end + + cursor = query_cursor(query).skip(offset).limit(limit) + + if block_given? + cursor.each { |document| block.call(to_message(document)) } + else + cursor.map { |document| to_message(document) } + end + end + + # returns the number of messages that match the query + def count(query=nil) + query_cursor(query).count + end + + def remove(query=nil) + query_cursor(query).delete_many + end + + def drop + @collection.drop + end + + def query_cursor(query) + unless query + return @collection.find() + end + + query_and = [] + + # ID query OR condition + unless query.id.empty? + query_and << { + '$or' => query.id.map { |_id| + {'_id' => _id} + } + } + end + + unless query.topic.empty? + query_and << { + '$or' => query.topic.map { |topic| + if topic.include?('*') + pattern = topic.gsub('.', '\.').gsub('*', '.*') + {'topic' => {'$regex' => pattern}} + else + {'topic' => topic} + end + } + } + end + + if query.timestamp[:from] or query.timestamp[:to] + where = {} + if query.timestamp[:from] + where['$gte'] = query.timestamp[:from] + end + if query.timestamp[:to] + where['$lte'] = query.timestamp[:to] + end + query_and << {'timestamp' => where} + end + + unless query.payload.empty? + query_and << { + '$or' => query.payload.map { |key, value| + key = 'payload.' 
+ key + {key => value} + } + } + end + + @collection.find({ + '$and' => query_and + }) + end + end + end +end # Journal +end # Bot +end # Irc diff --git a/lib/rbot/journal/postgres.rb b/lib/rbot/journal/postgres.rb new file mode 100644 index 00000000..7b333158 --- /dev/null +++ b/lib/rbot/journal/postgres.rb @@ -0,0 +1,280 @@ +# encoding: UTF-8 +#-- vim:sw=2:et +#++ +# +# :title: journal backend for postgresql + +require 'pg' +require 'json' + +# wraps the postgres driver in a single thread +class PGWrapper + def initialize(uri) + @uri = uri + @queue = Queue.new + run_thread + end + + def run_thread + Thread.new do + @conn = PG.connect(@uri) + while message = @queue.pop + return_queue = message.shift + begin + result = @conn.send(*message) + return_queue << [:result, result] + rescue Exception => e + return_queue << [:exception, e] + end + end + @conn.finish + end + end + + def run_in_thread(*args) + rq = Queue.new + @queue << [rq, *args] + type, result = rq.pop + if type == :exception + raise result + else + result + end + end + + public + + def destroy + @queue << nil + end + + def exec(query) + run_in_thread(:exec, query) + end + + def exec_params(query, params) + run_in_thread(:exec_params, query, params) + end + + def escape_string(string) + @conn.escape_string(string) + end +end + +# as a replacement for CREATE INDEX IF NOT EXIST that is not in postgres. +# define function to be able to create an index in case it doesnt exist: +# source: http://stackoverflow.com/a/26012880 +CREATE_INDEX = <<-EOT +CREATE OR REPLACE FUNCTION create_index(table_name text, index_name text, column_name text) RETURNS void AS $$ +declare + l_count integer; +begin + select count(*) + into l_count + from pg_indexes + where schemaname = 'public' + and tablename = lower(table_name) + and indexname = lower(index_name); + + if l_count = 0 then + execute 'create index ' || index_name || ' on ' || table_name || '(' || column_name || ')'; + end if; +end; +$$ LANGUAGE plpgsql; +EOT + +module Irc +class Bot +module Journal + + module Storage + + class PostgresStorage < AbstractStorage + attr_reader :conn + + def initialize(opts={}) + @uri = opts[:uri] || 'postgresql://localhost/rbot' + @conn = PGWrapper.new(@uri) + @conn.exec('set client_min_messages = warning') + @conn.exec(CREATE_INDEX) + @version = @conn.exec('SHOW server_version;')[0]['server_version'] + + @version.gsub!(/^(\d+\.\d+)$/, '\1.0') + log 'journal storage: postgresql connected to version: ' + @version + + version = @version.split('.')[0,3].join.to_i + if version < 930 + raise StorageError.new( + 'PostgreSQL Version too old: %s, supported: >= 9.3' % [@version]) + end + @jsonb = (version >= 940) + log 'journal storage: no jsonb support, consider upgrading postgres' unless @jsonb + log 'journal storage: postgres backend is using JSONB :)' if @jsonb + + drop if opts[:drop] + create_table + create_index('topic_index', 'topic') + create_index('timestamp_index', 'timestamp') + end + + def create_table + @conn.exec(' + CREATE TABLE IF NOT EXISTS journal + (id UUID PRIMARY KEY, + topic TEXT NOT NULL, + timestamp TIMESTAMP WITH TIME ZONE NOT NULL, + payload %s NOT NULL)' % [@jsonb ? 
'JSONB' : 'JSON']) + end + + def create_index(index_name, column_name) + debug 'journal postges backend: create index %s for %s' % [ + index_name, column_name] + @conn.exec_params('SELECT create_index($1, $2, $3)', [ + 'journal', index_name, column_name]) + end + + def create_payload_index(key) + index_name = 'idx_payload_' + key.gsub('.', '_') + column = sql_payload_selector(key) + create_index(index_name, column) + end + + def ensure_index(key) + create_payload_index(key) + end + + def insert(m) + @conn.exec_params('INSERT INTO journal VALUES ($1, $2, $3, $4);', + [m.id, m.topic, m.timestamp, JSON.generate(m.payload)]) + end + + def find(query=nil, limit=100, offset=0, &block) + def to_message(row) + timestamp = DateTime.strptime(row['timestamp'], '%Y-%m-%d %H:%M:%S%z') + JournalMessage.new(id: row['id'], timestamp: timestamp, + topic: row['topic'], payload: JSON.parse(row['payload'])) + end + + if query + sql, params = query_to_sql(query) + sql = 'SELECT * FROM journal WHERE ' + sql + ' LIMIT %d OFFSET %d' % [limit.to_i, offset.to_i] + else + sql = 'SELECT * FROM journal LIMIT %d OFFSET %d' % [limit.to_i, offset.to_i] + params = [] + end + res = @conn.exec_params(sql, params) + if block_given? + res.each { |row| block.call(to_message(row)) } + else + res.map { |row| to_message(row) } + end + end + + # returns the number of messages that match the query + def count(query=nil) + if query + sql, params = query_to_sql(query) + sql = 'SELECT COUNT(*) FROM journal WHERE ' + sql + else + sql = 'SELECT COUNT(*) FROM journal' + params = [] + end + res = @conn.exec_params(sql, params) + res[0]['count'].to_i + end + + def remove(query=nil) + if query + sql, params = query_to_sql(query) + sql = 'DELETE FROM journal WHERE ' + sql + else + sql = 'DELETE FROM journal;' + params = [] + end + res = @conn.exec_params(sql, params) + end + + def drop + @conn.exec('DROP TABLE journal;') rescue nil + end + + def sql_payload_selector(key) + selector = 'payload' + k = key.to_s.split('.') + k.each_index { |i| + if i >= k.length-1 + selector += '->>\'%s\'' % [@conn.escape_string(k[i])] + else + selector += '->\'%s\'' % [@conn.escape_string(k[i])] + end + } + selector + end + + def query_to_sql(query) + params = [] + placeholder = Proc.new do |value| + params << value + '$%d' % [params.length] + end + sql = {op: 'AND', list: []} + + # ID query OR condition + unless query.id.empty? + sql[:list] << { + op: 'OR', + list: query.id.map { |id| + 'id = ' + placeholder.call(id) + } + } + end + + # Topic query OR condition + unless query.topic.empty? + sql[:list] << { + op: 'OR', + list: query.topic.map { |topic| + 'topic ILIKE ' + placeholder.call(topic.gsub('*', '%')) + } + } + end + + # Timestamp range query AND condition + if query.timestamp[:from] or query.timestamp[:to] + list = [] + if query.timestamp[:from] + list << 'timestamp >= ' + placeholder.call(query.timestamp[:from]) + end + if query.timestamp[:to] + list << 'timestamp <= ' + placeholder.call(query.timestamp[:to]) + end + sql[:list] << { + op: 'AND', + list: list + } + end + + # Payload query + unless query.payload.empty? 
+ list = [] + query.payload.each_pair do |key, value| + selector = sql_payload_selector(key) + list << selector + ' = ' + placeholder.call(value) + end + sql[:list] << { + op: 'OR', + list: list + } + end + + sql = sql[:list].map { |stmt| + '(' + stmt[:list].join(' %s ' % [stmt[:op]]) + ')' + }.join(' %s ' % [sql[:op]]) + + [sql, params] + end + end + end +end # Journal +end # Bot +end # Irc diff --git a/lib/rbot/plugins.rb b/lib/rbot/plugins.rb index 0e7e9d6f..8621fe45 100644 --- a/lib/rbot/plugins.rb +++ b/lib/rbot/plugins.rb @@ -410,6 +410,9 @@ module Plugins attr_reader :botmodules attr_reader :maps + attr_reader :core_module_dirs + attr_reader :plugin_dirs + # This is the list of patterns commonly delegated to plugins. # A fast delegation lookup is enabled for them. DEFAULT_DELEGATE_PATTERNS = %r{^(?: diff --git a/lib/rbot/registry.rb b/lib/rbot/registry.rb index 5e905ebb..04892d73 100644 --- a/lib/rbot/registry.rb +++ b/lib/rbot/registry.rb @@ -57,6 +57,15 @@ class Registry @libpath = File.join(File.dirname(__FILE__), 'registry') @format = format load File.join(@libpath, @format+'.rb') if format + # The get_impl method will return all implementations of the + # abstract accessor interface, since we only ever load one + # (the configured one) accessor implementation, we can just assume + # it to be the correct accessor to use. + accessors = AbstractAccessor.get_impl + if accessors.length > 1 + warning 'multiple accessor implementations loaded!' + end + @accessor_class = accessors.first end # Returns a list of supported registry database formats. @@ -68,12 +77,7 @@ class Registry # Creates a new Accessor object for the specified database filename. def create(path, filename) - # The get_impl method will return a list of all the classes that - # implement the accessor interface, since we only ever load one - # (the configured one) accessor implementation, we can just assume - # it to be the correct accessor to use. 
- cls = AbstractAccessor.get_impl.first - db = cls.new(File.join(path, 'registry_' + @format, filename.downcase)) + db = @accessor_class.new(File.join(path, 'registry_' + @format, filename.downcase)) db.optimize db end @@ -330,7 +334,7 @@ class Registry # Returns all classes from the namespace that implement this interface def self.get_impl - ObjectSpace.each_object(Class).select { |klass| klass < self } + ObjectSpace.each_object(Class).select { |klass| klass.ancestors[1] == self } end end diff --git a/rbot.gemspec b/rbot.gemspec index 29d88c1f..d023609d 100644 --- a/rbot.gemspec +++ b/rbot.gemspec @@ -1,3 +1,5 @@ +require 'rake' + Gem::Specification.new do |s| s.name = 'rbot' s.version = '0.9.15' @@ -17,13 +19,13 @@ Gem::Specification.new do |s| 'COPYING', 'COPYING.rbot', 'GPLv2', - 'README.rdoc', + 'README.md', 'REQUIREMENTS', 'TODO', 'ChangeLog', 'INSTALL', 'Usage_en.txt', - 'man/rbot.1', + 'man/rbot.xml', 'setup.rb', 'launch_here.rb', 'po/*.pot', diff --git a/test/test_journal.rb b/test/test_journal.rb new file mode 100644 index 00000000..f1653c16 --- /dev/null +++ b/test/test_journal.rb @@ -0,0 +1,421 @@ +$:.unshift File.join(File.dirname(__FILE__), '../lib') + +require 'test/unit' +require 'rbot/ircbot' +require 'rbot/journal' +require 'rbot/journal/postgres.rb' +require 'rbot/journal/mongo.rb' + +require 'benchmark' + +DAY=60*60*24 + +class JournalMessageTest < Test::Unit::TestCase + + include Irc::Bot::Journal + + def test_get + m = JournalMessage.create('foo', {'bar': 42, 'baz': nil, 'qux': {'quxx': 23}}) + assert_equal(42, m.get('bar')) + assert_raise ArgumentError do + m.get('nope') + end + assert_nil(m.get('nope', nil)) + assert_nil(m.get('baz')) + assert_equal(23, m['qux.quxx']) + assert_equal(nil, m['qux.nope']) + assert_raise(ArgumentError) { m.get('qux.nope') } + end + +end + +class QueryTest < Test::Unit::TestCase + + include Irc::Bot::Journal + + def test_define + + q = Query.define do + id 'foo' + id 'bar', 'baz' + topic 'log.irc.*' + topic 'log.core', 'baz' + timestamp from: Time.now, to: Time.now + 60 * 10 + payload 'action': :privmsg, 'alice': 'bob' + payload 'channel': '#rbot' + payload 'foo.bar': 'baz' + end + assert_equal(['foo', 'bar', 'baz'], q.id) + assert_equal(['log.irc.*', 'log.core', 'baz'], q.topic) + assert_equal([:from, :to], q.timestamp.keys) + assert_equal(Time, q.timestamp[:to].class) + assert_equal(Time, q.timestamp[:from].class) + assert_equal({ + 'action': :privmsg, 'alice': 'bob', + 'channel': '#rbot', + 'foo.bar': 'baz' + }, q.payload) + + end + + def test_topic_matches + q = Query.define do + topic 'foo' + end + assert_true(q.topic_matches?('foo')) + assert_false(q.topic_matches?('bar')) + assert_false(q.topic_matches?('foo.bar')) + + q = Query.define do + topic 'foo.bar' + end + assert_false(q.topic_matches?('foo')) + assert_false(q.topic_matches?('bar')) + assert_true(q.topic_matches?('foo.bar')) + + q = Query.define do + topic 'foo.*' + end + assert_false(q.topic_matches?('foo')) + assert_false(q.topic_matches?('bar')) + assert_true(q.topic_matches?('foo.bar')) + assert_true(q.topic_matches?('foo.baz')) + + q = Query.define do + topic '*.bar' + end + assert_false(q.topic_matches?('foo')) + assert_false(q.topic_matches?('bar')) + assert_true(q.topic_matches?('foo.bar')) + assert_true(q.topic_matches?('bar.bar')) + assert_false(q.topic_matches?('foo.foo')) + + q = Query.define do + topic '*.*' + end + assert_false(q.topic_matches?('foo')) + assert_true(q.topic_matches?('foo.bar')) + + q = Query.define do + topic 'foo' + topic 'bar' + topic 
'baz.alice.bob.*.foo' + end + assert_true(q.topic_matches?('foo')) + assert_true(q.topic_matches?('bar')) + assert_true(q.topic_matches?('baz.alice.bob.asdf.foo')) + assert_false(q.topic_matches?('baz.alice.bob..foo')) + + end + def test_matches + q = Query.define do + #id 'foo', 'bar' + topic 'log.irc.*', 'log.core' + timestamp from: Time.now - DAY, to: Time.now + DAY + payload 'action': 'privmsg', 'foo.bar': 'baz' + end + assert_true(q.matches? JournalMessage.create('log.irc.raw', {'action' => 'privmsg'})) + assert_false(q.matches? JournalMessage.create('baz', {})) + assert_true(q.matches? JournalMessage.create('log.core', {foo: {bar: 'baz'}})) + + # tests timestamp from/to: + assert_true(q.matches? JournalMessage.new( + id: 'foo', + topic: 'log.core', + timestamp: Time.now, + payload: {action: 'privmsg'})) + assert_false(q.matches? JournalMessage.new( + id: 'foo', + topic: 'log.core', + timestamp: Time.now - DAY*3, + payload: {action: 'privmsg'})) + assert_false(q.matches? JournalMessage.new( + id: 'foo', + topic: 'log.core', + timestamp: Time.now + DAY*3, + payload: {action: 'privmsg'})) + end + +end + +class JournalBrokerTest < Test::Unit::TestCase + + include Irc::Bot::Journal + + def test_publish + received = [] + journal = JournalBroker.new(consumer: Proc.new { |message| + received << message + }) + + # publish some messages: + journal.publish 'log.irc', + source: 'alice', message: '<3 pg' + journal.publish 'log.irc', + source: 'bob', message: 'mysql > pg' + journal.publish 'log.irc', + source: 'alice', target: 'bob', action: :kick + + # wait for messages to be consumed: + sleep 0.1 + assert_equal(3, received.length) + end + + def test_subscribe + received = [] + journal = JournalBroker.new + + # subscribe to messages for topic foo: + sub = journal.subscribe('foo') do |message| + received << message + end + + # publish some messages: + journal.publish 'foo', {} + journal.publish 'bar', {} + journal.publish 'foo', {} + + # wait for messages to be consumed: + sleep 0.1 + assert_equal(2, received.length) + + received.clear + + journal.publish 'foo', {} + sleep 0.1 + sub.cancel + journal.publish 'foo', {} + sleep 0.1 + assert_equal(1, received.length) + end + +end + +module JournalStorageTestMixin + + include Irc::Bot::Journal + + def teardown + @storage.drop + end + + def test_operations + # insertion + m = JournalMessage.create('log.core', {foo: {bar: 'baz', qux: 42}}) + @storage.insert(m) + + # query by id + res = @storage.find(Query.define { id m.id }) + assert_equal(1, res.length) + assert_equal(m, res.first) + + # check timestamp was returned correctly: + assert_equal(m.timestamp.strftime('%Y-%m-%d %H:%M:%S%z'), + res.first.timestamp.strftime('%Y-%m-%d %H:%M:%S%z')) + + # check if payload was returned correctly: + assert_equal({'foo' => {'bar' => 'baz', 'qux' => 42}}, res.first.payload) + + # query by topic + assert_equal(m, @storage.find(Query.define { topic('log.core') }).first) + assert_equal(m, @storage.find(Query.define { topic('log.*') }).first) + assert_equal(m, @storage.find(Query.define { topic('*.*') }).first) + + # query by timestamp range + assert_equal(1, @storage.find(Query.define { + timestamp(from: Time.now-DAY, to: Time.now+DAY) }).length) + assert_equal(0, @storage.find(Query.define { + timestamp(from: Time.now-DAY*2, to: Time.now-DAY) }).length) + + # query by payload + res = @storage.find(Query.define { payload('foo.bar' => 'baz') }) + assert_equal(m, res.first) + res = @storage.find(Query.define { payload('foo.bar' => 'x') }) + assert_true(res.empty?) 
+ + # without arguments: find and count + assert_equal(1, @storage.count) + assert_equal(m, @storage.find.first) + end + + def test_find + # tests limit/offset and block parameters of find() + @storage.insert(JournalMessage.create('irclogs', {message: 'foo'})) + @storage.insert(JournalMessage.create('irclogs', {message: 'bar'})) + @storage.insert(JournalMessage.create('irclogs', {message: 'baz'})) + @storage.insert(JournalMessage.create('irclogs', {message: 'qux'})) + + msgs = [] + @storage.find(Query.define({topic: 'irclogs'}), 2, 1) do |m| + msgs << m + end + assert_equal(2, msgs.length) + assert_equal('bar', msgs.first['message']) + assert_equal('baz', msgs.last['message']) + + msgs = [] + @storage.find(Query.define({topic: 'irclogs'})) do |m| + msgs << m + end + assert_equal(4, msgs.length) + assert_equal('foo', msgs.first['message']) + assert_equal('qux', msgs.last['message']) + + end + + def test_operations_multiple + # test operations on multiple messages + # insert a bunch: + @storage.insert(JournalMessage.create('test.topic', {name: 'one'})) + @storage.insert(JournalMessage.create('test.topic', {name: 'two'})) + @storage.insert(JournalMessage.create('test.topic', {name: 'three'})) + @storage.insert(JournalMessage.create('archived.topic', {name: 'four'}, + timestamp: Time.now - DAY*100)) + @storage.insert(JournalMessage.create('complex', {name: 'five', country: { + name: 'Italy' + }})) + @storage.insert(JournalMessage.create('complex', {name: 'six', country: { + name: 'Austria' + }})) + + # query by topic + assert_equal(3, @storage.find(Query.define { topic 'test.*' }).length) + # query by payload + assert_equal(1, @storage.find(Query.define { + payload('country.name' => 'Austria') }).length) + # query by timestamp range + assert_equal(1, @storage.find(Query.define { + timestamp(from: Time.now - DAY*150, to: Time.now - DAY*50) }).length) + + # count with query + assert_equal(2, @storage.count(Query.define { topic('complex') })) + assert_equal(6, @storage.count) + @storage.remove(Query.define { topic('archived.*') }) + assert_equal(5, @storage.count) + @storage.remove + assert_equal(0, @storage.count) + end + + def test_broker_interface + journal = JournalBroker.new(storage: @storage) + + journal.publish 'irclogs', message: 'foo' + journal.publish 'irclogs', message: 'bar' + journal.publish 'irclogs', message: 'baz' + journal.publish 'irclogs', message: 'qux' + + # wait for messages to be consumed: + sleep 0.1 + + msgs = [] + journal.find({topic: 'irclogs'}, 2, 1) do |m| + msgs << m + end + assert_equal(2, msgs.length) + assert_equal('bar', msgs.first['message']) + assert_equal('baz', msgs.last['message']) + + journal.ensure_payload_index('foo.bar.baz') + end + + NUM=100 # 1_000_000 + def test_benchmark + puts + + assert_equal(0, @storage.count) + # prepare messages to insert, we benchmark the storage backend not ruby + num = 0 + messages = (0...NUM).map do + num += 1 + JournalMessage.create( + 'test.topic.num_'+num.to_s, {answer: {number: '42', word: 'forty-two'}}) + end + + # iter is the number of operations performed WITHIN block + def benchmark(label, iter, &block) + time = Benchmark.realtime do + yield + end + puts label + ' %d iterations, duration: %.3fms (%.3fms / iteration)' % [iter, time*1000, (time*1000) / iter] + end + + benchmark(@storage.class.to_s+'~insert', messages.length) do + messages.each { |m| + @storage.insert(m) + } + end + + benchmark(@storage.class.to_s+'~find_by_id', messages.length) do + messages.each { |m| + @storage.find(Query.define { id m.id }) + } + 
end
+    benchmark(@storage.class.to_s+'~find_by_topic', messages.length) do
+      messages.each { |m|
+        @storage.find(Query.define { topic m.topic })
+      }
+    end
+    benchmark(@storage.class.to_s+'~find_by_topic_wildcard', messages.length) do
+      messages.each { |m|
+        @storage.find(Query.define { topic m.topic.gsub('topic', '*') })
+      }
+    end
+  end
+
+end
+
+if ENV['PG_URI']
+class JournalStoragePostgresTest < Test::Unit::TestCase
+
+  include JournalStorageTestMixin
+
+  def setup
+    @storage = Storage::PostgresStorage.new(
+      uri: ENV['PG_URI'] || 'postgresql://localhost/rbot_journal',
+      drop: true)
+  end
+
+  def test_query_to_sql
+    q = Query.define do
+      id 'foo'
+      id 'bar', 'baz'
+      topic 'log.irc.*'
+      topic 'log.core', 'baz'
+      timestamp from: Time.now, to: Time.now + 60 * 10
+      payload 'action': :privmsg, 'alice': 'bob'
+      payload 'channel': '#rbot'
+      payload 'foo.bar': 'baz'
+    end
+    sql = @storage.query_to_sql(q)
+    assert_equal("(id = $1 OR id = $2 OR id = $3) AND (topic ILIKE $4 OR topic ILIKE $5 OR topic ILIKE $6) AND (timestamp >= $7 AND timestamp <= $8) AND (payload->>'action' = $9 OR payload->>'alice' = $10 OR payload->>'channel' = $11 OR payload->'foo'->>'bar' = $12)", sql[0])
+    q = Query.define do
+      id 'foo'
+    end
+    assert_equal('(id = $1)', @storage.query_to_sql(q)[0])
+    q = Query.define do
+      topic 'foo.*.bar'
+    end
+    assert_equal('(topic ILIKE $1)', @storage.query_to_sql(q)[0])
+    assert_equal(['foo.%.bar'], @storage.query_to_sql(q)[1])
+  end
+
+end
+else
+  puts 'NOTE: Set PG_URI environment variable to test postgresql storage.'
+end
+
+if ENV['MONGO_URI']
+class JournalStorageMongoTest < Test::Unit::TestCase
+
+  include JournalStorageTestMixin
+
+  def setup
+    @storage = Storage::MongoStorage.new(
+      uri: ENV['MONGO_URI'] || 'mongodb://127.0.0.1:27017/rbot',
+      drop: true)
+  end
+end
+else
+  puts 'NOTE: Set MONGO_URI environment variable to test mongodb storage.'
+end
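
A minimal usage sketch of the broker introduced by this change, from a plugin's point of view: the accessor @bot.journal, publish, subscribe and find are taken from the patch above, while the topic name and payload keys ('myplugin.events', action, nick) are invented for illustration. Querying persisted messages only works once a storage backend is configured via journal.storage and journal.storage.uri.

  # publish a message into a topic (payload is an arbitrary hash)
  @bot.journal.publish('myplugin.events', action: 'greet', nick: 'alice')

  # subscribe to a topic; returns a Subscription that can later be cancelled
  sub = @bot.journal.subscribe('myplugin.events') do |message|
    debug 'seen %s by %s' % [message['action'], message['nick']]
  end
  sub.cancel

  # query persisted messages (requires a storage backend), limited to 10 results
  @bot.journal.find({topic: 'myplugin.events'}, 10) do |message|
    debug message.payload.inspect
  end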