]> git.netwichtig.de Git - user/henk/code/ruby/rbot.git/commitdiff
journal: add mongodb storage backend
authorMatthias Hecker <apoc@geekosphere.org>
Mon, 15 Jun 2015 21:56:20 +0000 (23:56 +0200)
committerMatthias Hecker <apoc@geekosphere.org>
Mon, 15 Jun 2015 21:56:20 +0000 (23:56 +0200)
lib/rbot/journal/mongo.rb [new file with mode: 0644]
lib/rbot/journal/postgres.rb
test/test_journal.rb

diff --git a/lib/rbot/journal/mongo.rb b/lib/rbot/journal/mongo.rb
new file mode 100644 (file)
index 0000000..24e9cfc
--- /dev/null
@@ -0,0 +1,120 @@
+# encoding: UTF-8
+#-- vim:sw=2:et
+#++
+#
+# :title: journal backend for mongoDB
+
+require 'mongo'
+require 'json'
+
+module Irc
+class Bot
+module Journal
+
+  module Storage
+
+    class MongoStorage < AbstractStorage
+      attr_reader :client
+
+      def initialize(opts={})
+        Mongo::Logger.logger.level = Logger::WARN
+        @uri = opts[:uri] || 'mongodb://127.0.0.1:27017/rbot'
+        @client = Mongo::Client.new(@uri)
+        @collection = @client['journal']
+        log 'journal storage: mongodb connected to ' + @uri
+        
+        drop if opts[:drop]
+        @collection.indexes.create_one({topic: 1})
+      end
+
+      def ensure_index(key)
+        @collection.indexes.create_one({'payload.'+key => 1})
+      end
+
+      def insert(m)
+        @collection.insert_one({
+          '_id' => m.id,
+          'topic' => m.topic,
+          'timestamp' => m.timestamp,
+          'payload' => m.payload
+        })
+      end
+
+      def find(query=nil, limit=100, offset=0)
+        query_cursor(query).skip(offset).limit(limit).map do |document|
+          JournalMessage.new(id: document['_id'], timestamp: document['timestamp'].localtime,
+            topic: document['topic'], payload: document['payload'].to_h)
+        end
+      end
+
+      # returns the number of messages that match the query
+      def count(query=nil)
+        query_cursor(query).count
+      end
+
+      def remove(query=nil)
+        query_cursor(query).delete_many
+      end
+
+      def drop
+        @collection.drop
+      end
+
+      def query_cursor(query)
+        unless query
+          return @collection.find()
+        end
+
+        query_and = []
+
+        # ID query OR condition
+        unless query.id.empty?
+          query_and << {
+            '$or' => query.id.map { |_id| 
+              {'_id' => _id}
+            }
+          }
+        end
+
+        unless query.topic.empty?
+          query_and << {
+            '$or' => query.topic.map { |topic|
+              if topic.include?('*')
+                pattern = topic.gsub('.', '\.').gsub('*', '.*')
+                {'topic' => {'$regex' => pattern}}
+              else
+                {'topic' => topic}
+              end
+            }
+          }
+        end
+
+        if query.timestamp[:from] or query.timestamp[:to]
+          where = {}
+          if query.timestamp[:from]
+            where['$gte'] = query.timestamp[:from]
+          end
+          if query.timestamp[:to]
+            where['$lte'] = query.timestamp[:to]
+          end
+          query_and << {'timestamp' => where}
+        end
+
+        unless query.payload.empty?
+          query_and << {
+            '$or' => query.payload.map { |key, value|
+              key = 'payload.' + key
+              {key => value}
+            }
+          }
+        end
+
+        @collection.find({
+          '$and' => query_and
+        })
+      end
+    end
+  end
+end # Journal
+end # Bot
+end # Irc
index 52f5fb368f0b8c848c01085eddcdef3aaa4f267c..e63aefee983bf8907d6dd5c678919f8cab202d3b 100644 (file)
@@ -7,6 +7,28 @@
 require 'pg'
 require 'json'
 
+# as a replacement for CREATE INDEX IF NOT EXIST that is not in postgres.
+# define function to be able to create an index in case it doesnt exist:
+# source: http://stackoverflow.com/a/26012880
+CREATE_INDEX = <<-EOT
+CREATE OR REPLACE FUNCTION create_index(table_name text, index_name text, column_name text) RETURNS void AS $$ 
+declare 
+   l_count integer;
+begin
+  select count(*)
+     into l_count
+  from pg_indexes
+  where schemaname = 'public'
+    and tablename = lower(table_name)
+    and indexname = lower(index_name);
+
+  if l_count = 0 then 
+     execute 'create index ' || index_name || ' on ' || table_name || '(' || column_name || ')';
+  end if;
+end;
+$$ LANGUAGE plpgsql;
+EOT
+
 module Irc
 class Bot
 module Journal
@@ -17,9 +39,10 @@ module Journal
       attr_reader :conn
 
       def initialize(opts={})
-        @uri = opts[:uri] || 'postgresql://localhost/rbot_journal'
+        @uri = opts[:uri] || 'postgresql://localhost/rbot'
         @conn = PG.connect(@uri)
         @conn.exec('set client_min_messages = warning')
+        @conn.exec(CREATE_INDEX)
         @version = @conn.exec('SHOW server_version;')[0]['server_version']
 
         @version.gsub!(/^(\d+\.\d+)$/, '\1.0')
@@ -35,6 +58,7 @@ module Journal
 
         drop if opts[:drop]
         create_table
+        create_index('topic_index', 'topic')
       end
 
       def create_table
@@ -46,6 +70,23 @@ module Journal
              payload %s NOT NULL)' % [@jsonb ? 'JSONB' : 'JSON'])
       end
 
+      def create_index(index_name, column_name)
+        debug 'journal postges backend: create index %s for %s' % [
+          index_name, column_name]
+        @conn.exec_params('SELECT create_index($1, $2, $3)', [
+          'journal', index_name, column_name])
+      end
+
+      def create_payload_index(key)
+        index_name = 'idx_payload_' + key.gsub('.', '_')
+        column = sql_payload_selector(key)
+        create_index(index_name, column)
+      end
+
+      def ensure_index(key)
+        create_payload_index(key)
+      end
+
       def insert(m)
         @conn.exec_params('INSERT INTO journal VALUES ($1, $2, $3, $4);',
           [m.id, m.topic, m.timestamp, JSON.generate(m.payload)])
@@ -95,6 +136,19 @@ module Journal
         @conn.exec('DROP TABLE journal;') rescue nil
       end
 
+      def sql_payload_selector(key)
+        selector = 'payload'
+        k = key.to_s.split('.')
+        k.each_index { |i|
+          if i >= k.length-1
+            selector += '->>\'%s\'' % [@conn.escape_string(k[i])]
+          else
+            selector += '->\'%s\'' % [@conn.escape_string(k[i])]
+          end
+        }
+        selector
+      end
+
       def query_to_sql(query)
         params = []
         placeholder = Proc.new do |value|
@@ -142,15 +196,7 @@ module Journal
         unless query.payload.empty?
           list = []
           query.payload.each_pair do |key, value|
-            selector = 'payload'
-            k = key.to_s.split('.')
-            k.each_index { |i|
-              if i >= k.length-1
-                selector += '->>\'%s\'' % [@conn.escape_string(k[i])]
-              else
-                selector += '->\'%s\'' % [@conn.escape_string(k[i])]
-              end
-            }
+            selector = sql_payload_selector(key)
             list << selector + ' = ' + placeholder.call(value)
           end
           sql[:list] << {
index 9e8f06541dfb0ca333e191bb5f761820159601b0..b9f5c61219009f28d4ee4643f20263d99fa7b90a 100644 (file)
@@ -4,6 +4,9 @@ require 'test/unit'
 require 'rbot/ircbot'
 require 'rbot/journal'
 require 'rbot/journal/postgres.rb'
+require 'rbot/journal/mongo.rb'
+
+require 'benchmark'
 
 DAY=60*60*24
 
@@ -186,47 +189,17 @@ class JournalBrokerTest < Test::Unit::TestCase
 
 end
 
-class JournalStoragePostgresTest < Test::Unit::TestCase
+module JournalStorageTestMixin
 
   include Irc::Bot::Journal
 
-  def setup
-    @storage = Storage::PostgresStorage.new(
-      uri: ENV['DB_URI'] || 'postgresql://localhost/rbot_journal',
-      drop: true)
-  end
-
   def teardown
     @storage.drop
   end
 
-  def test_query_to_sql
-    q = Query.define do
-      id 'foo'
-      id 'bar', 'baz'
-      topic 'log.irc.*'
-      topic 'log.core', 'baz'
-      timestamp from: Time.now, to: Time.now + 60 * 10
-      payload 'action': :privmsg, 'alice': 'bob'
-      payload 'channel': '#rbot'
-      payload 'foo.bar': 'baz'
-    end
-    sql = @storage.query_to_sql(q)
-    assert_equal("(id = $1 OR id = $2 OR id = $3) AND (topic ILIKE $4 OR topic ILIKE $5 OR topic ILIKE $6) AND (timestamp >= $7 AND timestamp <= $8) AND (payload->>'action' = $9 OR payload->>'alice' = $10 OR payload->>'channel' = $11 OR payload->'foo'->>'bar' = $12)", sql[0])
-    q = Query.define do
-      id 'foo'
-    end
-    assert_equal('(id = $1)', @storage.query_to_sql(q)[0])
-    q = Query.define do
-      topic 'foo.*.bar'
-    end
-    assert_equal('(topic ILIKE $1)', @storage.query_to_sql(q)[0])
-    assert_equal(['foo.%.bar'], @storage.query_to_sql(q)[1])
-  end
-
   def test_operations
     # insertion
-    m = JournalMessage.create('log.core', {foo: {bar: 'baz'}})
+    m = JournalMessage.create('log.core', {foo: {bar: 'baz', qux: 42}})
     @storage.insert(m)
 
     # query by id
@@ -239,7 +212,7 @@ class JournalStoragePostgresTest < Test::Unit::TestCase
                  res.first.timestamp.strftime('%Y-%m-%d %H:%M:%S%z'))
 
     # check if payload was returned correctly:
-    assert_equal({'foo' => {'bar' => 'baz'}}, res.first.payload)
+    assert_equal({'foo' => {'bar' => 'baz', 'qux' => 42}}, res.first.payload)
 
     # query by topic
     assert_equal(m, @storage.find(Query.define { topic('log.core') }).first)
@@ -304,5 +277,96 @@ class JournalStoragePostgresTest < Test::Unit::TestCase
     assert_equal(1, journal.count)
   end
 
+  NUM=150_000
+  def test_benchmark
+    puts
+
+    assert_equal(0, @storage.count)
+    # prepare messages to insert, we benchmark the storage backend not ruby
+    num = 0
+    messages = (0...NUM).map do
+      num += 1
+      JournalMessage.create(
+            'test.topic.num_'+num.to_s, {answer: {number: '42', word: 'forty-two'}})
+    end
+
+    # iter is the number of operations performed WITHIN block
+    def benchmark(label, iter, &block)
+      time = Benchmark.realtime do
+        yield
+      end
+      puts label + ' %d iterations, duration: %.3fms (%.3fms / iteration)' % [iter, time*1000, (time*1000) / iter]
+    end
+
+    benchmark(@storage.class.to_s+'~insert', messages.length) do
+      messages.each { |m|
+        @storage.insert(m)
+      }
+    end
+
+    benchmark(@storage.class.to_s+'~find_by_id', messages.length) do
+      messages.each { |m|
+        @storage.find(Query.define { id m.id })
+      }
+    end
+    benchmark(@storage.class.to_s+'~find_by_topic', messages.length) do
+      messages.each { |m|
+        @storage.find(Query.define { topic m.topic })
+      }
+    end
+    benchmark(@storage.class.to_s+'~find_by_topic_wildcard', messages.length) do
+      messages.each { |m|
+        @storage.find(Query.define { topic m.topic.gsub('topic', '*') })
+      }
+    end
+  end
+
+end
+
+class JournalStoragePostgresTest < Test::Unit::TestCase
+
+  include JournalStorageTestMixin
+
+  def setup
+    @storage = Storage::PostgresStorage.new(
+      uri: ENV['DB_URI'] || 'postgresql://localhost/rbot_journal',
+      drop: true)
+  end
+
+  def test_query_to_sql
+    q = Query.define do
+      id 'foo'
+      id 'bar', 'baz'
+      topic 'log.irc.*'
+      topic 'log.core', 'baz'
+      timestamp from: Time.now, to: Time.now + 60 * 10
+      payload 'action': :privmsg, 'alice': 'bob'
+      payload 'channel': '#rbot'
+      payload 'foo.bar': 'baz'
+    end
+    sql = @storage.query_to_sql(q)
+    assert_equal("(id = $1 OR id = $2 OR id = $3) AND (topic ILIKE $4 OR topic ILIKE $5 OR topic ILIKE $6) AND (timestamp >= $7 AND timestamp <= $8) AND (payload->>'action' = $9 OR payload->>'alice' = $10 OR payload->>'channel' = $11 OR payload->'foo'->>'bar' = $12)", sql[0])
+    q = Query.define do
+      id 'foo'
+    end
+    assert_equal('(id = $1)', @storage.query_to_sql(q)[0])
+    q = Query.define do
+      topic 'foo.*.bar'
+    end
+    assert_equal('(topic ILIKE $1)', @storage.query_to_sql(q)[0])
+    assert_equal(['foo.%.bar'], @storage.query_to_sql(q)[1])
+  end
+
+end
+
+class JournalStorageMongoTest < Test::Unit::TestCase
+
+  include JournalStorageTestMixin
+
+  def setup
+    @storage = Storage::MongoStorage.new(
+      drop: true)
+  end
+
 end