1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
|
# encoding: UTF-8
#-- vim:sw=2:et
#++
#
# :title: journal backend for mongoDB
require 'mongo'
require 'json'
module Irc
class Bot
module Journal
module Storage
class MongoStorage < AbstractStorage
attr_reader :client
# Connects to MongoDB and prepares the 'journal' collection.
#
# opts:: option hash; the keys read here are
#        :uri  - connection URI handed to Mongo::Client.new
#                (falls back to the local 'rbot' database)
#        :drop - when truthy, the collection is dropped before the
#                indexes are (re)created
def initialize(opts={})
  # only let warnings and worse from the driver reach the log
  Mongo::Logger.logger.level = Logger::WARN

  @uri = opts[:uri] || 'mongodb://127.0.0.1:27017/rbot'
  @client = Mongo::Client.new(@uri)
  @collection = @client['journal']
  log('journal storage: mongodb connected to ' + @uri)

  drop if opts[:drop]

  # one ascending index per field that query_cursor filters on directly
  %i[topic timestamp].each do |field|
    @collection.indexes.create_one({field => 1})
  end
end
# Creates an ascending index on a single payload field.
#
# key:: name of the field inside the 'payload' sub-document; may be a
#       String or a Symbol (the name is interpolated, so a Symbol no
#       longer raises TypeError as it did with String#+).
#
# Returns whatever the driver's create_one call returns.
def ensure_payload_index(key)
  @collection.indexes.create_one({"payload.#{key}" => 1})
end
# Stores one journal message as a document in the collection.
#
# m:: message object responding to id, topic, timestamp and payload;
#     its id becomes the document's '_id'.
#
# Returns the result of the driver's insert_one call.
def insert(m)
  document = {
    '_id'       => m.id,
    'topic'     => m.topic,
    'timestamp' => m.timestamp,
    'payload'   => m.payload
  }
  @collection.insert_one(document)
end
# Builds a JournalMessage from a stored document.
#
# Previously this was a `def` nested inside #find, which re-defined the
# method on the class on every single #find call. It is now defined
# once; it stays public because the nested definition was public too.
#
# document:: hash-like document with '_id', 'timestamp', 'topic' and
#            'payload' entries; the timestamp is converted with
#            #localtime and the payload with #to_h.
def to_message(document)
  JournalMessage.new(id: document['_id'],
                     timestamp: document['timestamp'].localtime,
                     topic: document['topic'],
                     payload: document['payload'].to_h)
end

# Looks up journal messages.
#
# query::  query object handed to query_cursor (nil selects everything)
# limit::  maximum number of documents to fetch (default 100)
# offset:: number of matching documents to skip first (default 0)
#
# With a block, every message is passed to the block and the result of
# the cursor's #each is returned; without a block the converted
# messages are returned via the cursor's #map.
def find(query=nil, limit=100, offset=0, &block)
  cursor = query_cursor(query).skip(offset).limit(limit)
  if block_given?
    cursor.each { |document| block.call(to_message(document)) }
  else
    cursor.map { |document| to_message(document) }
  end
end
# Number of stored messages matching +query+.
#
# query:: query object handed to query_cursor; nil (the default)
#         counts every document in the collection.
def count(query=nil)
  matching = query_cursor(query)
  matching.count
end
# Deletes every stored message matching +query+.
#
# query:: query object handed to query_cursor; nil (the default)
#         selects the whole collection.
#
# Returns the result of the driver's delete_many call.
def remove(query=nil)
  doomed = query_cursor(query)
  doomed.delete_many
end
# Drops the whole journal collection.
#
# Returns the result of the driver's drop call.
def drop
  journal = @collection
  journal.drop
end
# Translates a journal query into a MongoDB collection view.
#
# query:: nil, or an object responding to
#         id        - list of message ids (any of them may match)
#         topic     - list of topics; entries containing '*' are
#                     treated as wildcard patterns
#         timestamp - hash with optional :from / :to bounds (inclusive)
#         payload   - hash of payload field => value (any pair may match)
#
# The id, topic, timestamp and payload groups are combined with '$and'.
# nil, or a query in which every group is empty, selects the whole
# collection: MongoDB rejects an empty '$and' array, so no filter is
# sent in that case.
#
# Returns the view produced by the collection's #find.
def query_cursor(query)
  return @collection.find() unless query

  conditions = []

  # any of the requested ids
  unless query.id.empty?
    conditions << { '$or' => query.id.map { |id| { '_id' => id } } }
  end

  # any of the requested topics
  unless query.topic.empty?
    topic_conditions = query.topic.map do |topic|
      if topic.include?('*')
        # Escape every regex metacharacter, then turn the (escaped) '*'
        # back into "match anything". The pattern is anchored so that it
        # has to cover the whole topic, like the exact comparison below.
        pattern = '^' + Regexp.escape(topic).gsub('\*', '.*') + '$'
        { 'topic' => { '$regex' => pattern } }
      else
        { 'topic' => topic }
      end
    end
    conditions << { '$or' => topic_conditions }
  end

  # inclusive time range; either bound may be missing
  range = {}
  range['$gte'] = query.timestamp[:from] if query.timestamp[:from]
  range['$lte'] = query.timestamp[:to] if query.timestamp[:to]
  conditions << { 'timestamp' => range } unless range.empty?

  # any of the requested payload fields; keys may be Strings or Symbols
  unless query.payload.empty?
    conditions << {
      '$or' => query.payload.map { |key, value| { "payload.#{key}" => value } }
    }
  end

  # '$and' must not be empty, so fall back to an unfiltered find
  return @collection.find() if conditions.empty?

  @collection.find({ '$and' => conditions })
end
end
end
end # Journal
end # Bot
end # Irc
|