From 257ba78e8564964b78f839cce45350b824c2f410 Mon Sep 17 00:00:00 2001 From: Matthias Hecker Date: Mon, 15 Jun 2015 23:56:20 +0200 Subject: journal: add mongodb storage backend --- lib/rbot/journal/mongo.rb | 120 +++++++++++++++++++++++++++++++++++++++ lib/rbot/journal/postgres.rb | 66 ++++++++++++++++++---- test/test_journal.rb | 130 ++++++++++++++++++++++++++++++++----------- 3 files changed, 273 insertions(+), 43 deletions(-) create mode 100644 lib/rbot/journal/mongo.rb diff --git a/lib/rbot/journal/mongo.rb b/lib/rbot/journal/mongo.rb new file mode 100644 index 00000000..24e9cfcc --- /dev/null +++ b/lib/rbot/journal/mongo.rb @@ -0,0 +1,120 @@ +# encoding: UTF-8 +#-- vim:sw=2:et +#++ +# +# :title: journal backend for mongoDB + +require 'mongo' +require 'json' + +module Irc +class Bot +module Journal + + module Storage + + class MongoStorage < AbstractStorage + attr_reader :client + + def initialize(opts={}) + Mongo::Logger.logger.level = Logger::WARN + @uri = opts[:uri] || 'mongodb://127.0.0.1:27017/rbot' + @client = Mongo::Client.new(@uri) + @collection = @client['journal'] + log 'journal storage: mongodb connected to ' + @uri + + drop if opts[:drop] + @collection.indexes.create_one({topic: 1}) + end + + def ensure_index(key) + @collection.indexes.create_one({'payload.'+key => 1}) + end + + def insert(m) + @collection.insert_one({ + '_id' => m.id, + 'topic' => m.topic, + 'timestamp' => m.timestamp, + 'payload' => m.payload + }) + end + + def find(query=nil, limit=100, offset=0) + query_cursor(query).skip(offset).limit(limit).map do |document| + JournalMessage.new(id: document['_id'], timestamp: document['timestamp'].localtime, + topic: document['topic'], payload: document['payload'].to_h) + end + end + + # returns the number of messages that match the query + def count(query=nil) + query_cursor(query).count + end + + def remove(query=nil) + query_cursor(query).delete_many + end + + def drop + @collection.drop + end + + def query_cursor(query) + unless query + return @collection.find() + end + + query_and = [] + + # ID query OR condition + unless query.id.empty? + query_and << { + '$or' => query.id.map { |_id| + {'_id' => _id} + } + } + end + + unless query.topic.empty? + query_and << { + '$or' => query.topic.map { |topic| + if topic.include?('*') + pattern = topic.gsub('.', '\.').gsub('*', '.*') + {'topic' => {'$regex' => pattern}} + else + {'topic' => topic} + end + } + } + end + + if query.timestamp[:from] or query.timestamp[:to] + where = {} + if query.timestamp[:from] + where['$gte'] = query.timestamp[:from] + end + if query.timestamp[:to] + where['$lte'] = query.timestamp[:to] + end + query_and << {'timestamp' => where} + end + + unless query.payload.empty? + query_and << { + '$or' => query.payload.map { |key, value| + key = 'payload.' + key + {key => value} + } + } + end + + @collection.find({ + '$and' => query_and + }) + end + end + end +end # Journal +end # Bot +end # Irc diff --git a/lib/rbot/journal/postgres.rb b/lib/rbot/journal/postgres.rb index 52f5fb36..e63aefee 100644 --- a/lib/rbot/journal/postgres.rb +++ b/lib/rbot/journal/postgres.rb @@ -7,6 +7,28 @@ require 'pg' require 'json' +# as a replacement for CREATE INDEX IF NOT EXIST that is not in postgres. +# define function to be able to create an index in case it doesnt exist: +# source: http://stackoverflow.com/a/26012880 +CREATE_INDEX = <<-EOT +CREATE OR REPLACE FUNCTION create_index(table_name text, index_name text, column_name text) RETURNS void AS $$ +declare + l_count integer; +begin + select count(*) + into l_count + from pg_indexes + where schemaname = 'public' + and tablename = lower(table_name) + and indexname = lower(index_name); + + if l_count = 0 then + execute 'create index ' || index_name || ' on ' || table_name || '(' || column_name || ')'; + end if; +end; +$$ LANGUAGE plpgsql; +EOT + module Irc class Bot module Journal @@ -17,9 +39,10 @@ module Journal attr_reader :conn def initialize(opts={}) - @uri = opts[:uri] || 'postgresql://localhost/rbot_journal' + @uri = opts[:uri] || 'postgresql://localhost/rbot' @conn = PG.connect(@uri) @conn.exec('set client_min_messages = warning') + @conn.exec(CREATE_INDEX) @version = @conn.exec('SHOW server_version;')[0]['server_version'] @version.gsub!(/^(\d+\.\d+)$/, '\1.0') @@ -35,6 +58,7 @@ module Journal drop if opts[:drop] create_table + create_index('topic_index', 'topic') end def create_table @@ -46,6 +70,23 @@ module Journal payload %s NOT NULL)' % [@jsonb ? 'JSONB' : 'JSON']) end + def create_index(index_name, column_name) + debug 'journal postges backend: create index %s for %s' % [ + index_name, column_name] + @conn.exec_params('SELECT create_index($1, $2, $3)', [ + 'journal', index_name, column_name]) + end + + def create_payload_index(key) + index_name = 'idx_payload_' + key.gsub('.', '_') + column = sql_payload_selector(key) + create_index(index_name, column) + end + + def ensure_index(key) + create_payload_index(key) + end + def insert(m) @conn.exec_params('INSERT INTO journal VALUES ($1, $2, $3, $4);', [m.id, m.topic, m.timestamp, JSON.generate(m.payload)]) @@ -95,6 +136,19 @@ module Journal @conn.exec('DROP TABLE journal;') rescue nil end + def sql_payload_selector(key) + selector = 'payload' + k = key.to_s.split('.') + k.each_index { |i| + if i >= k.length-1 + selector += '->>\'%s\'' % [@conn.escape_string(k[i])] + else + selector += '->\'%s\'' % [@conn.escape_string(k[i])] + end + } + selector + end + def query_to_sql(query) params = [] placeholder = Proc.new do |value| @@ -142,15 +196,7 @@ module Journal unless query.payload.empty? list = [] query.payload.each_pair do |key, value| - selector = 'payload' - k = key.to_s.split('.') - k.each_index { |i| - if i >= k.length-1 - selector += '->>\'%s\'' % [@conn.escape_string(k[i])] - else - selector += '->\'%s\'' % [@conn.escape_string(k[i])] - end - } + selector = sql_payload_selector(key) list << selector + ' = ' + placeholder.call(value) end sql[:list] << { diff --git a/test/test_journal.rb b/test/test_journal.rb index 9e8f0654..b9f5c612 100644 --- a/test/test_journal.rb +++ b/test/test_journal.rb @@ -4,6 +4,9 @@ require 'test/unit' require 'rbot/ircbot' require 'rbot/journal' require 'rbot/journal/postgres.rb' +require 'rbot/journal/mongo.rb' + +require 'benchmark' DAY=60*60*24 @@ -186,47 +189,17 @@ class JournalBrokerTest < Test::Unit::TestCase end -class JournalStoragePostgresTest < Test::Unit::TestCase +module JournalStorageTestMixin include Irc::Bot::Journal - def setup - @storage = Storage::PostgresStorage.new( - uri: ENV['DB_URI'] || 'postgresql://localhost/rbot_journal', - drop: true) - end - def teardown @storage.drop end - def test_query_to_sql - q = Query.define do - id 'foo' - id 'bar', 'baz' - topic 'log.irc.*' - topic 'log.core', 'baz' - timestamp from: Time.now, to: Time.now + 60 * 10 - payload 'action': :privmsg, 'alice': 'bob' - payload 'channel': '#rbot' - payload 'foo.bar': 'baz' - end - sql = @storage.query_to_sql(q) - assert_equal("(id = $1 OR id = $2 OR id = $3) AND (topic ILIKE $4 OR topic ILIKE $5 OR topic ILIKE $6) AND (timestamp >= $7 AND timestamp <= $8) AND (payload->>'action' = $9 OR payload->>'alice' = $10 OR payload->>'channel' = $11 OR payload->'foo'->>'bar' = $12)", sql[0]) - q = Query.define do - id 'foo' - end - assert_equal('(id = $1)', @storage.query_to_sql(q)[0]) - q = Query.define do - topic 'foo.*.bar' - end - assert_equal('(topic ILIKE $1)', @storage.query_to_sql(q)[0]) - assert_equal(['foo.%.bar'], @storage.query_to_sql(q)[1]) - end - def test_operations # insertion - m = JournalMessage.create('log.core', {foo: {bar: 'baz'}}) + m = JournalMessage.create('log.core', {foo: {bar: 'baz', qux: 42}}) @storage.insert(m) # query by id @@ -239,7 +212,7 @@ class JournalStoragePostgresTest < Test::Unit::TestCase res.first.timestamp.strftime('%Y-%m-%d %H:%M:%S%z')) # check if payload was returned correctly: - assert_equal({'foo' => {'bar' => 'baz'}}, res.first.payload) + assert_equal({'foo' => {'bar' => 'baz', 'qux' => 42}}, res.first.payload) # query by topic assert_equal(m, @storage.find(Query.define { topic('log.core') }).first) @@ -304,5 +277,96 @@ class JournalStoragePostgresTest < Test::Unit::TestCase assert_equal(1, journal.count) end + NUM=150_000 + def test_benchmark + puts + + assert_equal(0, @storage.count) + # prepare messages to insert, we benchmark the storage backend not ruby + num = 0 + messages = (0...NUM).map do + num += 1 + JournalMessage.create( + 'test.topic.num_'+num.to_s, {answer: {number: '42', word: 'forty-two'}}) + end + + # iter is the number of operations performed WITHIN block + def benchmark(label, iter, &block) + time = Benchmark.realtime do + yield + end + puts label + ' %d iterations, duration: %.3fms (%.3fms / iteration)' % [iter, time*1000, (time*1000) / iter] + end + + benchmark(@storage.class.to_s+'~insert', messages.length) do + messages.each { |m| + @storage.insert(m) + } + end + + benchmark(@storage.class.to_s+'~find_by_id', messages.length) do + messages.each { |m| + @storage.find(Query.define { id m.id }) + } + end + benchmark(@storage.class.to_s+'~find_by_topic', messages.length) do + messages.each { |m| + @storage.find(Query.define { topic m.topic }) + } + end + benchmark(@storage.class.to_s+'~find_by_topic_wildcard', messages.length) do + messages.each { |m| + @storage.find(Query.define { topic m.topic.gsub('topic', '*') }) + } + end + end + +end + +class JournalStoragePostgresTest < Test::Unit::TestCase + + include JournalStorageTestMixin + + def setup + @storage = Storage::PostgresStorage.new( + uri: ENV['DB_URI'] || 'postgresql://localhost/rbot_journal', + drop: true) + end + + def test_query_to_sql + q = Query.define do + id 'foo' + id 'bar', 'baz' + topic 'log.irc.*' + topic 'log.core', 'baz' + timestamp from: Time.now, to: Time.now + 60 * 10 + payload 'action': :privmsg, 'alice': 'bob' + payload 'channel': '#rbot' + payload 'foo.bar': 'baz' + end + sql = @storage.query_to_sql(q) + assert_equal("(id = $1 OR id = $2 OR id = $3) AND (topic ILIKE $4 OR topic ILIKE $5 OR topic ILIKE $6) AND (timestamp >= $7 AND timestamp <= $8) AND (payload->>'action' = $9 OR payload->>'alice' = $10 OR payload->>'channel' = $11 OR payload->'foo'->>'bar' = $12)", sql[0]) + q = Query.define do + id 'foo' + end + assert_equal('(id = $1)', @storage.query_to_sql(q)[0]) + q = Query.define do + topic 'foo.*.bar' + end + assert_equal('(topic ILIKE $1)', @storage.query_to_sql(q)[0]) + assert_equal(['foo.%.bar'], @storage.query_to_sql(q)[1]) + end + +end + +class JournalStorageMongoTest < Test::Unit::TestCase + + include JournalStorageTestMixin + + def setup + @storage = Storage::MongoStorage.new( + drop: true) + end + end -- cgit v1.2.3