+ def teardown
+ @storage.drop
+ end
+
+ def test_operations
+ # insertion
+ m = JournalMessage.create('log.core', {foo: {bar: 'baz', qux: 42}})
+ @storage.insert(m)
+
+ # query by id
+ res = @storage.find(Query.define { id m.id })
+ assert_equal(1, res.length)
+ assert_equal(m, res.first)
+
+ # check timestamp was returned correctly:
+ assert_equal(m.timestamp.strftime('%Y-%m-%d %H:%M:%S%z'),
+ res.first.timestamp.strftime('%Y-%m-%d %H:%M:%S%z'))
+
+ # check if payload was returned correctly:
+ assert_equal({'foo' => {'bar' => 'baz', 'qux' => 42}}, res.first.payload)
+
+ # query by topic
+ assert_equal(m, @storage.find(Query.define { topic('log.core') }).first)
+ assert_equal(m, @storage.find(Query.define { topic('log.*') }).first)
+ assert_equal(m, @storage.find(Query.define { topic('*.*') }).first)
+
+ # query by timestamp range
+ assert_equal(1, @storage.find(Query.define {
+ timestamp(from: Time.now-DAY, to: Time.now+DAY) }).length)
+ assert_equal(0, @storage.find(Query.define {
+ timestamp(from: Time.now-DAY*2, to: Time.now-DAY) }).length)
+
+ # query by payload
+ res = @storage.find(Query.define { payload('foo.bar' => 'baz') })
+ assert_equal(m, res.first)
+ res = @storage.find(Query.define { payload('foo.bar' => 'x') })
+ assert_true(res.empty?)
+
+ # without arguments: find and count
+ assert_equal(1, @storage.count)
+ assert_equal(m, @storage.find.first)
+ end
+
+ def test_find
+ # tests limit/offset and block parameters of find()
+ @storage.insert(JournalMessage.create('irclogs', {message: 'foo'}))
+ @storage.insert(JournalMessage.create('irclogs', {message: 'bar'}))
+ @storage.insert(JournalMessage.create('irclogs', {message: 'baz'}))
+ @storage.insert(JournalMessage.create('irclogs', {message: 'qux'}))
+
+ msgs = []
+ @storage.find(Query.define({topic: 'irclogs'}), 2, 1) do |m|
+ msgs << m
+ end
+ assert_equal(2, msgs.length)
+ assert_equal('bar', msgs.first['message'])
+ assert_equal('baz', msgs.last['message'])
+
+ msgs = []
+ @storage.find(Query.define({topic: 'irclogs'})) do |m|
+ msgs << m
+ end
+ assert_equal(4, msgs.length)
+ assert_equal('foo', msgs.first['message'])
+ assert_equal('qux', msgs.last['message'])
+
+ end
+
+ def test_operations_multiple
+ # test operations on multiple messages
+ # insert a bunch:
+ @storage.insert(JournalMessage.create('test.topic', {name: 'one'}))
+ @storage.insert(JournalMessage.create('test.topic', {name: 'two'}))
+ @storage.insert(JournalMessage.create('test.topic', {name: 'three'}))
+ @storage.insert(JournalMessage.create('archived.topic', {name: 'four'},
+ timestamp: Time.now - DAY*100))
+ @storage.insert(JournalMessage.create('complex', {name: 'five', country: {
+ name: 'Italy'
+ }}))
+ @storage.insert(JournalMessage.create('complex', {name: 'six', country: {
+ name: 'Austria'
+ }}))
+
+ # query by topic
+ assert_equal(3, @storage.find(Query.define { topic 'test.*' }).length)
+ # query by payload
+ assert_equal(1, @storage.find(Query.define {
+ payload('country.name' => 'Austria') }).length)
+ # query by timestamp range
+ assert_equal(1, @storage.find(Query.define {
+ timestamp(from: Time.now - DAY*150, to: Time.now - DAY*50) }).length)
+
+ # count with query
+ assert_equal(2, @storage.count(Query.define { topic('complex') }))
+ assert_equal(6, @storage.count)
+ @storage.remove(Query.define { topic('archived.*') })
+ assert_equal(5, @storage.count)
+ @storage.remove
+ assert_equal(0, @storage.count)
+ end
+
+ def test_broker_interface
+ journal = JournalBroker.new(storage: @storage)
+
+ journal.publish 'irclogs', message: 'foo'
+ journal.publish 'irclogs', message: 'bar'
+ journal.publish 'irclogs', message: 'baz'
+ journal.publish 'irclogs', message: 'qux'
+
+ # wait for messages to be consumed:
+ sleep 0.1
+
+ msgs = []
+ journal.find({topic: 'irclogs'}, 2, 1) do |m|
+ msgs << m
+ end
+ assert_equal(2, msgs.length)
+ assert_equal('bar', msgs.first['message'])
+ assert_equal('baz', msgs.last['message'])
+
+ journal.ensure_payload_index('foo.bar.baz')
+ end
+
+ NUM=100 # 1_000_000
+ def test_benchmark
+ puts
+
+ assert_equal(0, @storage.count)
+ # prepare messages to insert, we benchmark the storage backend not ruby
+ num = 0
+ messages = (0...NUM).map do
+ num += 1
+ JournalMessage.create(
+ 'test.topic.num_'+num.to_s, {answer: {number: '42', word: 'forty-two'}})
+ end
+
+ # iter is the number of operations performed WITHIN block
+ def benchmark(label, iter, &block)
+ time = Benchmark.realtime do
+ yield
+ end
+ puts label + ' %d iterations, duration: %.3fms (%.3fms / iteration)' % [iter, time*1000, (time*1000) / iter]
+ end
+
+ benchmark(@storage.class.to_s+'~insert', messages.length) do
+ messages.each { |m|
+ @storage.insert(m)
+ }
+ end
+
+ benchmark(@storage.class.to_s+'~find_by_id', messages.length) do
+ messages.each { |m|
+ @storage.find(Query.define { id m.id })
+ }
+ end
+ benchmark(@storage.class.to_s+'~find_by_topic', messages.length) do
+ messages.each { |m|
+ @storage.find(Query.define { topic m.topic })
+ }
+ end
+ benchmark(@storage.class.to_s+'~find_by_topic_wildcard', messages.length) do
+ messages.each { |m|
+ @storage.find(Query.define { topic m.topic.gsub('topic', '*') })
+ }
+ end