drop if opts[:drop]
@collection.indexes.create_one({topic: 1})
+ @collection.indexes.create_one({timestamp: 1})
end
- def ensure_index(key)
+ def ensure_payload_index(key)
@collection.indexes.create_one({'payload.'+key => 1})
end
})
end
- def find(query=nil, limit=100, offset=0)
- query_cursor(query).skip(offset).limit(limit).map do |document|
- JournalMessage.new(id: document['_id'], timestamp: document['timestamp'].localtime,
- topic: document['topic'], payload: document['payload'].to_h)
+ def find(query=nil, limit=100, offset=0, &block)
+ def to_message(document)
+ JournalMessage.new(id: document['_id'],
+ timestamp: document['timestamp'].localtime,
+ topic: document['topic'],
+ payload: document['payload'].to_h)
+ end
+
+ cursor = query_cursor(query).skip(offset).limit(limit)
+
+ if block_given?
+ cursor.each { |document| block.call(to_message(document)) }
+ else
+ cursor.map { |document| to_message(document) }
end
end