summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to 'lib')
-rw-r--r--lib/rbot/core/journal.rb69
-rw-r--r--lib/rbot/core/journal_irclog.rb71
-rw-r--r--lib/rbot/core/webservice.rb32
-rw-r--r--lib/rbot/ircbot.rb1
-rw-r--r--lib/rbot/journal.rb430
-rw-r--r--lib/rbot/journal/mongo.rb131
-rw-r--r--lib/rbot/journal/postgres.rb280
-rw-r--r--lib/rbot/plugins.rb3
-rw-r--r--lib/rbot/registry.rb18
9 files changed, 1027 insertions, 8 deletions
diff --git a/lib/rbot/core/journal.rb b/lib/rbot/core/journal.rb
new file mode 100644
index 00000000..c4f5f0f4
--- /dev/null
+++ b/lib/rbot/core/journal.rb
@@ -0,0 +1,69 @@
+#-- vim:sw=2:et
+#++
+#
+# :title: rbot journal management from IRC
+#
+# Author:: Matthias Hecker (apoc@geekosphere.org)
+
+require 'rbot/journal'
+
+module ::Irc
+class Bot
+ # this should return the journal if the managing plugin has been loaded.
+ def journal
+ if @plugins['journal']
+ @plugins['journal'].broker
+ end
+ end
+end
+end
+
+class JournalModule < CoreBotModule
+
+ attr_reader :broker
+
+ include Irc::Bot::Journal
+
+ Config.register Config::StringValue.new('journal.storage',
+ :default => nil,
+ :requires_rescan => true,
+ :desc => 'storage engine used by the journal')
+ Config.register Config::StringValue.new('journal.storage.uri',
+ :default => nil,
+ :requires_rescan => true,
+ :desc => 'storage database uri')
+
+ def initialize
+ super
+ storage = nil
+ name = @bot.config['journal.storage']
+ uri = @bot.config['journal.storage.uri']
+ if name
+ begin
+ storage = Storage.create(name, uri)
+ rescue
+ error 'journal storage initialization error!'
+ error $!
+ error $@.join("\n")
+ end
+ end
+ debug 'journal broker starting up...'
+ @broker = JournalBroker.new(storage: storage)
+ end
+
+ def cleanup
+ super
+ debug 'journal broker shutting down...'
+ @broker.shutdown
+ @broker = nil
+ end
+
+ def help(plugin, topic='')
+ 'journal'
+ end
+
+end
+
+journal = JournalModule.new
+journal.priority = -2
+
diff --git a/lib/rbot/core/journal_irclog.rb b/lib/rbot/core/journal_irclog.rb
new file mode 100644
index 00000000..0617d991
--- /dev/null
+++ b/lib/rbot/core/journal_irclog.rb
@@ -0,0 +1,71 @@
+#-- vim:sw=2:et
+#++
+#
+# :title: irc logging into the journal
+#
+# Author:: Matthias Hecker (apoc@geekosphere.org)
+
+class JournalIrcLogModule < CoreBotModule
+
+ include Irc::Bot::Journal
+
+ Config.register Config::ArrayValue.new('journal.irclog.whitelist',
+ :default => [],
+ :desc => 'only perform journal irc logging for those channel/users')
+
+ Config.register Config::ArrayValue.new('journal.irclog.blacklist',
+ :default => [],
+ :desc => 'exclude journal irc logging for those channel/users')
+
+ def publish(payload)
+ if payload[:target]
+ target = payload[:target]
+ whitelist = @bot.config['journal.irclog.whitelist']
+ blacklist = @bot.config['journal.irclog.blacklist']
+ unless whitelist.empty?
+ return unless whitelist.include? target
+ end
+ unless blacklist.empty?
+ return if blacklist.include? target
+ end
+ end
+ @bot.journal.publish('irclog', payload)
+ end
+
+ def log_message(m)
+ unless m.kind_of? BasicUserMessage
+ warning 'journal irc logger can\'t log %s message' % [m.class.to_s]
+ else
+ payload = {
+ type: m.class.name.downcase.match(/(\w+)message/).captures.first,
+ addressed: m.address?,
+ replied: m.replied?,
+ identified: m.identified?,
+
+ source: m.source.to_s,
+ source_user: m.botuser.to_s,
+ source_address: m.sourceaddress,
+ target: m.target.to_s,
+ server: m.server.to_s,
+
+ message: m.logmessage,
+ }
+ publish(payload)
+ end
+ end
+
+ # messages sent
+ def sent(m)
+ log_message(m)
+ end
+
+ # messages received
+ def listen(m)
+ log_message(m)
+ end
+end
+
+plugin = JournalIrcLogModule.new
+# make sure the logger gets loaded after the journal
+plugin.priority = -1
+
diff --git a/lib/rbot/core/webservice.rb b/lib/rbot/core/webservice.rb
index 0ddbc2d5..eb01226c 100644
--- a/lib/rbot/core/webservice.rb
+++ b/lib/rbot/core/webservice.rb
@@ -17,6 +17,8 @@ require 'webrick/https'
require 'openssl'
require 'cgi'
require 'json'
+require 'erb'
+require 'ostruct'
module ::Irc
class Bot
@@ -82,7 +84,10 @@ class Bot
}
@path = req.path
- debug '@path = ' + @path.inspect
+
+ @load_path = [File.join(Config::datadir, 'web')]
+ @load_path += @bot.plugins.core_module_dirs
+ @load_path += @bot.plugins.plugin_dirs
end
def parse_query(query)
@@ -124,6 +129,27 @@ class Bot
def send_html(body, status=200)
send_response(body, status, 'text/html')
end
+
+ # Expands a relative filename to absolute using a list of load paths.
+ def get_load_path(filename)
+ @load_path.each do |path|
+ file = File.join(path, filename)
+ return file if File.exists?(file)
+ end
+ end
+
+ # Renders a erb template and responds it
+ def render(template, args={})
+ file = get_load_path template
+ if not file
+ raise 'template not found: ' + template
+ end
+
+ tmpl = ERB.new(IO.read(file))
+ ns = OpenStruct.new(args)
+ body = tmpl.result(ns.instance_eval { binding })
+ send_html(body, 200)
+ end
end
# works similar to a message mapper but for url paths
@@ -394,6 +420,10 @@ class WebServiceModule < CoreBotModule
:requires_rescan => true,
:desc => 'Host the web service will bind on')
+ Config.register Config::StringValue.new('webservice.url',
+ :default => 'http://127.0.0.1:7268',
+ :desc => 'The public URL of the web service.')
+
Config.register Config::BooleanValue.new('webservice.ssl',
:default => false,
:requires_rescan => true,
diff --git a/lib/rbot/ircbot.rb b/lib/rbot/ircbot.rb
index eb158c63..739aaade 100644
--- a/lib/rbot/ircbot.rb
+++ b/lib/rbot/ircbot.rb
@@ -201,6 +201,7 @@ class Bot
# loads and opens new registry databases, used by the plugins
attr_accessor :registry_factory
+ # web service
attr_accessor :webservice
# server we are connected to
diff --git a/lib/rbot/journal.rb b/lib/rbot/journal.rb
new file mode 100644
index 00000000..981ff6e4
--- /dev/null
+++ b/lib/rbot/journal.rb
@@ -0,0 +1,430 @@
+# encoding: UTF-8
+#-- vim:sw=2:et
+#++
+#
+# :title: rbot's persistent message queue
+#
+# Author:: Matthias Hecker (apoc@geekosphere.org)
+
+require 'thread'
+require 'securerandom'
+
+module Irc
+class Bot
+module Journal
+
+=begin rdoc
+
+ The journal is a persistent message queue for rbot, its based on a basic
+ publish/subscribe model and persists messages into backend databases
+ that can be efficiently searched for past messages.
+
+ It is a addition to the key value storage already present in rbot
+ through its registry subsystem.
+
+=end
+
+ class InvalidJournalMessage < StandardError
+ end
+ class StorageError < StandardError
+ end
+
+ class JournalMessage
+ # a unique identification of this message
+ attr_reader :id
+
+ # describes a hierarchical queue into which this message belongs
+ attr_reader :topic
+
+ # when this message was published as a Time instance
+ attr_reader :timestamp
+
+ # contains the actual message as a Hash
+ attr_reader :payload
+
+ def initialize(message)
+ @id = message[:id]
+ @timestamp = message[:timestamp]
+ @topic = message[:topic]
+ @payload = message[:payload]
+ if @payload.class != Hash
+ raise InvalidJournalMessage.new('payload must be a hash!')
+ end
+ end
+
+ # Access payload value by key.
+ def get(pkey, default=:exception) # IDENTITY = Object.new instead of :ex..?
+ value = pkey.split('.').reduce(@payload) do |hash, key|
+ if hash.has_key?(key) or hash.has_key?(key.to_sym)
+ hash[key] || hash[key.to_sym]
+ else
+ if default == :exception
+ raise ArgumentError.new
+ else
+ default
+ end
+ end
+ end
+ end
+
+ # Access payload value by key alias for get(key, nil).
+ def [](key)
+ get(key, nil)
+ end
+
+ def ==(other)
+ (@id == other.id) rescue false
+ end
+
+ def self.create(topic, payload, opt={})
+ # cleanup payload to only contain strings
+ JournalMessage.new(
+ id: opt[:id] || SecureRandom.uuid,
+ timestamp: opt[:timestamp] || Time.now,
+ topic: topic,
+ payload: payload
+ )
+ end
+ end
+
+ module Storage
+ class AbstractStorage
+ # intializes/opens a new storage connection
+ def initialize(opts={})
+ end
+
+ # inserts a message in storage
+ def insert(message)
+ end
+
+ # creates/ensures a index exists on the payload specified by key
+ def ensure_payload_index(key)
+ end
+
+ # returns a array of message instances that match the query
+ def find(query=nil, limit=100, offset=0, &block)
+ end
+
+ # returns the number of messages that match the query
+ def count(query=nil)
+ end
+
+ # remove messages that match the query
+ def remove(query=nil)
+ end
+
+ # destroy the underlying table/collection
+ def drop
+ end
+
+ # Returns all classes from the namespace that implement this interface
+ def self.get_impl
+ ObjectSpace.each_object(Class).select { |klass| klass < self }
+ end
+ end
+
+ def self.create(name, uri)
+ log 'load journal storage adapter: ' + name
+ load File.join(File.dirname(__FILE__), 'journal', name + '.rb')
+ cls = AbstractStorage.get_impl.first
+ cls.new(uri: uri)
+ end
+ end
+
+ # Describes a query on journal entries, it is used both to describe
+ # a subscription aswell as to query persisted messages.
+ # There two ways to declare a Query instance, using
+ # the DSL like this:
+ #
+ # Query.define do
+ # id 'foo'
+ # id 'bar'
+ # topic 'log.irc.*'
+ # topic 'log.core'
+ # timestamp from: Time.now, to: Time.now + 60 * 10
+ # payload 'action': :privmsg
+ # payload 'channel': '#rbot'
+ # payload 'foo.bar': 'baz'
+ # end
+ #
+ # or using a hash: (NOTE: avoid using symbols in payload)
+ #
+ # Query.define({
+ # id: ['foo', 'bar'],
+ # topic: ['log.irc.*', 'log.core'],
+ # timestamp: {
+ # from: Time.now
+ # to: Time.now + 60 * 10
+ # },
+ # payload: {
+ # 'action' => 'privmsg'
+ # 'channel' => '#rbot',
+ # 'foo.bar' => 'baz'
+ # }
+ # })
+ #
+ class Query
+ # array of ids to match (OR)
+ attr_reader :id
+ # array of topics to match with wildcard support (OR)
+ attr_reader :topic
+ # hash with from: timestamp and to: timestamp
+ attr_reader :timestamp
+ # hash of key values to match
+ attr_reader :payload
+
+ def initialize(query)
+ @id = query[:id] || []
+ @id = [@id] if @id.is_a? String
+ @topic = query[:topic] || []
+ @topic = [@topic] if @topic.is_a? String
+ @timestamp = {
+ from: nil, to: nil
+ }
+ if query[:timestamp] and query[:timestamp][:from]
+ @timestamp[:from] = query[:timestamp][:from]
+ end
+ if query[:timestamp] and query[:timestamp][:to]
+ @timestamp[:to] = query[:timestamp][:to]
+ end
+ @payload = query[:payload] || {}
+ end
+
+ # returns true if the given message matches the query
+ def matches?(message)
+ return false if not @id.empty? and not @id.include? message.id
+ return false if not @topic.empty? and not topic_matches? message.topic
+ if @timestamp[:from]
+ return false unless message.timestamp >= @timestamp[:from]
+ end
+ if @timestamp[:to]
+ return false unless message.timestamp <= @timestamp[:to]
+ end
+ found = false
+ @payload.each_pair do |key, value|
+ begin
+ message.get(key.to_s)
+ rescue ArgumentError
+ end
+ found = true
+ end
+ return false if not found and not @payload.empty?
+ true
+ end
+
+ def topic_matches?(_topic)
+ @topic.each do |topic|
+ if topic.include? '*'
+ match = true
+ topic.split('.').zip(_topic.split('.')).each do |a, b|
+ if a == '*'
+ if not b or b.empty?
+ match = false
+ end
+ else
+ match = false unless a == b
+ end
+ end
+ return true if match
+ else
+ return true if topic == _topic
+ end
+ end
+ return false
+ end
+
+ # factory that constructs a query
+ class Factory
+ attr_reader :query
+ def initialize
+ @query = {
+ id: [],
+ topic: [],
+ timestamp: {
+ from: nil, to: nil
+ },
+ payload: {}
+ }
+ end
+
+ def id(*_id)
+ @query[:id] += _id
+ end
+
+ def topic(*_topic)
+ @query[:topic] += _topic
+ end
+
+ def timestamp(range)
+ @query[:timestamp] = range
+ end
+
+ def payload(query)
+ @query[:payload].merge!(query)
+ end
+ end
+
+ def self.define(query=nil, &block)
+ factory = Factory.new
+ if block_given?
+ factory.instance_eval(&block)
+ query = factory.query
+ end
+ Query.new query
+ end
+
+ end
+
+
+ class JournalBroker
+ attr_reader :storage
+ class Subscription
+ attr_reader :topic
+ attr_reader :block
+ def initialize(broker, topic, block)
+ @broker = broker
+ @topic = topic
+ @block = block
+ end
+ def cancel
+ @broker.unsubscribe(self)
+ end
+ end
+
+ def initialize(opts={})
+ # overrides the internal consumer with a block
+ @consumer = opts[:consumer]
+ # storage backend
+ @storage = opts[:storage]
+ unless @storage
+ warning 'journal broker: no storage set up, won\'t persist messages'
+ end
+ @queue = Queue.new
+ # consumer thread:
+ @thread = Thread.new do
+ while message = @queue.pop
+ begin
+ consume message
+ # pop(true) ... rescue ThreadError => e
+ rescue Exception => e
+ error 'journal broker: exception in consumer thread'
+ error $!
+ end
+ end
+ end
+ @subscriptions = []
+ # lookup-table for subscriptions by their topic
+ @topic_subs = {}
+ end
+
+ def consume(message)
+ return unless message
+ @consumer.call(message) if @consumer
+
+ # notify subscribers
+ if @topic_subs.has_key? message.topic
+ @topic_subs[message.topic].each do |s|
+ s.block.call(message)
+ end
+ end
+
+ @storage.insert(message) if @storage
+ end
+
+ def persists?
+ true if @storage
+ end
+
+ def shutdown
+ log 'journal shutdown'
+ @subscriptions.clear
+ @topic_subs.clear
+ @queue << nil
+ @thread.join
+ @thread = nil
+ end
+
+ def publish(topic, payload)
+ debug 'journal publish message in %s: %s' % [topic, payload.inspect]
+ @queue << JournalMessage::create(topic, payload)
+ nil
+ end
+
+ # Subscribe to receive messages from a topic.
+ #
+ # You can use this method to subscribe to messages that
+ # are published within a specified topic. You must provide
+ # a receiving block to receive messages one-by-one.
+ # The method returns an instance of Subscription that can
+ # be used to cancel the subscription by invoking cancel
+ # on it.
+ #
+ # journal.subscribe('irclog') do |message|
+ # # received irclog messages...
+ # end
+ #
+ def subscribe(topic=nil, &block)
+ raise ArgumentError.new unless block_given?
+ s = Subscription.new(self, topic, block)
+ @subscriptions << s
+ unless @topic_subs.has_key? topic
+ @topic_subs[topic] = []
+ end
+ @topic_subs[topic] << s
+ s
+ end
+
+ def unsubscribe(s)
+ if @topic_subs.has_key? s.topic
+ @topic_subs[s.topic].delete(s)
+ end
+ @subscriptions.delete s
+ end
+
+ # Find and return persisted messages by a query.
+ #
+ # This method will either return all messages or call the provided
+ # block for each message. It will filter the messages by the
+ # provided Query instance. Limit and offset might be used to
+ # constrain the result.
+ # The query might also be a hash or proc that is passed to
+ # Query.define first.
+ #
+ # @param query [Query]
+ # @param limit [Integer] how many items to return
+ # @param offset [Integer] relative offset in results
+ def find(query, limit=100, offset=0, &block)
+ unless query.is_a? Query
+ query = Query.define(query)
+ end
+ if block_given?
+ @storage.find(query, limit, offset, &block)
+ else
+ @storage.find(query, limit, offset)
+ end
+ end
+
+ def count(query=nil)
+ unless query.is_a? Query
+ query = Query.define(query)
+ end
+ @storage.count(query)
+ end
+
+ def remove(query=nil)
+ unless query.is_a? Query
+ query = Query.define(query)
+ end
+ @storage.remove(query)
+ end
+
+ def ensure_payload_index(key)
+ @storage.ensure_payload_index(key)
+ end
+
+ end
+
+end # Journal
+end # Bot
+end # Irc
+
diff --git a/lib/rbot/journal/mongo.rb b/lib/rbot/journal/mongo.rb
new file mode 100644
index 00000000..a03355f9
--- /dev/null
+++ b/lib/rbot/journal/mongo.rb
@@ -0,0 +1,131 @@
+# encoding: UTF-8
+#-- vim:sw=2:et
+#++
+#
+# :title: journal backend for mongoDB
+
+require 'mongo'
+require 'json'
+
+module Irc
+class Bot
+module Journal
+
+ module Storage
+
+ class MongoStorage < AbstractStorage
+ attr_reader :client
+
+ def initialize(opts={})
+ Mongo::Logger.logger.level = Logger::WARN
+ @uri = opts[:uri] || 'mongodb://127.0.0.1:27017/rbot'
+ @client = Mongo::Client.new(@uri)
+ @collection = @client['journal']
+ log 'journal storage: mongodb connected to ' + @uri
+
+ drop if opts[:drop]
+ @collection.indexes.create_one({topic: 1})
+ @collection.indexes.create_one({timestamp: 1})
+ end
+
+ def ensure_payload_index(key)
+ @collection.indexes.create_one({'payload.'+key => 1})
+ end
+
+ def insert(m)
+ @collection.insert_one({
+ '_id' => m.id,
+ 'topic' => m.topic,
+ 'timestamp' => m.timestamp,
+ 'payload' => m.payload
+ })
+ end
+
+ def find(query=nil, limit=100, offset=0, &block)
+ def to_message(document)
+ JournalMessage.new(id: document['_id'],
+ timestamp: document['timestamp'].localtime,
+ topic: document['topic'],
+ payload: document['payload'].to_h)
+ end
+
+ cursor = query_cursor(query).skip(offset).limit(limit)
+
+ if block_given?
+ cursor.each { |document| block.call(to_message(document)) }
+ else
+ cursor.map { |document| to_message(document) }
+ end
+ end
+
+ # returns the number of messages that match the query
+ def count(query=nil)
+ query_cursor(query).count
+ end
+
+ def remove(query=nil)
+ query_cursor(query).delete_many
+ end
+
+ def drop
+ @collection.drop
+ end
+
+ def query_cursor(query)
+ unless query
+ return @collection.find()
+ end
+
+ query_and = []
+
+ # ID query OR condition
+ unless query.id.empty?
+ query_and << {
+ '$or' => query.id.map { |_id|
+ {'_id' => _id}
+ }
+ }
+ end
+
+ unless query.topic.empty?
+ query_and << {
+ '$or' => query.topic.map { |topic|
+ if topic.include?('*')
+ pattern = topic.gsub('.', '\.').gsub('*', '.*')
+ {'topic' => {'$regex' => pattern}}
+ else
+ {'topic' => topic}
+ end
+ }
+ }
+ end
+
+ if query.timestamp[:from] or query.timestamp[:to]
+ where = {}
+ if query.timestamp[:from]
+ where['$gte'] = query.timestamp[:from]
+ end
+ if query.timestamp[:to]
+ where['$lte'] = query.timestamp[:to]
+ end
+ query_and << {'timestamp' => where}
+ end
+
+ unless query.payload.empty?
+ query_and << {
+ '$or' => query.payload.map { |key, value|
+ key = 'payload.' + key
+ {key => value}
+ }
+ }
+ end
+
+ @collection.find({
+ '$and' => query_and
+ })
+ end
+ end
+ end
+end # Journal
+end # Bot
+end # Irc
diff --git a/lib/rbot/journal/postgres.rb b/lib/rbot/journal/postgres.rb
new file mode 100644
index 00000000..7b333158
--- /dev/null
+++ b/lib/rbot/journal/postgres.rb
@@ -0,0 +1,280 @@
+# encoding: UTF-8
+#-- vim:sw=2:et
+#++
+#
+# :title: journal backend for postgresql
+
+require 'pg'
+require 'json'
+
+# wraps the postgres driver in a single thread
+class PGWrapper
+ def initialize(uri)
+ @uri = uri
+ @queue = Queue.new
+ run_thread
+ end
+
+ def run_thread
+ Thread.new do
+ @conn = PG.connect(@uri)
+ while message = @queue.pop
+ return_queue = message.shift
+ begin
+ result = @conn.send(*message)
+ return_queue << [:result, result]
+ rescue Exception => e
+ return_queue << [:exception, e]
+ end
+ end
+ @conn.finish
+ end
+ end
+
+ def run_in_thread(*args)
+ rq = Queue.new
+ @queue << [rq, *args]
+ type, result = rq.pop
+ if type == :exception
+ raise result
+ else
+ result
+ end
+ end
+
+ public
+
+ def destroy
+ @queue << nil
+ end
+
+ def exec(query)
+ run_in_thread(:exec, query)
+ end
+
+ def exec_params(query, params)
+ run_in_thread(:exec_params, query, params)
+ end
+
+ def escape_string(string)
+ @conn.escape_string(string)
+ end
+end
+
+# as a replacement for CREATE INDEX IF NOT EXIST that is not in postgres.
+# define function to be able to create an index in case it doesnt exist:
+# source: http://stackoverflow.com/a/26012880
+CREATE_INDEX = <<-EOT
+CREATE OR REPLACE FUNCTION create_index(table_name text, index_name text, column_name text) RETURNS void AS $$
+declare
+ l_count integer;
+begin
+ select count(*)
+ into l_count
+ from pg_indexes
+ where schemaname = 'public'
+ and tablename = lower(table_name)
+ and indexname = lower(index_name);
+
+ if l_count = 0 then
+ execute 'create index ' || index_name || ' on ' || table_name || '(' || column_name || ')';
+ end if;
+end;
+$$ LANGUAGE plpgsql;
+EOT
+
+module Irc
+class Bot
+module Journal
+
+ module Storage
+
+ class PostgresStorage < AbstractStorage
+ attr_reader :conn
+
+ def initialize(opts={})
+ @uri = opts[:uri] || 'postgresql://localhost/rbot'
+ @conn = PGWrapper.new(@uri)
+ @conn.exec('set client_min_messages = warning')
+ @conn.exec(CREATE_INDEX)
+ @version = @conn.exec('SHOW server_version;')[0]['server_version']
+
+ @version.gsub!(/^(\d+\.\d+)$/, '\1.0')
+ log 'journal storage: postgresql connected to version: ' + @version
+
+ version = @version.split('.')[0,3].join.to_i
+ if version < 930
+ raise StorageError.new(
+ 'PostgreSQL Version too old: %s, supported: >= 9.3' % [@version])
+ end
+ @jsonb = (version >= 940)
+ log 'journal storage: no jsonb support, consider upgrading postgres' unless @jsonb
+ log 'journal storage: postgres backend is using JSONB :)' if @jsonb
+
+ drop if opts[:drop]
+ create_table
+ create_index('topic_index', 'topic')
+ create_index('timestamp_index', 'timestamp')
+ end
+
+ def create_table
+ @conn.exec('
+ CREATE TABLE IF NOT EXISTS journal
+ (id UUID PRIMARY KEY,
+ topic TEXT NOT NULL,
+ timestamp TIMESTAMP WITH TIME ZONE NOT NULL,
+ payload %s NOT NULL)' % [@jsonb ? 'JSONB' : 'JSON'])
+ end
+
+ def create_index(index_name, column_name)
+ debug 'journal postges backend: create index %s for %s' % [
+ index_name, column_name]
+ @conn.exec_params('SELECT create_index($1, $2, $3)', [
+ 'journal', index_name, column_name])
+ end
+
+ def create_payload_index(key)
+ index_name = 'idx_payload_' + key.gsub('.', '_')
+ column = sql_payload_selector(key)
+ create_index(index_name, column)
+ end
+
+ def ensure_index(key)
+ create_payload_index(key)
+ end
+
+ def insert(m)
+ @conn.exec_params('INSERT INTO journal VALUES ($1, $2, $3, $4);',
+ [m.id, m.topic, m.timestamp, JSON.generate(m.payload)])
+ end
+
+ def find(query=nil, limit=100, offset=0, &block)
+ def to_message(row)
+ timestamp = DateTime.strptime(row['timestamp'], '%Y-%m-%d %H:%M:%S%z')
+ JournalMessage.new(id: row['id'], timestamp: timestamp,
+ topic: row['topic'], payload: JSON.parse(row['payload']))
+ end
+
+ if query
+ sql, params = query_to_sql(query)
+ sql = 'SELECT * FROM journal WHERE ' + sql + ' LIMIT %d OFFSET %d' % [limit.to_i, offset.to_i]
+ else
+ sql = 'SELECT * FROM journal LIMIT %d OFFSET %d' % [limit.to_i, offset.to_i]
+ params = []
+ end
+ res = @conn.exec_params(sql, params)
+ if block_given?
+ res.each { |row| block.call(to_message(row)) }
+ else
+ res.map { |row| to_message(row) }
+ end
+ end
+
+ # returns the number of messages that match the query
+ def count(query=nil)
+ if query
+ sql, params = query_to_sql(query)
+ sql = 'SELECT COUNT(*) FROM journal WHERE ' + sql
+ else
+ sql = 'SELECT COUNT(*) FROM journal'
+ params = []
+ end
+ res = @conn.exec_params(sql, params)
+ res[0]['count'].to_i
+ end
+
+ def remove(query=nil)
+ if query
+ sql, params = query_to_sql(query)
+ sql = 'DELETE FROM journal WHERE ' + sql
+ else
+ sql = 'DELETE FROM journal;'
+ params = []
+ end
+ res = @conn.exec_params(sql, params)
+ end
+
+ def drop
+ @conn.exec('DROP TABLE journal;') rescue nil
+ end
+
+ def sql_payload_selector(key)
+ selector = 'payload'
+ k = key.to_s.split('.')
+ k.each_index { |i|
+ if i >= k.length-1
+ selector += '->>\'%s\'' % [@conn.escape_string(k[i])]
+ else
+ selector += '->\'%s\'' % [@conn.escape_string(k[i])]
+ end
+ }
+ selector
+ end
+
+ def query_to_sql(query)
+ params = []
+ placeholder = Proc.new do |value|
+ params << value
+ '$%d' % [params.length]
+ end
+ sql = {op: 'AND', list: []}
+
+ # ID query OR condition
+ unless query.id.empty?
+ sql[:list] << {
+ op: 'OR',
+ list: query.id.map { |id|
+ 'id = ' + placeholder.call(id)
+ }
+ }
+ end
+
+ # Topic query OR condition
+ unless query.topic.empty?
+ sql[:list] << {
+ op: 'OR',
+ list: query.topic.map { |topic|
+ 'topic ILIKE ' + placeholder.call(topic.gsub('*', '%'))
+ }
+ }
+ end
+
+ # Timestamp range query AND condition
+ if query.timestamp[:from] or query.timestamp[:to]
+ list = []
+ if query.timestamp[:from]
+ list << 'timestamp >= ' + placeholder.call(query.timestamp[:from])
+ end
+ if query.timestamp[:to]
+ list << 'timestamp <= ' + placeholder.call(query.timestamp[:to])
+ end
+ sql[:list] << {
+ op: 'AND',
+ list: list
+ }
+ end
+
+ # Payload query
+ unless query.payload.empty?
+ list = []
+ query.payload.each_pair do |key, value|
+ selector = sql_payload_selector(key)
+ list << selector + ' = ' + placeholder.call(value)
+ end
+ sql[:list] << {
+ op: 'OR',
+ list: list
+ }
+ end
+
+ sql = sql[:list].map { |stmt|
+ '(' + stmt[:list].join(' %s ' % [stmt[:op]]) + ')'
+ }.join(' %s ' % [sql[:op]])
+
+ [sql, params]
+ end
+ end
+ end
+end # Journal
+end # Bot
+end # Irc
diff --git a/lib/rbot/plugins.rb b/lib/rbot/plugins.rb
index 0e7e9d6f..8621fe45 100644
--- a/lib/rbot/plugins.rb
+++ b/lib/rbot/plugins.rb
@@ -410,6 +410,9 @@ module Plugins
attr_reader :botmodules
attr_reader :maps
+ attr_reader :core_module_dirs
+ attr_reader :plugin_dirs
+
# This is the list of patterns commonly delegated to plugins.
# A fast delegation lookup is enabled for them.
DEFAULT_DELEGATE_PATTERNS = %r{^(?:
diff --git a/lib/rbot/registry.rb b/lib/rbot/registry.rb
index 5e905ebb..04892d73 100644
--- a/lib/rbot/registry.rb
+++ b/lib/rbot/registry.rb
@@ -57,6 +57,15 @@ class Registry
@libpath = File.join(File.dirname(__FILE__), 'registry')
@format = format
load File.join(@libpath, @format+'.rb') if format
+ # The get_impl method will return all implementations of the
+ # abstract accessor interface, since we only ever load one
+ # (the configured one) accessor implementation, we can just assume
+ # it to be the correct accessor to use.
+ accessors = AbstractAccessor.get_impl
+ if accessors.length > 1
+ warning 'multiple accessor implementations loaded!'
+ end
+ @accessor_class = accessors.first
end
# Returns a list of supported registry database formats.
@@ -68,12 +77,7 @@ class Registry
# Creates a new Accessor object for the specified database filename.
def create(path, filename)
- # The get_impl method will return a list of all the classes that
- # implement the accessor interface, since we only ever load one
- # (the configured one) accessor implementation, we can just assume
- # it to be the correct accessor to use.
- cls = AbstractAccessor.get_impl.first
- db = cls.new(File.join(path, 'registry_' + @format, filename.downcase))
+ db = @accessor_class.new(File.join(path, 'registry_' + @format, filename.downcase))
db.optimize
db
end
@@ -330,7 +334,7 @@ class Registry
# Returns all classes from the namespace that implement this interface
def self.get_impl
- ObjectSpace.each_object(Class).select { |klass| klass < self }
+ ObjectSpace.each_object(Class).select { |klass| klass.ancestors[1] == self }
end
end