5 # :title: journal backend for mongoDB
16 class MongoStorage < AbstractStorage
19 def initialize(opts={})
20 Mongo::Logger.logger.level = Logger::WARN
21 @uri = opts[:uri] || 'mongodb://127.0.0.1:27017/rbot'
22 @client = Mongo::Client.new(@uri)
23 @collection = @client['journal']
24 log 'journal storage: mongodb connected to ' + @uri
27 @collection.indexes.create_one({topic: 1})
31 @collection.indexes.create_one({'payload.'+key => 1})
35 @collection.insert_one({
38 'timestamp' => m.timestamp,
39 'payload' => m.payload
43 def find(query=nil, limit=100, offset=0)
44 query_cursor(query).skip(offset).limit(limit).map do |document|
45 JournalMessage.new(id: document['_id'], timestamp: document['timestamp'].localtime,
46 topic: document['topic'], payload: document['payload'].to_h)
50 # returns the number of messages that match the query
52 query_cursor(query).count
56 query_cursor(query).delete_many
63 def query_cursor(query)
65 return @collection.find()
70 # ID query OR condition
71 unless query.id.empty?
73 '$or' => query.id.map { |_id|
79 unless query.topic.empty?
81 '$or' => query.topic.map { |topic|
82 if topic.include?('*')
83 pattern = topic.gsub('.', '\.').gsub('*', '.*')
84 {'topic' => {'$regex' => pattern}}
92 if query.timestamp[:from] or query.timestamp[:to]
94 if query.timestamp[:from]
95 where['$gte'] = query.timestamp[:from]
97 if query.timestamp[:to]
98 where['$lte'] = query.timestamp[:to]
100 query_and << {'timestamp' => where}
103 unless query.payload.empty?
105 '$or' => query.payload.map { |key, value|
106 key = 'payload.' + key