]> git.netwichtig.de Git - user/henk/code/ruby/rbot.git/commitdiff
journal: more postgres tests
authorMatthias Hecker <apoc@geekosphere.org>
Sun, 14 Jun 2015 17:31:55 +0000 (19:31 +0200)
committerMatthias Hecker <apoc@geekosphere.org>
Sun, 14 Jun 2015 17:31:55 +0000 (19:31 +0200)
lib/rbot/journal.rb
lib/rbot/journal/postgres.rb
test/test_journal.rb

index cc0578de53a7940e7e4c4fda57447fcaf5d70e2a..71b0c7ff7c0754322d4a5cf88573a91b2ed4a86e 100644 (file)
@@ -76,7 +76,7 @@ module Journal
     end
 
     def ==(other)
-      @id == other.id
+      (@id == other.id) rescue false
     end
 
     def self.create(topic, payload, opt={})
@@ -104,16 +104,32 @@ module Journal
       end
 
       # returns a array of message instances that match the query
-      def find(query, limit=10, offset=0)
+      def find(query=nil, limit=100, offset=0)
       end
 
       # returns the number of messages that match the query
-      def count(query)
+      def count(query=nil)
       end
 
-      # delete messages that match the query
-      def delete(query)
+      # remove messages that match the query
+      def remove(query=nil)
       end
+
+      # destroy the underlying table/collection
+      def drop
+      end
+
+      # Returns all classes from the namespace that implement this interface
+      def self.get_impl
+        ObjectSpace.each_object(Class).select { |klass| klass < self }
+      end
+    end
+
+    def create(name, uri)
+      log 'load journal storage adapter: ' + name
+      load File.join(File.dirname(__FILE__), 'journal', name + '.rb')
+      cls = AbstractStorage.get_impl.first
+      cls.new(uri: uri)
     end
   end
 
@@ -270,7 +286,12 @@ module Journal
       # overrides the internal consumer with a block
       @consumer = opts[:consumer]
       # storage backend
-      @storage = opts[:storage]
+      if @bot
+        @storage = opts[:storage] || Storage.create(
+            @bot.config['journal.storage'], @bot.config['journal.storage.uri'])
+      else
+        @storage = opts[:storage]
+      end
       @queue = Queue.new
       # consumer thread:
       @thread = Thread.new do
@@ -334,6 +355,17 @@ module Journal
       @subscriptions.delete subscription
     end
 
+    def find(query, limit=100, offset=0, &block)
+      if block_given?
+        begin
+          res = @storage.find(query, limit, offset)
+          block.call(res)
+        end until res.length > 0
+      else
+        @storage.find(query, limit, offset)
+      end
+    end
+
   end
 
 end # Journal
index 57c539a72ba40a4e612a45ddeabe46d19d4c3b07..65c67eb94c5ce2e7ca8384e989f366ed98820463 100644 (file)
@@ -53,9 +53,14 @@ module Journal
           [m.id, m.topic, m.timestamp, JSON.generate(m.payload)])
       end
 
-      def find(query, limit=100, offset=0)
-        sql, params = query_to_sql(query)
-        sql = 'SELECT * FROM journal WHERE ' + sql + ' LIMIT %d OFFSET %d' % [limit.to_i, offset.to_i]
+      def find(query=nil, limit=100, offset=0)
+        if query
+          sql, params = query_to_sql(query)
+          sql = 'SELECT * FROM journal WHERE ' + sql + ' LIMIT %d OFFSET %d' % [limit.to_i, offset.to_i]
+        else
+          sql = 'SELECT * FROM journal LIMIT %d OFFSET %d' % [limit.to_i, offset.to_i]
+          params = []
+        end
         res = @conn.exec_params(sql, params)
         res.map do |row|
           timestamp = DateTime.strptime(row['timestamp'], '%Y-%m-%d %H:%M:%S%z')
@@ -65,13 +70,29 @@ module Journal
       end
 
       # returns the number of messages that match the query
-      def count(query)
-        sql, params = query_to_sql(query)
-        sql = 'SELECT COUNT(*) FROM journal WHERE ' + sql
+      def count(query=nil)
+        if query
+          sql, params = query_to_sql(query)
+          sql = 'SELECT COUNT(*) FROM journal WHERE ' + sql
+        else
+          sql = 'SELECT COUNT(*) FROM journal'
+          params = []
+        end
         res = @conn.exec_params(sql, params)
         res[0]['count'].to_i
       end
 
+      def remove(query=nil)
+        if query
+          sql, params = query_to_sql(query)
+          sql = 'DELETE FROM journal WHERE ' + sql
+        else
+          sql = 'DELETE FROM journal;'
+          params = []
+        end
+        res = @conn.exec_params(sql, params)
+      end
+
       def drop
         @conn.exec('DROP TABLE journal;') rescue nil
       end
index 2a522aa75b9101d4c3dceeeb287a28433ea783f7..3f1ce7667a5020538793277f612e52f68c32126b 100644 (file)
@@ -224,41 +224,68 @@ class JournalStoragePostgresTest < Test::Unit::TestCase
     assert_equal(['foo.%.bar'], @storage.query_to_sql(q)[1])
   end
 
-  def test_insert
-    # the test message to persist
+  def test_operations
+    # insertion
     m = JournalMessage.create('log.core', {foo: {bar: 'baz'}})
-    # insert the test message:
     @storage.insert(m)
 
-    # find the test message by query:
-    q = Query.define do
-      topic 'log.core'
-    end
-    res = @storage.find(q)
-    _m = res.first
-    assert_equal(m, _m) # this only checks id
+    # query by id
+    res = @storage.find(Query.define { id m.id })
+    assert_equal(1, res.length)
+    assert_equal(m, res.first)
+
+    # check timestamp was returned correctly:
     assert_equal(m.timestamp.strftime('%Y-%m-%d %H:%M:%S%z'),
-                 _m.timestamp.strftime('%Y-%m-%d %H:%M:%S%z'))
-    assert_equal('log.core', _m.topic)
-    assert_equal({'foo' => {'bar' => 'baz'}}, _m.payload)
-    assert_equal(1, @storage.count(q))
+                 res.first.timestamp.strftime('%Y-%m-%d %H:%M:%S%z'))
+
+    # check if payload was returned correctly:
+    assert_equal({'foo' => {'bar' => 'baz'}}, res.first.payload)
+
+    # query by topic
+    assert_equal(m, @storage.find(Query.define { topic('log.core') }).first)
+    assert_equal(m, @storage.find(Query.define { topic('log.*') }).first)
+    assert_equal(m, @storage.find(Query.define { topic('*.*') }).first)
+
+    # query by timestamp range
+    assert_equal(1, @storage.find(Query.define {
+      timestamp(from: Time.now-DAY, to: Time.now+DAY) }).length)
+    assert_equal(0, @storage.find(Query.define {
+      timestamp(from: Time.now-DAY*2, to: Time.now-DAY) }).length)
+
+    # query by payload
+    res = @storage.find(Query.define { payload('foo.bar' => 'baz') })
+    assert_equal(m, res.first)
+    res = @storage.find(Query.define { payload('foo.bar' => 'x') })
+    assert_true(res.empty?)
+
+    # without arguments: find and count
+    assert_equal(1, @storage.count)
+    assert_equal(m, @storage.find.first)
   end
 
-  def test_query_range
-    timestamp = Time.now - DAY*7
-    m = JournalMessage.create('log.core', {foo: {bar: 'baz'}},
-                              timestamp: timestamp)
-    assert_equal(timestamp, m.timestamp)
+  def test_operations_multiple
+    # test operations on multiple messages
+    # insert a bunch:
+    @storage.insert(JournalMessage.create('test.topic', {name: 'one'}))
+    @storage.insert(JournalMessage.create('test.topic', {name: 'two'}))
+    @storage.insert(JournalMessage.create('test.topic', {name: 'three'}))
+    @storage.insert(JournalMessage.create('archived.topic', {name: 'four'},
+                                          timestamp: Time.now - DAY*100))
+    @storage.insert(JournalMessage.create('complex', {name: 'five', country: {
+      name: 'Italy'
+    }}))
+    @storage.insert(JournalMessage.create('complex', {name: 'six', country: {
+      name: 'Austria'
+    }}))
 
-    @storage.insert(m)
-    @storage.insert(JournalMessage.create('a.foo', {}))
-    @storage.insert(JournalMessage.create('b.bar', {}))
-    @storage.insert(JournalMessage.create('b.baz', {}))
 
-    r = @storage.find(Query.define { timestamp(from: timestamp-DAY, to: timestamp+DAY) })
+  end
+
+  def test_journal
+    received = []
+    # this journal persists messages in the test storage:
+    journal = JournalBroker.new(storage: @storage)
 
-    assert_equal(1, r.length)
-    assert_equal(m, r.first)
 
   end