]> git.netwichtig.de Git - user/henk/code/ruby/rbot.git/blob - lib/rbot/journal/mongo.rb
Merge pull request #4 from ahpook/rename_karma
[user/henk/code/ruby/rbot.git] / lib / rbot / journal / mongo.rb
1 # encoding: UTF-8
2 #-- vim:sw=2:et
3 #++
4 #
5 # :title: journal backend for mongoDB
6
7 require 'mongo'
8 require 'json'
9
10 module Irc
11 class Bot
12 module Journal
13
14   module Storage
15
16     class MongoStorage < AbstractStorage
17       attr_reader :client
18
19       def initialize(opts={})
20         Mongo::Logger.logger.level = Logger::WARN
21         @uri = opts[:uri] || 'mongodb://127.0.0.1:27017/rbot'
22         @client = Mongo::Client.new(@uri)
23         @collection = @client['journal']
24         log 'journal storage: mongodb connected to ' + @uri
25         
26         drop if opts[:drop]
27         @collection.indexes.create_one({topic: 1})
28         @collection.indexes.create_one({timestamp: 1})
29       end
30
31       def ensure_payload_index(key)
32         @collection.indexes.create_one({'payload.'+key => 1})
33       end
34
35       def insert(m)
36         @collection.insert_one({
37           '_id' => m.id,
38           'topic' => m.topic,
39           'timestamp' => m.timestamp,
40           'payload' => m.payload
41         })
42       end
43
44       def find(query=nil, limit=100, offset=0, &block)
45         def to_message(document)
46           JournalMessage.new(id: document['_id'],
47                              timestamp: document['timestamp'].localtime,
48                              topic: document['topic'],
49                              payload: document['payload'].to_h)
50         end
51
52         cursor = query_cursor(query).skip(offset).limit(limit)
53
54         if block_given?
55           cursor.each { |document| block.call(to_message(document)) }
56         else
57           cursor.map { |document| to_message(document) }
58         end
59       end
60
61       # returns the number of messages that match the query
62       def count(query=nil)
63         query_cursor(query).count
64       end
65
66       def remove(query=nil)
67         query_cursor(query).delete_many
68       end
69
70       def drop
71         @collection.drop
72       end
73
74       def query_cursor(query)
75         unless query
76           return @collection.find()
77         end
78
79         query_and = []
80
81         # ID query OR condition
82         unless query.id.empty?
83           query_and << {
84             '$or' => query.id.map { |_id| 
85               {'_id' => _id}
86             }
87           }
88         end
89
90         unless query.topic.empty?
91           query_and << {
92             '$or' => query.topic.map { |topic|
93               if topic.include?('*')
94                 pattern = topic.gsub('.', '\.').gsub('*', '.*')
95                 {'topic' => {'$regex' => pattern}}
96               else
97                 {'topic' => topic}
98               end
99             }
100           }
101         end
102
103         if query.timestamp[:from] or query.timestamp[:to]
104           where = {}
105           if query.timestamp[:from]
106             where['$gte'] = query.timestamp[:from]
107           end
108           if query.timestamp[:to]
109             where['$lte'] = query.timestamp[:to]
110           end
111           query_and << {'timestamp' => where}
112         end
113
114         unless query.payload.empty?
115           query_and << {
116             '$or' => query.payload.map { |key, value|
117               key = 'payload.' + key
118               {key => value}
119             }
120           }
121         end
122
123         @collection.find({
124           '$and' => query_and
125         })
126       end
127     end
128   end
129 end # Journal
130 end # Bot
131 end # Irc