summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--data/rbot/plugins/excuse.rb2
-rw-r--r--data/rbot/plugins/points.rb (renamed from data/rbot/plugins/karma.rb)64
-rw-r--r--lib/rbot/core/journal.rb69
-rw-r--r--lib/rbot/core/journal_irclog.rb71
-rw-r--r--lib/rbot/core/webservice.rb32
-rw-r--r--lib/rbot/ircbot.rb1
-rw-r--r--lib/rbot/journal.rb430
-rw-r--r--lib/rbot/journal/mongo.rb131
-rw-r--r--lib/rbot/journal/postgres.rb280
-rw-r--r--lib/rbot/messagemapper.rb12
-rw-r--r--lib/rbot/plugins.rb25
-rw-r--r--lib/rbot/registry.rb18
-rw-r--r--po/en/rbot-points.po (renamed from po/en/rbot-karma.po)0
-rw-r--r--po/fi/rbot-points.po (renamed from po/fi/rbot-karma.po)0
-rw-r--r--po/fr/rbot-points.po (renamed from po/fr/rbot-karma.po)0
-rw-r--r--po/it/rbot-points.po (renamed from po/it/rbot-karma.po)0
-rw-r--r--po/ja/rbot-points.po (renamed from po/ja/rbot-karma.po)0
-rw-r--r--po/rbot-points.pot (renamed from po/rbot-karma.pot)0
-rw-r--r--po/zh_CN/rbot-points.po (renamed from po/zh_CN/rbot-karma.po)0
-rw-r--r--po/zh_TW/rbot-points.po (renamed from po/zh_TW/rbot-karma.po)0
-rw-r--r--rbot.gemspec6
-rw-r--r--test/test_journal.rb421
22 files changed, 1509 insertions, 53 deletions
diff --git a/data/rbot/plugins/excuse.rb b/data/rbot/plugins/excuse.rb
index ad0e8334..61f38bd6 100644
--- a/data/rbot/plugins/excuse.rb
+++ b/data/rbot/plugins/excuse.rb
@@ -387,7 +387,7 @@ class ExcusePlugin < Plugin
"Dyslexics retyping hosts file on servers",
"The Internet is being scanned for viruses.",
"Your computer's union contract is set to expire at midnight.",
-"Bad user karma.",
+"Bad reputation.",
"/dev/clue was linked to /dev/null",
"Increased sunspot activity.",
"We already sent around a notice about that.",
diff --git a/data/rbot/plugins/karma.rb b/data/rbot/plugins/points.rb
index 93d21189..21157f3d 100644
--- a/data/rbot/plugins/karma.rb
+++ b/data/rbot/plugins/points.rb
@@ -1,4 +1,4 @@
-class KarmaPlugin < Plugin
+class PointsPlugin < Plugin
def initialize
super
@@ -14,50 +14,62 @@ class KarmaPlugin < Plugin
@registry.set_default(0)
# import if old file format found
- oldkarma = @bot.path 'karma.rbot'
- if File.exist? oldkarma
- log "importing old karma data"
- IO.foreach(oldkarma) do |line|
+ oldpoints = @bot.path 'points.rbot'
+ if File.exist? oldpoints
+ log "importing old points data"
+ IO.foreach(oldpoints) do |line|
if(line =~ /^(\S+)<=>([\d-]+)$/)
item = $1
- karma = $2.to_i
- @registry[item] = karma
+ points = $2.to_i
+ @registry[item] = points
end
end
- File.delete oldkarma
+ File.delete oldpoints
end
end
def stats(m, params)
if (@registry.length)
- max = @registry.values.max
- min = @registry.values.min
- best = @registry.to_hash.index(max)
- worst = @registry.to_hash.index(min)
+ max = @registry.values.max || "zero"
+ min = @registry.values.min || "zero"
+ best = @registry.to_hash.key(max) || "nobody"
+ worst = @registry.to_hash.key(min) || "nobody"
m.reply "#{@registry.length} items. Best: #{best} (#{max}); Worst: #{worst} (#{min})"
end
end
- def karma(m, params)
+ def dump(m, params)
+ if (@registry.length)
+ msg = "Points dump: "
+ msg << @registry.to_hash.sort_by { |k, v| v }.reverse.
+ map { |k,v| "#{k}: #{v}" }.
+ join(", ")
+ m.reply msg
+ else
+ m.reply "nobody has any points yet!"
+ end
+ end
+
+ def points(m, params)
thing = params[:key]
thing = m.sourcenick unless thing
thing = thing.to_s
- karma = @registry[thing]
- if(karma != 0)
- m.reply "karma for #{thing}: #{@registry[thing]}"
+ points = @registry[thing]
+ if(points != 0)
+ m.reply "points for #{thing}: #{@registry[thing]}"
else
- m.reply "#{thing} has neutral karma"
+ m.reply "#{thing} has zero points"
end
end
- def setkarma(m, params)
+ def setpoints(m, params)
thing = (params[:key] || m.sourcenick).to_s
@registry[thing] = params[:val].to_i
- karma(m, params)
+ points(m, params)
end
def help(plugin, topic="")
- "karma module: Listens to everyone's chat. <thing>++/<thing>-- => increase/decrease karma for <thing>, karma for <thing>? => show karma for <thing>, karmastats => show stats. Karma is a community rating system - only in-channel messages can affect karma and you cannot adjust your own."
+ "points module: Keeps track of internet points, infusing your pointless life with meaning. Listens to everyone's chat. <thing>++/<thing>-- => increase/decrease points for <thing>, points for <thing>? => show points for <thing>, pointstats => show best/worst, pointsdump => show everyone's points. Points are a community rating system - only in-channel messages can affect points and you cannot adjust your own."
end
def message(m)
@@ -96,15 +108,17 @@ class KarmaPlugin < Plugin
next if v == 0
@registry[k] += (v > 0 ? 1 : -1)
m.reply @bot.lang.get("thanks") if k == @bot.nick && v > 0
+ m.reply "#{k} now has #{@registry[k]} points!"
end
end
end
-plugin = KarmaPlugin.new
+plugin = PointsPlugin.new
plugin.default_auth( 'edit', false )
-plugin.map 'karmastats', :action => 'stats'
-plugin.map 'karma :key', :defaults => {:key => false}
-plugin.map 'setkarma :key :val', :defaults => {:key => false}, :requirements => {:val => /^-?\d+$/}, :auth_path => 'edit::set!'
-plugin.map 'karma for :key'
+plugin.map 'pointstats', :action => 'stats'
+plugin.map 'points :key', :defaults => {:key => false}
+plugin.map 'setpoints :key :val', :defaults => {:key => false}, :requirements => {:val => /^-?\d+$/}, :auth_path => 'edit::set!'
+plugin.map 'points for :key'
+plugin.map 'pointsdump', :action => 'dump'
diff --git a/lib/rbot/core/journal.rb b/lib/rbot/core/journal.rb
new file mode 100644
index 00000000..c4f5f0f4
--- /dev/null
+++ b/lib/rbot/core/journal.rb
@@ -0,0 +1,69 @@
+#-- vim:sw=2:et
+#++
+#
+# :title: rbot journal management from IRC
+#
+# Author:: Matthias Hecker (apoc@geekosphere.org)
+
+require 'rbot/journal'
+
+module ::Irc
+class Bot
+ # this should return the journal if the managing plugin has been loaded.
+ def journal
+ if @plugins['journal']
+ @plugins['journal'].broker
+ end
+ end
+end
+end
+
+class JournalModule < CoreBotModule
+
+ attr_reader :broker
+
+ include Irc::Bot::Journal
+
+ Config.register Config::StringValue.new('journal.storage',
+ :default => nil,
+ :requires_rescan => true,
+ :desc => 'storage engine used by the journal')
+ Config.register Config::StringValue.new('journal.storage.uri',
+ :default => nil,
+ :requires_rescan => true,
+ :desc => 'storage database uri')
+
+ def initialize
+ super
+ storage = nil
+ name = @bot.config['journal.storage']
+ uri = @bot.config['journal.storage.uri']
+ if name
+ begin
+ storage = Storage.create(name, uri)
+ rescue
+ error 'journal storage initialization error!'
+ error $!
+ error $@.join("\n")
+ end
+ end
+ debug 'journal broker starting up...'
+ @broker = JournalBroker.new(storage: storage)
+ end
+
+ def cleanup
+ super
+ debug 'journal broker shutting down...'
+ @broker.shutdown
+ @broker = nil
+ end
+
+ def help(plugin, topic='')
+ 'journal'
+ end
+
+end
+
+journal = JournalModule.new
+journal.priority = -2
+
diff --git a/lib/rbot/core/journal_irclog.rb b/lib/rbot/core/journal_irclog.rb
new file mode 100644
index 00000000..0617d991
--- /dev/null
+++ b/lib/rbot/core/journal_irclog.rb
@@ -0,0 +1,71 @@
+#-- vim:sw=2:et
+#++
+#
+# :title: irc logging into the journal
+#
+# Author:: Matthias Hecker (apoc@geekosphere.org)
+
+class JournalIrcLogModule < CoreBotModule
+
+ include Irc::Bot::Journal
+
+ Config.register Config::ArrayValue.new('journal.irclog.whitelist',
+ :default => [],
+ :desc => 'only perform journal irc logging for those channel/users')
+
+ Config.register Config::ArrayValue.new('journal.irclog.blacklist',
+ :default => [],
+ :desc => 'exclude journal irc logging for those channel/users')
+
+ def publish(payload)
+ if payload[:target]
+ target = payload[:target]
+ whitelist = @bot.config['journal.irclog.whitelist']
+ blacklist = @bot.config['journal.irclog.blacklist']
+ unless whitelist.empty?
+ return unless whitelist.include? target
+ end
+ unless blacklist.empty?
+ return if blacklist.include? target
+ end
+ end
+ @bot.journal.publish('irclog', payload)
+ end
+
+ def log_message(m)
+ unless m.kind_of? BasicUserMessage
+ warning 'journal irc logger can\'t log %s message' % [m.class.to_s]
+ else
+ payload = {
+ type: m.class.name.downcase.match(/(\w+)message/).captures.first,
+ addressed: m.address?,
+ replied: m.replied?,
+ identified: m.identified?,
+
+ source: m.source.to_s,
+ source_user: m.botuser.to_s,
+ source_address: m.sourceaddress,
+ target: m.target.to_s,
+ server: m.server.to_s,
+
+ message: m.logmessage,
+ }
+ publish(payload)
+ end
+ end
+
+ # messages sent
+ def sent(m)
+ log_message(m)
+ end
+
+ # messages received
+ def listen(m)
+ log_message(m)
+ end
+end
+
+plugin = JournalIrcLogModule.new
+# make sure the logger gets loaded after the journal
+plugin.priority = -1
+
diff --git a/lib/rbot/core/webservice.rb b/lib/rbot/core/webservice.rb
index 0ddbc2d5..eb01226c 100644
--- a/lib/rbot/core/webservice.rb
+++ b/lib/rbot/core/webservice.rb
@@ -17,6 +17,8 @@ require 'webrick/https'
require 'openssl'
require 'cgi'
require 'json'
+require 'erb'
+require 'ostruct'
module ::Irc
class Bot
@@ -82,7 +84,10 @@ class Bot
}
@path = req.path
- debug '@path = ' + @path.inspect
+
+ @load_path = [File.join(Config::datadir, 'web')]
+ @load_path += @bot.plugins.core_module_dirs
+ @load_path += @bot.plugins.plugin_dirs
end
def parse_query(query)
@@ -124,6 +129,27 @@ class Bot
def send_html(body, status=200)
send_response(body, status, 'text/html')
end
+
+ # Expands a relative filename to absolute using a list of load paths.
+ def get_load_path(filename)
+ @load_path.each do |path|
+ file = File.join(path, filename)
+ return file if File.exists?(file)
+ end
+ end
+
+ # Renders a erb template and responds it
+ def render(template, args={})
+ file = get_load_path template
+ if not file
+ raise 'template not found: ' + template
+ end
+
+ tmpl = ERB.new(IO.read(file))
+ ns = OpenStruct.new(args)
+ body = tmpl.result(ns.instance_eval { binding })
+ send_html(body, 200)
+ end
end
# works similar to a message mapper but for url paths
@@ -394,6 +420,10 @@ class WebServiceModule < CoreBotModule
:requires_rescan => true,
:desc => 'Host the web service will bind on')
+ Config.register Config::StringValue.new('webservice.url',
+ :default => 'http://127.0.0.1:7268',
+ :desc => 'The public URL of the web service.')
+
Config.register Config::BooleanValue.new('webservice.ssl',
:default => false,
:requires_rescan => true,
diff --git a/lib/rbot/ircbot.rb b/lib/rbot/ircbot.rb
index eb158c63..739aaade 100644
--- a/lib/rbot/ircbot.rb
+++ b/lib/rbot/ircbot.rb
@@ -201,6 +201,7 @@ class Bot
# loads and opens new registry databases, used by the plugins
attr_accessor :registry_factory
+ # web service
attr_accessor :webservice
# server we are connected to
diff --git a/lib/rbot/journal.rb b/lib/rbot/journal.rb
new file mode 100644
index 00000000..981ff6e4
--- /dev/null
+++ b/lib/rbot/journal.rb
@@ -0,0 +1,430 @@
+# encoding: UTF-8
+#-- vim:sw=2:et
+#++
+#
+# :title: rbot's persistent message queue
+#
+# Author:: Matthias Hecker (apoc@geekosphere.org)
+
+require 'thread'
+require 'securerandom'
+
+module Irc
+class Bot
+module Journal
+
+=begin rdoc
+
+ The journal is a persistent message queue for rbot, its based on a basic
+ publish/subscribe model and persists messages into backend databases
+ that can be efficiently searched for past messages.
+
+ It is a addition to the key value storage already present in rbot
+ through its registry subsystem.
+
+=end
+
+ class InvalidJournalMessage < StandardError
+ end
+ class StorageError < StandardError
+ end
+
+ class JournalMessage
+ # a unique identification of this message
+ attr_reader :id
+
+ # describes a hierarchical queue into which this message belongs
+ attr_reader :topic
+
+ # when this message was published as a Time instance
+ attr_reader :timestamp
+
+ # contains the actual message as a Hash
+ attr_reader :payload
+
+ def initialize(message)
+ @id = message[:id]
+ @timestamp = message[:timestamp]
+ @topic = message[:topic]
+ @payload = message[:payload]
+ if @payload.class != Hash
+ raise InvalidJournalMessage.new('payload must be a hash!')
+ end
+ end
+
+ # Access payload value by key.
+ def get(pkey, default=:exception) # IDENTITY = Object.new instead of :ex..?
+ value = pkey.split('.').reduce(@payload) do |hash, key|
+ if hash.has_key?(key) or hash.has_key?(key.to_sym)
+ hash[key] || hash[key.to_sym]
+ else
+ if default == :exception
+ raise ArgumentError.new
+ else
+ default
+ end
+ end
+ end
+ end
+
+ # Access payload value by key alias for get(key, nil).
+ def [](key)
+ get(key, nil)
+ end
+
+ def ==(other)
+ (@id == other.id) rescue false
+ end
+
+ def self.create(topic, payload, opt={})
+ # cleanup payload to only contain strings
+ JournalMessage.new(
+ id: opt[:id] || SecureRandom.uuid,
+ timestamp: opt[:timestamp] || Time.now,
+ topic: topic,
+ payload: payload
+ )
+ end
+ end
+
+ module Storage
+ class AbstractStorage
+ # intializes/opens a new storage connection
+ def initialize(opts={})
+ end
+
+ # inserts a message in storage
+ def insert(message)
+ end
+
+ # creates/ensures a index exists on the payload specified by key
+ def ensure_payload_index(key)
+ end
+
+ # returns a array of message instances that match the query
+ def find(query=nil, limit=100, offset=0, &block)
+ end
+
+ # returns the number of messages that match the query
+ def count(query=nil)
+ end
+
+ # remove messages that match the query
+ def remove(query=nil)
+ end
+
+ # destroy the underlying table/collection
+ def drop
+ end
+
+ # Returns all classes from the namespace that implement this interface
+ def self.get_impl
+ ObjectSpace.each_object(Class).select { |klass| klass < self }
+ end
+ end
+
+ def self.create(name, uri)
+ log 'load journal storage adapter: ' + name
+ load File.join(File.dirname(__FILE__), 'journal', name + '.rb')
+ cls = AbstractStorage.get_impl.first
+ cls.new(uri: uri)
+ end
+ end
+
+ # Describes a query on journal entries, it is used both to describe
+ # a subscription aswell as to query persisted messages.
+ # There two ways to declare a Query instance, using
+ # the DSL like this:
+ #
+ # Query.define do
+ # id 'foo'
+ # id 'bar'
+ # topic 'log.irc.*'
+ # topic 'log.core'
+ # timestamp from: Time.now, to: Time.now + 60 * 10
+ # payload 'action': :privmsg
+ # payload 'channel': '#rbot'
+ # payload 'foo.bar': 'baz'
+ # end
+ #
+ # or using a hash: (NOTE: avoid using symbols in payload)
+ #
+ # Query.define({
+ # id: ['foo', 'bar'],
+ # topic: ['log.irc.*', 'log.core'],
+ # timestamp: {
+ # from: Time.now
+ # to: Time.now + 60 * 10
+ # },
+ # payload: {
+ # 'action' => 'privmsg'
+ # 'channel' => '#rbot',
+ # 'foo.bar' => 'baz'
+ # }
+ # })
+ #
+ class Query
+ # array of ids to match (OR)
+ attr_reader :id
+ # array of topics to match with wildcard support (OR)
+ attr_reader :topic
+ # hash with from: timestamp and to: timestamp
+ attr_reader :timestamp
+ # hash of key values to match
+ attr_reader :payload
+
+ def initialize(query)
+ @id = query[:id] || []
+ @id = [@id] if @id.is_a? String
+ @topic = query[:topic] || []
+ @topic = [@topic] if @topic.is_a? String
+ @timestamp = {
+ from: nil, to: nil
+ }
+ if query[:timestamp] and query[:timestamp][:from]
+ @timestamp[:from] = query[:timestamp][:from]
+ end
+ if query[:timestamp] and query[:timestamp][:to]
+ @timestamp[:to] = query[:timestamp][:to]
+ end
+ @payload = query[:payload] || {}
+ end
+
+ # returns true if the given message matches the query
+ def matches?(message)
+ return false if not @id.empty? and not @id.include? message.id
+ return false if not @topic.empty? and not topic_matches? message.topic
+ if @timestamp[:from]
+ return false unless message.timestamp >= @timestamp[:from]
+ end
+ if @timestamp[:to]
+ return false unless message.timestamp <= @timestamp[:to]
+ end
+ found = false
+ @payload.each_pair do |key, value|
+ begin
+ message.get(key.to_s)
+ rescue ArgumentError
+ end
+ found = true
+ end
+ return false if not found and not @payload.empty?
+ true
+ end
+
+ def topic_matches?(_topic)
+ @topic.each do |topic|
+ if topic.include? '*'
+ match = true
+ topic.split('.').zip(_topic.split('.')).each do |a, b|
+ if a == '*'
+ if not b or b.empty?
+ match = false
+ end
+ else
+ match = false unless a == b
+ end
+ end
+ return true if match
+ else
+ return true if topic == _topic
+ end
+ end
+ return false
+ end
+
+ # factory that constructs a query
+ class Factory
+ attr_reader :query
+ def initialize
+ @query = {
+ id: [],
+ topic: [],
+ timestamp: {
+ from: nil, to: nil
+ },
+ payload: {}
+ }
+ end
+
+ def id(*_id)
+ @query[:id] += _id
+ end
+
+ def topic(*_topic)
+ @query[:topic] += _topic
+ end
+
+ def timestamp(range)
+ @query[:timestamp] = range
+ end
+
+ def payload(query)
+ @query[:payload].merge!(query)
+ end
+ end
+
+ def self.define(query=nil, &block)
+ factory = Factory.new
+ if block_given?
+ factory.instance_eval(&block)
+ query = factory.query
+ end
+ Query.new query
+ end
+
+ end
+
+
+ class JournalBroker
+ attr_reader :storage
+ class Subscription
+ attr_reader :topic
+ attr_reader :block
+ def initialize(broker, topic, block)
+ @broker = broker
+ @topic = topic
+ @block = block
+ end
+ def cancel
+ @broker.unsubscribe(self)
+ end
+ end
+
+ def initialize(opts={})
+ # overrides the internal consumer with a block
+ @consumer = opts[:consumer]
+ # storage backend
+ @storage = opts[:storage]
+ unless @storage
+ warning 'journal broker: no storage set up, won\'t persist messages'
+ end
+ @queue = Queue.new
+ # consumer thread:
+ @thread = Thread.new do
+ while message = @queue.pop
+ begin
+ consume message
+ # pop(true) ... rescue ThreadError => e
+ rescue Exception => e
+ error 'journal broker: exception in consumer thread'
+ error $!
+ end
+ end
+ end
+ @subscriptions = []
+ # lookup-table for subscriptions by their topic
+ @topic_subs = {}
+ end
+
+ def consume(message)
+ return unless message
+ @consumer.call(message) if @consumer
+
+ # notify subscribers
+ if @topic_subs.has_key? message.topic
+ @topic_subs[message.topic].each do |s|
+ s.block.call(message)
+ end
+ end
+
+ @storage.insert(message) if @storage
+ end
+
+ def persists?
+ true if @storage
+ end
+
+ def shutdown
+ log 'journal shutdown'
+ @subscriptions.clear
+ @topic_subs.clear
+ @queue << nil
+ @thread.join
+ @thread = nil
+ end
+
+ def publish(topic, payload)
+ debug 'journal publish message in %s: %s' % [topic, payload.inspect]
+ @queue << JournalMessage::create(topic, payload)
+ nil
+ end
+
+ # Subscribe to receive messages from a topic.
+ #
+ # You can use this method to subscribe to messages that
+ # are published within a specified topic. You must provide
+ # a receiving block to receive messages one-by-one.
+ # The method returns an instance of Subscription that can
+ # be used to cancel the subscription by invoking cancel
+ # on it.
+ #
+ # journal.subscribe('irclog') do |message|
+ # # received irclog messages...
+ # end
+ #
+ def subscribe(topic=nil, &block)
+ raise ArgumentError.new unless block_given?
+ s = Subscription.new(self, topic, block)
+ @subscriptions << s
+ unless @topic_subs.has_key? topic
+ @topic_subs[topic] = []
+ end
+ @topic_subs[topic] << s
+ s
+ end
+
+ def unsubscribe(s)
+ if @topic_subs.has_key? s.topic
+ @topic_subs[s.topic].delete(s)
+ end
+ @subscriptions.delete s
+ end
+
+ # Find and return persisted messages by a query.
+ #
+ # This method will either return all messages or call the provided
+ # block for each message. It will filter the messages by the
+ # provided Query instance. Limit and offset might be used to
+ # constrain the result.
+ # The query might also be a hash or proc that is passed to
+ # Query.define first.
+ #
+ # @param query [Query]
+ # @param limit [Integer] how many items to return
+ # @param offset [Integer] relative offset in results
+ def find(query, limit=100, offset=0, &block)
+ unless query.is_a? Query
+ query = Query.define(query)
+ end
+ if block_given?
+ @storage.find(query, limit, offset, &block)
+ else
+ @storage.find(query, limit, offset)
+ end
+ end
+
+ def count(query=nil)
+ unless query.is_a? Query
+ query = Query.define(query)
+ end
+ @storage.count(query)
+ end
+
+ def remove(query=nil)
+ unless query.is_a? Query
+ query = Query.define(query)
+ end
+ @storage.remove(query)
+ end
+
+ def ensure_payload_index(key)
+ @storage.ensure_payload_index(key)
+ end
+
+ end
+
+end # Journal
+end # Bot
+end # Irc
+
diff --git a/lib/rbot/journal/mongo.rb b/lib/rbot/journal/mongo.rb
new file mode 100644
index 00000000..a03355f9
--- /dev/null
+++ b/lib/rbot/journal/mongo.rb
@@ -0,0 +1,131 @@
+# encoding: UTF-8
+#-- vim:sw=2:et
+#++
+#
+# :title: journal backend for mongoDB
+
+require 'mongo'
+require 'json'
+
+module Irc
+class Bot
+module Journal
+
+ module Storage
+
+ class MongoStorage < AbstractStorage
+ attr_reader :client
+
+ def initialize(opts={})
+ Mongo::Logger.logger.level = Logger::WARN
+ @uri = opts[:uri] || 'mongodb://127.0.0.1:27017/rbot'
+ @client = Mongo::Client.new(@uri)
+ @collection = @client['journal']
+ log 'journal storage: mongodb connected to ' + @uri
+
+ drop if opts[:drop]
+ @collection.indexes.create_one({topic: 1})
+ @collection.indexes.create_one({timestamp: 1})
+ end
+
+ def ensure_payload_index(key)
+ @collection.indexes.create_one({'payload.'+key => 1})
+ end
+
+ def insert(m)
+ @collection.insert_one({
+ '_id' => m.id,
+ 'topic' => m.topic,
+ 'timestamp' => m.timestamp,
+ 'payload' => m.payload
+ })
+ end
+
+ def find(query=nil, limit=100, offset=0, &block)
+ def to_message(document)
+ JournalMessage.new(id: document['_id'],
+ timestamp: document['timestamp'].localtime,
+ topic: document['topic'],
+ payload: document['payload'].to_h)
+ end
+
+ cursor = query_cursor(query).skip(offset).limit(limit)
+
+ if block_given?
+ cursor.each { |document| block.call(to_message(document)) }
+ else
+ cursor.map { |document| to_message(document) }
+ end
+ end
+
+ # returns the number of messages that match the query
+ def count(query=nil)
+ query_cursor(query).count
+ end
+
+ def remove(query=nil)
+ query_cursor(query).delete_many
+ end
+
+ def drop
+ @collection.drop
+ end
+
+ def query_cursor(query)
+ unless query
+ return @collection.find()
+ end
+
+ query_and = []
+
+ # ID query OR condition
+ unless query.id.empty?
+ query_and << {
+ '$or' => query.id.map { |_id|
+ {'_id' => _id}
+ }
+ }
+ end
+
+ unless query.topic.empty?
+ query_and << {
+ '$or' => query.topic.map { |topic|
+ if topic.include?('*')
+ pattern = topic.gsub('.', '\.').gsub('*', '.*')
+ {'topic' => {'$regex' => pattern}}
+ else
+ {'topic' => topic}
+ end
+ }
+ }
+ end
+
+ if query.timestamp[:from] or query.timestamp[:to]
+ where = {}
+ if query.timestamp[:from]
+ where['$gte'] = query.timestamp[:from]
+ end
+ if query.timestamp[:to]
+ where['$lte'] = query.timestamp[:to]
+ end
+ query_and << {'timestamp' => where}
+ end
+
+ unless query.payload.empty?
+ query_and << {
+ '$or' => query.payload.map { |key, value|
+ key = 'payload.' + key
+ {key => value}
+ }
+ }
+ end
+
+ @collection.find({
+ '$and' => query_and
+ })
+ end
+ end
+ end
+end # Journal
+end # Bot
+end # Irc
diff --git a/lib/rbot/journal/postgres.rb b/lib/rbot/journal/postgres.rb
new file mode 100644
index 00000000..7b333158
--- /dev/null
+++ b/lib/rbot/journal/postgres.rb
@@ -0,0 +1,280 @@
+# encoding: UTF-8
+#-- vim:sw=2:et
+#++
+#
+# :title: journal backend for postgresql
+
+require 'pg'
+require 'json'
+
+# wraps the postgres driver in a single thread
+class PGWrapper
+ def initialize(uri)
+ @uri = uri
+ @queue = Queue.new
+ run_thread
+ end
+
+ def run_thread
+ Thread.new do
+ @conn = PG.connect(@uri)
+ while message = @queue.pop
+ return_queue = message.shift
+ begin
+ result = @conn.send(*message)
+ return_queue << [:result, result]
+ rescue Exception => e
+ return_queue << [:exception, e]
+ end
+ end
+ @conn.finish
+ end
+ end
+
+ def run_in_thread(*args)
+ rq = Queue.new
+ @queue << [rq, *args]
+ type, result = rq.pop
+ if type == :exception
+ raise result
+ else
+ result
+ end
+ end
+
+ public
+
+ def destroy
+ @queue << nil
+ end
+
+ def exec(query)
+ run_in_thread(:exec, query)
+ end
+
+ def exec_params(query, params)
+ run_in_thread(:exec_params, query, params)
+ end
+
+ def escape_string(string)
+ @conn.escape_string(string)
+ end
+end
+
+# as a replacement for CREATE INDEX IF NOT EXIST that is not in postgres.
+# define function to be able to create an index in case it doesnt exist:
+# source: http://stackoverflow.com/a/26012880
+CREATE_INDEX = <<-EOT
+CREATE OR REPLACE FUNCTION create_index(table_name text, index_name text, column_name text) RETURNS void AS $$
+declare
+ l_count integer;
+begin
+ select count(*)
+ into l_count
+ from pg_indexes
+ where schemaname = 'public'
+ and tablename = lower(table_name)
+ and indexname = lower(index_name);
+
+ if l_count = 0 then
+ execute 'create index ' || index_name || ' on ' || table_name || '(' || column_name || ')';
+ end if;
+end;
+$$ LANGUAGE plpgsql;
+EOT
+
+module Irc
+class Bot
+module Journal
+
+ module Storage
+
+ class PostgresStorage < AbstractStorage
+ attr_reader :conn
+
+ def initialize(opts={})
+ @uri = opts[:uri] || 'postgresql://localhost/rbot'
+ @conn = PGWrapper.new(@uri)
+ @conn.exec('set client_min_messages = warning')
+ @conn.exec(CREATE_INDEX)
+ @version = @conn.exec('SHOW server_version;')[0]['server_version']
+
+ @version.gsub!(/^(\d+\.\d+)$/, '\1.0')
+ log 'journal storage: postgresql connected to version: ' + @version
+
+ version = @version.split('.')[0,3].join.to_i
+ if version < 930
+ raise StorageError.new(
+ 'PostgreSQL Version too old: %s, supported: >= 9.3' % [@version])
+ end
+ @jsonb = (version >= 940)
+ log 'journal storage: no jsonb support, consider upgrading postgres' unless @jsonb
+ log 'journal storage: postgres backend is using JSONB :)' if @jsonb
+
+ drop if opts[:drop]
+ create_table
+ create_index('topic_index', 'topic')
+ create_index('timestamp_index', 'timestamp')
+ end
+
+ def create_table
+ @conn.exec('
+ CREATE TABLE IF NOT EXISTS journal
+ (id UUID PRIMARY KEY,
+ topic TEXT NOT NULL,
+ timestamp TIMESTAMP WITH TIME ZONE NOT NULL,
+ payload %s NOT NULL)' % [@jsonb ? 'JSONB' : 'JSON'])
+ end
+
+ def create_index(index_name, column_name)
+ debug 'journal postges backend: create index %s for %s' % [
+ index_name, column_name]
+ @conn.exec_params('SELECT create_index($1, $2, $3)', [
+ 'journal', index_name, column_name])
+ end
+
+ def create_payload_index(key)
+ index_name = 'idx_payload_' + key.gsub('.', '_')
+ column = sql_payload_selector(key)
+ create_index(index_name, column)
+ end
+
+ def ensure_index(key)
+ create_payload_index(key)
+ end
+
+ def insert(m)
+ @conn.exec_params('INSERT INTO journal VALUES ($1, $2, $3, $4);',
+ [m.id, m.topic, m.timestamp, JSON.generate(m.payload)])
+ end
+
+ def find(query=nil, limit=100, offset=0, &block)
+ def to_message(row)
+ timestamp = DateTime.strptime(row['timestamp'], '%Y-%m-%d %H:%M:%S%z')
+ JournalMessage.new(id: row['id'], timestamp: timestamp,
+ topic: row['topic'], payload: JSON.parse(row['payload']))
+ end
+
+ if query
+ sql, params = query_to_sql(query)
+ sql = 'SELECT * FROM journal WHERE ' + sql + ' LIMIT %d OFFSET %d' % [limit.to_i, offset.to_i]
+ else
+ sql = 'SELECT * FROM journal LIMIT %d OFFSET %d' % [limit.to_i, offset.to_i]
+ params = []
+ end
+ res = @conn.exec_params(sql, params)
+ if block_given?
+ res.each { |row| block.call(to_message(row)) }
+ else
+ res.map { |row| to_message(row) }
+ end
+ end
+
+ # returns the number of messages that match the query
+ def count(query=nil)
+ if query
+ sql, params = query_to_sql(query)
+ sql = 'SELECT COUNT(*) FROM journal WHERE ' + sql
+ else
+ sql = 'SELECT COUNT(*) FROM journal'
+ params = []
+ end
+ res = @conn.exec_params(sql, params)
+ res[0]['count'].to_i
+ end
+
+ def remove(query=nil)
+ if query
+ sql, params = query_to_sql(query)
+ sql = 'DELETE FROM journal WHERE ' + sql
+ else
+ sql = 'DELETE FROM journal;'
+ params = []
+ end
+ res = @conn.exec_params(sql, params)
+ end
+
+ def drop
+ @conn.exec('DROP TABLE journal;') rescue nil
+ end
+
+ def sql_payload_selector(key)
+ selector = 'payload'
+ k = key.to_s.split('.')
+ k.each_index { |i|
+ if i >= k.length-1
+ selector += '->>\'%s\'' % [@conn.escape_string(k[i])]
+ else
+ selector += '->\'%s\'' % [@conn.escape_string(k[i])]
+ end
+ }
+ selector
+ end
+
+ def query_to_sql(query)
+ params = []
+ placeholder = Proc.new do |value|
+ params << value
+ '$%d' % [params.length]
+ end
+ sql = {op: 'AND', list: []}
+
+ # ID query OR condition
+ unless query.id.empty?
+ sql[:list] << {
+ op: 'OR',
+ list: query.id.map { |id|
+ 'id = ' + placeholder.call(id)
+ }
+ }
+ end
+
+ # Topic query OR condition
+ unless query.topic.empty?
+ sql[:list] << {
+ op: 'OR',
+ list: query.topic.map { |topic|
+ 'topic ILIKE ' + placeholder.call(topic.gsub('*', '%'))
+ }
+ }
+ end
+
+ # Timestamp range query AND condition
+ if query.timestamp[:from] or query.timestamp[:to]
+ list = []
+ if query.timestamp[:from]
+ list << 'timestamp >= ' + placeholder.call(query.timestamp[:from])
+ end
+ if query.timestamp[:to]
+ list << 'timestamp <= ' + placeholder.call(query.timestamp[:to])
+ end
+ sql[:list] << {
+ op: 'AND',
+ list: list
+ }
+ end
+
+ # Payload query
+ unless query.payload.empty?
+ list = []
+ query.payload.each_pair do |key, value|
+ selector = sql_payload_selector(key)
+ list << selector + ' = ' + placeholder.call(value)
+ end
+ sql[:list] << {
+ op: 'OR',
+ list: list
+ }
+ end
+
+ sql = sql[:list].map { |stmt|
+ '(' + stmt[:list].join(' %s ' % [stmt[:op]]) + ')'
+ }.join(' %s ' % [sql[:op]])
+
+ [sql, params]
+ end
+ end
+ end
+end # Journal
+end # Bot
+end # Irc
diff --git a/lib/rbot/messagemapper.rb b/lib/rbot/messagemapper.rb
index 3e877626..3966bc17 100644
--- a/lib/rbot/messagemapper.rb
+++ b/lib/rbot/messagemapper.rb
@@ -199,12 +199,12 @@ class Bot
#
# Further examples:
#
- # # match 'karmastats' and call my stats() method
- # plugin.map 'karmastats', :action => 'stats'
- # # match 'karma' with an optional 'key' and call my karma() method
- # plugin.map 'karma :key', :defaults => {:key => false}
- # # match 'karma for something' and call my karma() method
- # plugin.map 'karma for :key'
+ # # match 'pointstats' and call my stats() method
+ # plugin.map 'pointstats', :action => 'stats'
+ # # match 'points' with an optional 'key' and call my points() method
+ # plugin.map 'points :key', :defaults => {:key => false}
+ # # match 'points for something' and call my points() method
+ # plugin.map 'points for :key'
#
# # two matches, one for public messages in a channel, one for
# # private messages which therefore require a channel argument
diff --git a/lib/rbot/plugins.rb b/lib/rbot/plugins.rb
index e40cfcc4..8621fe45 100644
--- a/lib/rbot/plugins.rb
+++ b/lib/rbot/plugins.rb
@@ -38,36 +38,36 @@ module Plugins
Examples:
- plugin.map 'karmastats', :action => 'karma_stats'
+ plugin.map 'pointstats', :action => 'point_stats'
# while in the plugin...
- def karma_stats(m, params)
+ def point_stats(m, params)
m.reply "..."
end
# the default action is the first component
- plugin.map 'karma'
+ plugin.map 'points'
# attributes can be pulled out of the match string
- plugin.map 'karma for :key'
- plugin.map 'karma :key'
+ plugin.map 'points for :key'
+ plugin.map 'points :key'
# while in the plugin...
- def karma(m, params)
+ def points(m, params)
item = params[:key]
- m.reply 'karma for #{item}'
+ m.reply 'points for #{item}'
end
# you can setup defaults, to make parameters optional
- plugin.map 'karma :key', :defaults => {:key => 'defaultvalue'}
+ plugin.map 'points :key', :defaults => {:key => 'defaultvalue'}
# the default auth check is also against the first component
# but that can be changed
- plugin.map 'karmastats', :auth => 'karma'
+ plugin.map 'pointstats', :auth => 'points'
# maps can be restricted to public or private message:
- plugin.map 'karmastats', :private => false
- plugin.map 'karmastats', :public => false
+ plugin.map 'pointstats', :private => false
+ plugin.map 'pointstats', :public => false
See MessageMapper#map for more information on the template format and the
allowed options.
@@ -410,6 +410,9 @@ module Plugins
attr_reader :botmodules
attr_reader :maps
+ attr_reader :core_module_dirs
+ attr_reader :plugin_dirs
+
# This is the list of patterns commonly delegated to plugins.
# A fast delegation lookup is enabled for them.
DEFAULT_DELEGATE_PATTERNS = %r{^(?:
diff --git a/lib/rbot/registry.rb b/lib/rbot/registry.rb
index 5e905ebb..04892d73 100644
--- a/lib/rbot/registry.rb
+++ b/lib/rbot/registry.rb
@@ -57,6 +57,15 @@ class Registry
@libpath = File.join(File.dirname(__FILE__), 'registry')
@format = format
load File.join(@libpath, @format+'.rb') if format
+ # The get_impl method will return all implementations of the
+ # abstract accessor interface, since we only ever load one
+ # (the configured one) accessor implementation, we can just assume
+ # it to be the correct accessor to use.
+ accessors = AbstractAccessor.get_impl
+ if accessors.length > 1
+ warning 'multiple accessor implementations loaded!'
+ end
+ @accessor_class = accessors.first
end
# Returns a list of supported registry database formats.
@@ -68,12 +77,7 @@ class Registry
# Creates a new Accessor object for the specified database filename.
def create(path, filename)
- # The get_impl method will return a list of all the classes that
- # implement the accessor interface, since we only ever load one
- # (the configured one) accessor implementation, we can just assume
- # it to be the correct accessor to use.
- cls = AbstractAccessor.get_impl.first
- db = cls.new(File.join(path, 'registry_' + @format, filename.downcase))
+ db = @accessor_class.new(File.join(path, 'registry_' + @format, filename.downcase))
db.optimize
db
end
@@ -330,7 +334,7 @@ class Registry
# Returns all classes from the namespace that implement this interface
def self.get_impl
- ObjectSpace.each_object(Class).select { |klass| klass < self }
+ ObjectSpace.each_object(Class).select { |klass| klass.ancestors[1] == self }
end
end
diff --git a/po/en/rbot-karma.po b/po/en/rbot-points.po
index e69de29b..e69de29b 100644
--- a/po/en/rbot-karma.po
+++ b/po/en/rbot-points.po
diff --git a/po/fi/rbot-karma.po b/po/fi/rbot-points.po
index e69de29b..e69de29b 100644
--- a/po/fi/rbot-karma.po
+++ b/po/fi/rbot-points.po
diff --git a/po/fr/rbot-karma.po b/po/fr/rbot-points.po
index e69de29b..e69de29b 100644
--- a/po/fr/rbot-karma.po
+++ b/po/fr/rbot-points.po
diff --git a/po/it/rbot-karma.po b/po/it/rbot-points.po
index e69de29b..e69de29b 100644
--- a/po/it/rbot-karma.po
+++ b/po/it/rbot-points.po
diff --git a/po/ja/rbot-karma.po b/po/ja/rbot-points.po
index e69de29b..e69de29b 100644
--- a/po/ja/rbot-karma.po
+++ b/po/ja/rbot-points.po
diff --git a/po/rbot-karma.pot b/po/rbot-points.pot
index e69de29b..e69de29b 100644
--- a/po/rbot-karma.pot
+++ b/po/rbot-points.pot
diff --git a/po/zh_CN/rbot-karma.po b/po/zh_CN/rbot-points.po
index e69de29b..e69de29b 100644
--- a/po/zh_CN/rbot-karma.po
+++ b/po/zh_CN/rbot-points.po
diff --git a/po/zh_TW/rbot-karma.po b/po/zh_TW/rbot-points.po
index e69de29b..e69de29b 100644
--- a/po/zh_TW/rbot-karma.po
+++ b/po/zh_TW/rbot-points.po
diff --git a/rbot.gemspec b/rbot.gemspec
index 29d88c1f..d023609d 100644
--- a/rbot.gemspec
+++ b/rbot.gemspec
@@ -1,3 +1,5 @@
+require 'rake'
+
Gem::Specification.new do |s|
s.name = 'rbot'
s.version = '0.9.15'
@@ -17,13 +19,13 @@ Gem::Specification.new do |s|
'COPYING',
'COPYING.rbot',
'GPLv2',
- 'README.rdoc',
+ 'README.md',
'REQUIREMENTS',
'TODO',
'ChangeLog',
'INSTALL',
'Usage_en.txt',
- 'man/rbot.1',
+ 'man/rbot.xml',
'setup.rb',
'launch_here.rb',
'po/*.pot',
diff --git a/test/test_journal.rb b/test/test_journal.rb
new file mode 100644
index 00000000..f1653c16
--- /dev/null
+++ b/test/test_journal.rb
@@ -0,0 +1,421 @@
+$:.unshift File.join(File.dirname(__FILE__), '../lib')
+
+require 'test/unit'
+require 'rbot/ircbot'
+require 'rbot/journal'
+require 'rbot/journal/postgres.rb'
+require 'rbot/journal/mongo.rb'
+
+require 'benchmark'
+
+DAY=60*60*24
+
+class JournalMessageTest < Test::Unit::TestCase
+
+ include Irc::Bot::Journal
+
+ def test_get
+ m = JournalMessage.create('foo', {'bar': 42, 'baz': nil, 'qux': {'quxx': 23}})
+ assert_equal(42, m.get('bar'))
+ assert_raise ArgumentError do
+ m.get('nope')
+ end
+ assert_nil(m.get('nope', nil))
+ assert_nil(m.get('baz'))
+ assert_equal(23, m['qux.quxx'])
+ assert_equal(nil, m['qux.nope'])
+ assert_raise(ArgumentError) { m.get('qux.nope') }
+ end
+
+end
+
+class QueryTest < Test::Unit::TestCase
+
+ include Irc::Bot::Journal
+
+ def test_define
+
+ q = Query.define do
+ id 'foo'
+ id 'bar', 'baz'
+ topic 'log.irc.*'
+ topic 'log.core', 'baz'
+ timestamp from: Time.now, to: Time.now + 60 * 10
+ payload 'action': :privmsg, 'alice': 'bob'
+ payload 'channel': '#rbot'
+ payload 'foo.bar': 'baz'
+ end
+ assert_equal(['foo', 'bar', 'baz'], q.id)
+ assert_equal(['log.irc.*', 'log.core', 'baz'], q.topic)
+ assert_equal([:from, :to], q.timestamp.keys)
+ assert_equal(Time, q.timestamp[:to].class)
+ assert_equal(Time, q.timestamp[:from].class)
+ assert_equal({
+ 'action': :privmsg, 'alice': 'bob',
+ 'channel': '#rbot',
+ 'foo.bar': 'baz'
+ }, q.payload)
+
+ end
+
+ def test_topic_matches
+ q = Query.define do
+ topic 'foo'
+ end
+ assert_true(q.topic_matches?('foo'))
+ assert_false(q.topic_matches?('bar'))
+ assert_false(q.topic_matches?('foo.bar'))
+
+ q = Query.define do
+ topic 'foo.bar'
+ end
+ assert_false(q.topic_matches?('foo'))
+ assert_false(q.topic_matches?('bar'))
+ assert_true(q.topic_matches?('foo.bar'))
+
+ q = Query.define do
+ topic 'foo.*'
+ end
+ assert_false(q.topic_matches?('foo'))
+ assert_false(q.topic_matches?('bar'))
+ assert_true(q.topic_matches?('foo.bar'))
+ assert_true(q.topic_matches?('foo.baz'))
+
+ q = Query.define do
+ topic '*.bar'
+ end
+ assert_false(q.topic_matches?('foo'))
+ assert_false(q.topic_matches?('bar'))
+ assert_true(q.topic_matches?('foo.bar'))
+ assert_true(q.topic_matches?('bar.bar'))
+ assert_false(q.topic_matches?('foo.foo'))
+
+ q = Query.define do
+ topic '*.*'
+ end
+ assert_false(q.topic_matches?('foo'))
+ assert_true(q.topic_matches?('foo.bar'))
+
+ q = Query.define do
+ topic 'foo'
+ topic 'bar'
+ topic 'baz.alice.bob.*.foo'
+ end
+ assert_true(q.topic_matches?('foo'))
+ assert_true(q.topic_matches?('bar'))
+ assert_true(q.topic_matches?('baz.alice.bob.asdf.foo'))
+ assert_false(q.topic_matches?('baz.alice.bob..foo'))
+
+ end
+ def test_matches
+ q = Query.define do
+ #id 'foo', 'bar'
+ topic 'log.irc.*', 'log.core'
+ timestamp from: Time.now - DAY, to: Time.now + DAY
+ payload 'action': 'privmsg', 'foo.bar': 'baz'
+ end
+ assert_true(q.matches? JournalMessage.create('log.irc.raw', {'action' => 'privmsg'}))
+ assert_false(q.matches? JournalMessage.create('baz', {}))
+ assert_true(q.matches? JournalMessage.create('log.core', {foo: {bar: 'baz'}}))
+
+ # tests timestamp from/to:
+ assert_true(q.matches? JournalMessage.new(
+ id: 'foo',
+ topic: 'log.core',
+ timestamp: Time.now,
+ payload: {action: 'privmsg'}))
+ assert_false(q.matches? JournalMessage.new(
+ id: 'foo',
+ topic: 'log.core',
+ timestamp: Time.now - DAY*3,
+ payload: {action: 'privmsg'}))
+ assert_false(q.matches? JournalMessage.new(
+ id: 'foo',
+ topic: 'log.core',
+ timestamp: Time.now + DAY*3,
+ payload: {action: 'privmsg'}))
+ end
+
+end
+
+class JournalBrokerTest < Test::Unit::TestCase
+
+ include Irc::Bot::Journal
+
+ def test_publish
+ received = []
+ journal = JournalBroker.new(consumer: Proc.new { |message|
+ received << message
+ })
+
+ # publish some messages:
+ journal.publish 'log.irc',
+ source: 'alice', message: '<3 pg'
+ journal.publish 'log.irc',
+ source: 'bob', message: 'mysql > pg'
+ journal.publish 'log.irc',
+ source: 'alice', target: 'bob', action: :kick
+
+ # wait for messages to be consumed:
+ sleep 0.1
+ assert_equal(3, received.length)
+ end
+
+ def test_subscribe
+ received = []
+ journal = JournalBroker.new
+
+ # subscribe to messages for topic foo:
+ sub = journal.subscribe('foo') do |message|
+ received << message
+ end
+
+ # publish some messages:
+ journal.publish 'foo', {}
+ journal.publish 'bar', {}
+ journal.publish 'foo', {}
+
+ # wait for messages to be consumed:
+ sleep 0.1
+ assert_equal(2, received.length)
+
+ received.clear
+
+ journal.publish 'foo', {}
+ sleep 0.1
+ sub.cancel
+ journal.publish 'foo', {}
+ sleep 0.1
+ assert_equal(1, received.length)
+ end
+
+end
+
+module JournalStorageTestMixin
+
+ include Irc::Bot::Journal
+
+ def teardown
+ @storage.drop
+ end
+
+ def test_operations
+ # insertion
+ m = JournalMessage.create('log.core', {foo: {bar: 'baz', qux: 42}})
+ @storage.insert(m)
+
+ # query by id
+ res = @storage.find(Query.define { id m.id })
+ assert_equal(1, res.length)
+ assert_equal(m, res.first)
+
+ # check timestamp was returned correctly:
+ assert_equal(m.timestamp.strftime('%Y-%m-%d %H:%M:%S%z'),
+ res.first.timestamp.strftime('%Y-%m-%d %H:%M:%S%z'))
+
+ # check if payload was returned correctly:
+ assert_equal({'foo' => {'bar' => 'baz', 'qux' => 42}}, res.first.payload)
+
+ # query by topic
+ assert_equal(m, @storage.find(Query.define { topic('log.core') }).first)
+ assert_equal(m, @storage.find(Query.define { topic('log.*') }).first)
+ assert_equal(m, @storage.find(Query.define { topic('*.*') }).first)
+
+ # query by timestamp range
+ assert_equal(1, @storage.find(Query.define {
+ timestamp(from: Time.now-DAY, to: Time.now+DAY) }).length)
+ assert_equal(0, @storage.find(Query.define {
+ timestamp(from: Time.now-DAY*2, to: Time.now-DAY) }).length)
+
+ # query by payload
+ res = @storage.find(Query.define { payload('foo.bar' => 'baz') })
+ assert_equal(m, res.first)
+ res = @storage.find(Query.define { payload('foo.bar' => 'x') })
+ assert_true(res.empty?)
+
+ # without arguments: find and count
+ assert_equal(1, @storage.count)
+ assert_equal(m, @storage.find.first)
+ end
+
+ def test_find
+ # tests limit/offset and block parameters of find()
+ @storage.insert(JournalMessage.create('irclogs', {message: 'foo'}))
+ @storage.insert(JournalMessage.create('irclogs', {message: 'bar'}))
+ @storage.insert(JournalMessage.create('irclogs', {message: 'baz'}))
+ @storage.insert(JournalMessage.create('irclogs', {message: 'qux'}))
+
+ msgs = []
+ @storage.find(Query.define({topic: 'irclogs'}), 2, 1) do |m|
+ msgs << m
+ end
+ assert_equal(2, msgs.length)
+ assert_equal('bar', msgs.first['message'])
+ assert_equal('baz', msgs.last['message'])
+
+ msgs = []
+ @storage.find(Query.define({topic: 'irclogs'})) do |m|
+ msgs << m
+ end
+ assert_equal(4, msgs.length)
+ assert_equal('foo', msgs.first['message'])
+ assert_equal('qux', msgs.last['message'])
+
+ end
+
+ def test_operations_multiple
+ # test operations on multiple messages
+ # insert a bunch:
+ @storage.insert(JournalMessage.create('test.topic', {name: 'one'}))
+ @storage.insert(JournalMessage.create('test.topic', {name: 'two'}))
+ @storage.insert(JournalMessage.create('test.topic', {name: 'three'}))
+ @storage.insert(JournalMessage.create('archived.topic', {name: 'four'},
+ timestamp: Time.now - DAY*100))
+ @storage.insert(JournalMessage.create('complex', {name: 'five', country: {
+ name: 'Italy'
+ }}))
+ @storage.insert(JournalMessage.create('complex', {name: 'six', country: {
+ name: 'Austria'
+ }}))
+
+ # query by topic
+ assert_equal(3, @storage.find(Query.define { topic 'test.*' }).length)
+ # query by payload
+ assert_equal(1, @storage.find(Query.define {
+ payload('country.name' => 'Austria') }).length)
+ # query by timestamp range
+ assert_equal(1, @storage.find(Query.define {
+ timestamp(from: Time.now - DAY*150, to: Time.now - DAY*50) }).length)
+
+ # count with query
+ assert_equal(2, @storage.count(Query.define { topic('complex') }))
+ assert_equal(6, @storage.count)
+ @storage.remove(Query.define { topic('archived.*') })
+ assert_equal(5, @storage.count)
+ @storage.remove
+ assert_equal(0, @storage.count)
+ end
+
+ def test_broker_interface
+ journal = JournalBroker.new(storage: @storage)
+
+ journal.publish 'irclogs', message: 'foo'
+ journal.publish 'irclogs', message: 'bar'
+ journal.publish 'irclogs', message: 'baz'
+ journal.publish 'irclogs', message: 'qux'
+
+ # wait for messages to be consumed:
+ sleep 0.1
+
+ msgs = []
+ journal.find({topic: 'irclogs'}, 2, 1) do |m|
+ msgs << m
+ end
+ assert_equal(2, msgs.length)
+ assert_equal('bar', msgs.first['message'])
+ assert_equal('baz', msgs.last['message'])
+
+ journal.ensure_payload_index('foo.bar.baz')
+ end
+
+ NUM=100 # 1_000_000
+ def test_benchmark
+ puts
+
+ assert_equal(0, @storage.count)
+ # prepare messages to insert, we benchmark the storage backend not ruby
+ num = 0
+ messages = (0...NUM).map do
+ num += 1
+ JournalMessage.create(
+ 'test.topic.num_'+num.to_s, {answer: {number: '42', word: 'forty-two'}})
+ end
+
+ # iter is the number of operations performed WITHIN block
+ def benchmark(label, iter, &block)
+ time = Benchmark.realtime do
+ yield
+ end
+ puts label + ' %d iterations, duration: %.3fms (%.3fms / iteration)' % [iter, time*1000, (time*1000) / iter]
+ end
+
+ benchmark(@storage.class.to_s+'~insert', messages.length) do
+ messages.each { |m|
+ @storage.insert(m)
+ }
+ end
+
+ benchmark(@storage.class.to_s+'~find_by_id', messages.length) do
+ messages.each { |m|
+ @storage.find(Query.define { id m.id })
+ }
+ end
+ benchmark(@storage.class.to_s+'~find_by_topic', messages.length) do
+ messages.each { |m|
+ @storage.find(Query.define { topic m.topic })
+ }
+ end
+ benchmark(@storage.class.to_s+'~find_by_topic_wildcard', messages.length) do
+ messages.each { |m|
+ @storage.find(Query.define { topic m.topic.gsub('topic', '*') })
+ }
+ end
+ end
+
+end
+
+if ENV['PG_URI']
+class JournalStoragePostgresTest < Test::Unit::TestCase
+
+ include JournalStorageTestMixin
+
+ def setup
+ @storage = Storage::PostgresStorage.new(
+ uri: ENV['PG_URI'] || 'postgresql://localhost/rbot_journal',
+ drop: true)
+ end
+
+ def test_query_to_sql
+ q = Query.define do
+ id 'foo'
+ id 'bar', 'baz'
+ topic 'log.irc.*'
+ topic 'log.core', 'baz'
+ timestamp from: Time.now, to: Time.now + 60 * 10
+ payload 'action': :privmsg, 'alice': 'bob'
+ payload 'channel': '#rbot'
+ payload 'foo.bar': 'baz'
+ end
+ sql = @storage.query_to_sql(q)
+ assert_equal("(id = $1 OR id = $2 OR id = $3) AND (topic ILIKE $4 OR topic ILIKE $5 OR topic ILIKE $6) AND (timestamp >= $7 AND timestamp <= $8) AND (payload->>'action' = $9 OR payload->>'alice' = $10 OR payload->>'channel' = $11 OR payload->'foo'->>'bar' = $12)", sql[0])
+ q = Query.define do
+ id 'foo'
+ end
+ assert_equal('(id = $1)', @storage.query_to_sql(q)[0])
+ q = Query.define do
+ topic 'foo.*.bar'
+ end
+ assert_equal('(topic ILIKE $1)', @storage.query_to_sql(q)[0])
+ assert_equal(['foo.%.bar'], @storage.query_to_sql(q)[1])
+ end
+
+end
+else
+ puts 'NOTE: Set PG_URI environment variable to test postgresql storage.'
+end
+
+if ENV['MONGO_URI']
+class JournalStorageMongoTest < Test::Unit::TestCase
+
+ include JournalStorageTestMixin
+
+ def setup
+ @storage = Storage::MongoStorage.new(
+ uri: ENV['MONGO_URI'] || 'mongodb://127.0.0.1:27017/rbot',
+ drop: true)
+ end
+end
+else
+ puts 'NOTE: Set MONGO_URI environment variable to test postgresql storage.'
+end
+