+ # query by topic
+ assert_equal(3, @storage.find(Query.define { topic 'test.*' }).length)
+ # query by payload
+ assert_equal(1, @storage.find(Query.define {
+ payload('country.name' => 'Austria') }).length)
+ # query by timestamp range
+ assert_equal(1, @storage.find(Query.define {
+ timestamp(from: Time.now - DAY*150, to: Time.now - DAY*50) }).length)
+
+ # count with query
+ assert_equal(2, @storage.count(Query.define { topic('complex') }))
+ assert_equal(6, @storage.count)
+ @storage.remove(Query.define { topic('archived.*') })
+ assert_equal(5, @storage.count)
+ @storage.remove
+ assert_equal(0, @storage.count)
+ end
+
+ def test_broker_interface
+ journal = JournalBroker.new(storage: @storage)
+
+ journal.publish 'irclogs', message: 'foo'
+ journal.publish 'irclogs', message: 'bar'
+ journal.publish 'irclogs', message: 'baz'
+ journal.publish 'irclogs', message: 'qux'
+
+ # wait for messages to be consumed:
+ sleep 0.1
+
+ msgs = []
+ journal.find({topic: 'irclogs'}, 2, 1) do |m|
+ msgs << m
+ end
+ assert_equal(2, msgs.length)
+ assert_equal('bar', msgs.first['message'])
+ assert_equal('baz', msgs.last['message'])
+
+ journal.ensure_payload_index('foo.bar.baz')
+ end
+
+ NUM=100 # 1_000_000
+ def test_benchmark
+ puts
+
+ assert_equal(0, @storage.count)
+ # prepare messages to insert, we benchmark the storage backend not ruby
+ num = 0
+ messages = (0...NUM).map do
+ num += 1
+ JournalMessage.create(
+ 'test.topic.num_'+num.to_s, {answer: {number: '42', word: 'forty-two'}})
+ end