]> git.netwichtig.de Git - user/henk/code/ruby/rbot.git/commitdiff
journal: finishing postgres adapter
authorMatthias Hecker <apoc@geekosphere.org>
Sun, 14 Jun 2015 16:27:32 +0000 (18:27 +0200)
committerMatthias Hecker <apoc@geekosphere.org>
Sun, 14 Jun 2015 16:27:32 +0000 (18:27 +0200)
lib/rbot/journal.rb
lib/rbot/journal/postgres.rb
test/test_journal.rb

index 27565d061530659bde0b5bff55eeaffbaac41e81..cc0578de53a7940e7e4c4fda57447fcaf5d70e2a 100644 (file)
@@ -75,10 +75,14 @@ module Journal
       end
     end
 
-    def self.create(topic, payload)
+    def ==(other)
+      @id == other.id
+    end
+
+    def self.create(topic, payload, opt={})
       JournalMessage.new(
-        id: SecureRandom.uuid,
-        timestamp: Time.now,
+        id: opt[:id] || SecureRandom.uuid,
+        timestamp: opt[:timestamp] || Time.now,
         topic: topic,
         payload: payload
       )
index 85514fb20f1a50c860242b81d7385eb71096b807..57c539a72ba40a4e612a45ddeabe46d19d4c3b07 100644 (file)
@@ -5,12 +5,20 @@
 # :title: journal backend for postgresql
 
 require 'pg'
+require 'json'
 
 module Irc
 class Bot
 module Journal
+
+  class Query
+  end
+
   module Storage
+
     class PostgresStorage < AbstractStorage
+      attr_reader :conn
+
       def initialize(opts={})
         @uri = opts[:uri] || 'postgresql://localhost/rbot_journal'
         @conn = PG.connect(@uri)
@@ -25,12 +33,118 @@ module Journal
             'PostgreSQL Version too old: %s, supported: >= 9.3' % [@version])
         end
         @jsonb = (version >= 940)
-        log 'journal storage: no jsonb support, consider upgrading postgres'
+        log 'journal storage: no jsonb support, consider upgrading postgres' unless @jsonb
 
+        drop if opts[:drop]
         create_table
       end
 
       def create_table
+        @conn.exec('
+          CREATE TABLE IF NOT EXISTS journal
+            (id UUID PRIMARY KEY,
+             topic TEXT NOT NULL,
+             timestamp TIMESTAMP WITH TIME ZONE NOT NULL,
+             payload %s NOT NULL)' % [@jsonb ? 'JSONB' : 'JSON'])
+      end
+
+      def insert(m)
+        @conn.exec_params('INSERT INTO journal VALUES ($1, $2, $3, $4);',
+          [m.id, m.topic, m.timestamp, JSON.generate(m.payload)])
+      end
+
+      def find(query, limit=100, offset=0)
+        sql, params = query_to_sql(query)
+        sql = 'SELECT * FROM journal WHERE ' + sql + ' LIMIT %d OFFSET %d' % [limit.to_i, offset.to_i]
+        res = @conn.exec_params(sql, params)
+        res.map do |row|
+          timestamp = DateTime.strptime(row['timestamp'], '%Y-%m-%d %H:%M:%S%z')
+          JournalMessage.new(id: row['id'], timestamp: timestamp,
+            topic: row['topic'], payload: JSON.parse(row['payload']))
+        end
+      end
+
+      # returns the number of messages that match the query
+      def count(query)
+        sql, params = query_to_sql(query)
+        sql = 'SELECT COUNT(*) FROM journal WHERE ' + sql
+        res = @conn.exec_params(sql, params)
+        res[0]['count'].to_i
+      end
+
+      def drop
+        @conn.exec('DROP TABLE journal;') rescue nil
+      end
+
+      def query_to_sql(query)
+        params = []
+        placeholder = Proc.new do |value|
+          params << value
+          '$%d' % [params.length]
+        end
+        sql = {op: 'AND', list: []}
+
+        # ID query OR condition
+        unless query.id.empty?
+          sql[:list] << {
+            op: 'OR',
+            list: query.id.map { |id| 
+              'id = ' + placeholder.call(id)
+            }
+          }
+        end
+
+        # Topic query OR condition
+        unless query.topic.empty?
+          sql[:list] << {
+            op: 'OR',
+            list: query.topic.map { |topic| 
+              'topic ILIKE ' + placeholder.call(topic.gsub('*', '%'))
+            }
+          }
+        end
+
+        # Timestamp range query AND condition
+        if query.timestamp[:from] or query.timestamp[:to]
+          list = []
+          if query.timestamp[:from]
+            list << 'timestamp >= ' + placeholder.call(query.timestamp[:from])
+          end
+          if query.timestamp[:to]
+            list << 'timestamp <= ' + placeholder.call(query.timestamp[:to])
+          end
+          sql[:list] << {
+            op: 'AND',
+            list: list
+          }
+        end
+
+        # Payload query
+        unless query.payload.empty?
+          list = []
+          query.payload.each_pair do |key, value|
+            selector = 'payload'
+            k = key.to_s.split('.')
+            k.each_index { |i|
+              if i >= k.length-1
+                selector += '->>\'%s\'' % [@conn.escape_string(k[i])]
+              else
+                selector += '->\'%s\'' % [@conn.escape_string(k[i])]
+              end
+            }
+            list << selector + ' = ' + placeholder.call(value)
+          end
+          sql[:list] << {
+            op: 'OR',
+            list: list
+          }
+        end
+
+        sql = sql[:list].map { |stmt|
+          '(' + stmt[:list].join(' %s ' % [stmt[:op]]) + ')'
+        }.join(' %s ' % [sql[:op]])
+
+        [sql, params]
       end
     end
   end
index 1e55f54b280fc7f36d8958102954b9322912694d..2a522aa75b9101d4c3dceeeb287a28433ea783f7 100644 (file)
@@ -5,6 +5,8 @@ require 'rbot/ircbot'
 require 'rbot/journal'
 require 'rbot/journal/postgres.rb'
 
+DAY=60*60*24
+
 class JournalMessageTest < Test::Unit::TestCase
 
   include Irc::Bot::Journal
@@ -100,8 +102,6 @@ class QueryTest < Test::Unit::TestCase
     assert_false(q.topic_matches?('baz.alice.bob..foo'))
 
   end
-
-  DAY=60*60*24
   def test_matches
     q = Query.define do
       #id 'foo', 'bar'
@@ -186,14 +186,80 @@ class JournalBrokerTest < Test::Unit::TestCase
 
 end
 
-class JournalStorageTest < Test::Unit::TestCase
+class JournalStoragePostgresTest < Test::Unit::TestCase
 
   include Irc::Bot::Journal
 
-  def test_storage
-    s = Storage::PostgresStorage.new(
-      uri: 'postgresql://apoc:seed42@localhost/rbot_journal')
-    assert_equal(true, true)
+  def setup
+    @storage = Storage::PostgresStorage.new(
+      uri: ENV['DB_URI'] || 'postgresql://localhost/rbot_journal',
+      drop: true)
+  end
+
+  def teardown
+    @storage.drop
+  end
+
+  def test_query_to_sql
+    q = Query.define do
+      id 'foo'
+      id 'bar', 'baz'
+      topic 'log.irc.*'
+      topic 'log.core', 'baz'
+      timestamp from: Time.now, to: Time.now + 60 * 10
+      payload 'action': :privmsg, 'alice': 'bob'
+      payload 'channel': '#rbot'
+      payload 'foo.bar': 'baz'
+    end
+    sql = @storage.query_to_sql(q)
+    assert_equal("(id = $1 OR id = $2 OR id = $3) AND (topic ILIKE $4 OR topic ILIKE $5 OR topic ILIKE $6) AND (timestamp >= $7 AND timestamp <= $8) AND (payload->>'action' = $9 OR payload->>'alice' = $10 OR payload->>'channel' = $11 OR payload->'foo'->>'bar' = $12)", sql[0])
+    q = Query.define do
+      id 'foo'
+    end
+    assert_equal('(id = $1)', @storage.query_to_sql(q)[0])
+    q = Query.define do
+      topic 'foo.*.bar'
+    end
+    assert_equal('(topic ILIKE $1)', @storage.query_to_sql(q)[0])
+    assert_equal(['foo.%.bar'], @storage.query_to_sql(q)[1])
+  end
+
+  def test_insert
+    # the test message to persist
+    m = JournalMessage.create('log.core', {foo: {bar: 'baz'}})
+    # insert the test message:
+    @storage.insert(m)
+
+    # find the test message by query:
+    q = Query.define do
+      topic 'log.core'
+    end
+    res = @storage.find(q)
+    _m = res.first
+    assert_equal(m, _m) # this only checks id
+    assert_equal(m.timestamp.strftime('%Y-%m-%d %H:%M:%S%z'),
+                 _m.timestamp.strftime('%Y-%m-%d %H:%M:%S%z'))
+    assert_equal('log.core', _m.topic)
+    assert_equal({'foo' => {'bar' => 'baz'}}, _m.payload)
+    assert_equal(1, @storage.count(q))
+  end
+
+  def test_query_range
+    timestamp = Time.now - DAY*7
+    m = JournalMessage.create('log.core', {foo: {bar: 'baz'}},
+                              timestamp: timestamp)
+    assert_equal(timestamp, m.timestamp)
+
+    @storage.insert(m)
+    @storage.insert(JournalMessage.create('a.foo', {}))
+    @storage.insert(JournalMessage.create('b.bar', {}))
+    @storage.insert(JournalMessage.create('b.baz', {}))
+
+    r = @storage.find(Query.define { timestamp(from: timestamp-DAY, to: timestamp+DAY) })
+
+    assert_equal(1, r.length)
+    assert_equal(m, r.first)
+
   end
 
 end