5 # :title: journal backend for postgresql
16 class PostgresStorage < AbstractStorage
# Opens the PostgreSQL connection and works out which JSON column type the
# server supports.
#
# opts - options hash; the lines visible here read only opts[:uri], which
#        defaults to 'postgresql://localhost/rbot_journal'.
#
# Raises StorageError when the server is considered too old (the guarding
# condition is not part of this excerpt; the message states >= 9.3).
19 def initialize(opts={})
20 @uri = opts[:uri] || 'postgresql://localhost/rbot_journal'
21 @conn = PG.connect(@uri)
# Only messages of level WARNING and above are sent to this session.
22 @conn.exec('set client_min_messages = warning')
23 @version = @conn.exec('SHOW server_version;')[0]['server_version']
# A version made of exactly two numeric parts ("9.4") gets ".0" appended;
# any other shape is left untouched.
25 @version.gsub!(/^(\d+\.\d+)$/, '\1.0')
26 log 'journal storage: postgresql connected to version: ' + @version
# Collapse the first three dot-separated components into one integer by
# string concatenation, e.g. "9.4.1" -> 941.
# NOTE(review): concatenation is not positional. "9.3.10" becomes 9310 and
# "10.4.0" becomes 1040, so a multi-digit component inflates the number and
# "9.3.10" compares as >= 940 below. If the server reports a suffixed string
# (e.g. "10.4 (Ubuntu ...)"), the normalisation above does not match and the
# split also cuts on dots inside the suffix. Consider comparing the server's
# integer version instead (SHOW server_version_num, or
# PG::Connection#server_version) - confirm against the servers in use.
28 version = @version.split('.')[0,3].join.to_i
30 raise StorageError.new(
31 'PostgreSQL Version too old: %s, supported: >= 9.3' % [@version])
# JSONB is selected when the collapsed version number is at least 940;
# otherwise the plain JSON type is used and a hint is logged.
33 @jsonb = (version >= 940)
34 log 'journal storage: no jsonb support, consider upgrading postgres' unless @jsonb
42 CREATE TABLE IF NOT EXISTS journal
45 timestamp TIMESTAMP WITH TIME ZONE NOT NULL,
46 payload %s NOT NULL)' % [@jsonb ? 'JSONB' : 'JSON'])
50 @conn.exec_params('INSERT INTO journal VALUES ($1, $2, $3, $4);',
51 [m.id, m.topic, m.timestamp, JSON.generate(m.payload)])
# Fetches stored journal rows and turns each one into a JournalMessage.
#
# query  - filter handed to query_to_sql to build the WHERE clause; the
#          second SELECT has no WHERE, presumably for a nil query (the
#          branch condition is not part of this excerpt - confirm).
# limit  - maximum number of rows, coerced with to_i (default 100).
# offset - number of rows to skip, coerced with to_i (default 0).
#
# NOTE(review): neither visible SELECT has an ORDER BY, so the LIMIT/OFFSET
# window follows whatever row order the server happens to return.
54 def find(query=nil, limit=100, offset=0)
56 sql, params = query_to_sql(query)
# `%` binds tighter than `+`, so only the ' LIMIT %d OFFSET %d' literal is
# formatted; the WHERE fragment is never interpreted as a format string.
57 sql = 'SELECT * FROM journal WHERE ' + sql + ' LIMIT %d OFFSET %d' % [limit.to_i, offset.to_i]
59 sql = 'SELECT * FROM journal LIMIT %d OFFSET %d' % [limit.to_i, offset.to_i]
# Bind values travel separately from the SQL text. How params is set on the
# unfiltered branch is not visible in this excerpt.
62 res = @conn.exec_params(sql, params)
# The timestamp column arrives as text and is parsed back into a DateTime.
# NOTE(review): the format has no fractional-seconds field - confirm stored
# timestamps never carry microseconds, or strptime will reject them.
64 timestamp = DateTime.strptime(row['timestamp'], '%Y-%m-%d %H:%M:%S%z')
# The payload column holds JSON text and is decoded before being handed over.
65 JournalMessage.new(id: row['id'], timestamp: timestamp,
66 topic: row['topic'], payload: JSON.parse(row['payload']))
70 # returns the number of messages that match the query
73 sql, params = query_to_sql(query)
74 sql = 'SELECT COUNT(*) FROM journal WHERE ' + sql
76 sql = 'SELECT COUNT(*) FROM journal'
79 res = @conn.exec_params(sql, params)
85 sql, params = query_to_sql(query)
86 sql = 'DELETE FROM journal WHERE ' + sql
88 sql = 'DELETE FROM journal;'
91 res = @conn.exec_params(sql, params)
95 @conn.exec('DROP TABLE journal;') rescue nil
# Translates a query object into a SQL condition string plus its bind values.
#
# query - object exposing id and topic (collections checked with empty? and
#         walked with map), timestamp (indexed with :from and :to) and
#         payload (walked with each_pair).
#
# Callers destructure the result as `sql, params = query_to_sql(query)`; the
# return statement itself is not part of this excerpt.
98 def query_to_sql(query)
# Helper producing the positional marker ("$N") for a value, where N is the
# current size of params. Presumably the value is appended to params on a
# line not shown here, making N its 1-based position - confirm.
100 placeholder = Proc.new do |value|
102 '$%d' % [params.length]
# Condition tree: a top-level operator joining groups, where each group is
# itself an {op:, list:} pair (see the final map/join).
104 sql = {op: 'AND', list: []}
106 # ID query OR condition
107 unless query.id.empty?
110 list: query.id.map { |id|
111 'id = ' + placeholder.call(id)
116 # Topic query OR condition
# '*' in a topic is translated to the SQL wildcard '%' and matched
# case-insensitively with ILIKE.
# NOTE(review): literal '%' and '_' in a topic are not escaped, so they act
# as wildcards too.
117 unless query.topic.empty?
120 list: query.topic.map { |topic|
121 'topic ILIKE ' + placeholder.call(topic.gsub('*', '%'))
126 # Timestamp range query AND condition
# Both bounds are inclusive and each is optional.
127 if query.timestamp[:from] or query.timestamp[:to]
129 if query.timestamp[:from]
130 list << 'timestamp >= ' + placeholder.call(query.timestamp[:from])
132 if query.timestamp[:to]
133 list << 'timestamp <= ' + placeholder.call(query.timestamp[:to])
142 unless query.payload.empty?
144 query.payload.each_pair do |key, value|
# A payload key is a dotted path; each segment becomes one JSON accessor.
# Segments are inlined into the SQL text after escape_string, while the
# compared value is bound as a parameter. `->>` yields text and `->` yields
# JSON; which segment gets which operator is decided on lines not shown
# here (presumably `->>` for the last one - confirm).
146 k = key.to_s.split('.')
149 selector += '->>\'%s\'' % [@conn.escape_string(k[i])]
151 selector += '->\'%s\'' % [@conn.escape_string(k[i])]
154 list << selector + ' = ' + placeholder.call(value)
# Flatten the tree: each group is parenthesised and joined with its own
# operator, then the groups are joined with the top-level operator.
162 sql = sql[:list].map { |stmt|
163 '(' + stmt[:list].join(' %s ' % [stmt[:op]]) + ')'
164 }.join(' %s ' % [sql[:op]])