5 # :title: journal backend for mongoDB
16 class MongoStorage < AbstractStorage
19 def initialize(opts={})
20 Mongo::Logger.logger.level = Logger::WARN
21 @uri = opts[:uri] || 'mongodb://127.0.0.1:27017/rbot'
22 @client = Mongo::Client.new(@uri)
23 @collection = @client['journal']
24 log 'journal storage: mongodb connected to ' + @uri
27 @collection.indexes.create_one({topic: 1})
28 @collection.indexes.create_one({timestamp: 1})
31 def ensure_payload_index(key)
32 @collection.indexes.create_one({'payload.'+key => 1})
36 @collection.insert_one({
39 'timestamp' => m.timestamp,
40 'payload' => m.payload
44 def find(query=nil, limit=100, offset=0, &block)
45 def to_message(document)
46 JournalMessage.new(id: document['_id'],
47 timestamp: document['timestamp'].localtime,
48 topic: document['topic'],
49 payload: document['payload'].to_h)
52 cursor = query_cursor(query).skip(offset).limit(limit)
55 cursor.each { |document| block.call(to_message(document)) }
57 cursor.map { |document| to_message(document) }
61 # returns the number of messages that match the query
63 query_cursor(query).count
67 query_cursor(query).delete_many
74 def query_cursor(query)
76 return @collection.find()
81 # ID query OR condition
82 unless query.id.empty?
84 '$or' => query.id.map { |_id|
90 unless query.topic.empty?
92 '$or' => query.topic.map { |topic|
93 if topic.include?('*')
94 pattern = topic.gsub('.', '\.').gsub('*', '.*')
95 {'topic' => {'$regex' => pattern}}
103 if query.timestamp[:from] or query.timestamp[:to]
105 if query.timestamp[:from]
106 where['$gte'] = query.timestamp[:from]
108 if query.timestamp[:to]
109 where['$lte'] = query.timestamp[:to]
111 query_and << {'timestamp' => where}
114 unless query.payload.empty?
116 '$or' => query.payload.map { |key, value|
117 key = 'payload.' + key