+ assert_equal(4, msgs.length)
+ assert_equal('foo', msgs.first['message'])
+ assert_equal('qux', msgs.last['message'])
+
+ end
+
+ def test_operations_multiple
+ # test operations on multiple messages
+ # insert a bunch:
+ @storage.insert(JournalMessage.create('test.topic', {name: 'one'}))
+ @storage.insert(JournalMessage.create('test.topic', {name: 'two'}))
+ @storage.insert(JournalMessage.create('test.topic', {name: 'three'}))
+ @storage.insert(JournalMessage.create('archived.topic', {name: 'four'},
+ timestamp: Time.now - DAY*100))
+ @storage.insert(JournalMessage.create('complex', {name: 'five', country: {
+ name: 'Italy'
+ }}))
+ @storage.insert(JournalMessage.create('complex', {name: 'six', country: {
+ name: 'Austria'
+ }}))
+
+ # query by topic
+ assert_equal(3, @storage.find(Query.define { topic 'test.*' }).length)
+ # query by payload
+ assert_equal(1, @storage.find(Query.define {
+ payload('country.name' => 'Austria') }).length)
+ # query by timestamp range
+ assert_equal(1, @storage.find(Query.define {
+ timestamp(from: Time.now - DAY*150, to: Time.now - DAY*50) }).length)
+
+ # count with query
+ assert_equal(2, @storage.count(Query.define { topic('complex') }))
+ assert_equal(6, @storage.count)
+ @storage.remove(Query.define { topic('archived.*') })
+ assert_equal(5, @storage.count)
+ @storage.remove
+ assert_equal(0, @storage.count)
+ end
+
+ def test_broker_interface
+ journal = JournalBroker.new(storage: @storage)
+
+ journal.publish 'irclogs', message: 'foo'
+ journal.publish 'irclogs', message: 'bar'
+ journal.publish 'irclogs', message: 'baz'
+ journal.publish 'irclogs', message: 'qux'
+
+ # wait for messages to be consumed:
+ sleep 0.1
+
+ msgs = []
+ journal.find({topic: 'irclogs'}, 2, 1) do |m|
+ msgs << m