5 # :title: journal backend for postgresql
19 class PostgresStorage < AbstractStorage
# Connects to PostgreSQL, logs the server version and records whether the
# server is new enough for the JSONB column type.
#
# opts:: options hash; +:uri+ is the connection URI and falls back to
#        'postgresql://localhost/rbot_journal' when absent.
#
# Raises StorageError with a "too old" message (stating that >= 9.3 is
# supported). NOTE(review): the condition guarding that raise is not visible
# in this excerpt.
22 def initialize(opts={})
23 @uri = opts[:uri] || 'postgresql://localhost/rbot_journal'
24 @conn = PG.connect(@uri)
# Ask the server for its version string.
25 @version = @conn.exec('SHOW server_version;')[0]['server_version']
# Pad a bare "major.minor" string with ".0" so it has three components.
27 @version.gsub!(/^(\d+\.\d+)$/, '\1.0')
28 log 'journal storage: postgresql connected to version: ' + @version
# Collapse the first three dot-separated components into a single integer,
# e.g. "9.4.1" -> 941.
# NOTE(review): a version string with a suffix such as "12.3 (Debian 12.3-1)"
# is not matched by the anchored gsub! above, and to_i stops at the first
# non-digit, so it collapses to 123. Consider reading the numeric
# `server_version_num` setting instead.
30 version = @version.split('.')[0,3].join.to_i
32 raise StorageError.new(
33 'PostgreSQL Version too old: %s, supported: >= 9.3' % [@version])
# 940 is "9.4.0" in the collapsed form; the log line below treats anything
# lower as lacking JSONB support.
35 @jsonb = (version >= 940)
36 log 'journal storage: no jsonb support, consider upgrading postgres' unless @jsonb
44 CREATE TABLE IF NOT EXISTS journal
47 timestamp TIMESTAMP WITH TIME ZONE NOT NULL,
48 payload %s NOT NULL)' % [@jsonb ? 'JSONB' : 'JSON'])
# Store one message as a row. All four values are sent as bind parameters and
# the payload is serialized to JSON text first.
# NOTE(review): the INSERT has no column list, so it depends on the column
# order of the journal table.
52 @conn.exec_params('INSERT INTO journal VALUES ($1, $2, $3, $4);',
53 [m.id, m.topic, m.timestamp, JSON.generate(m.payload)])
# Fetches stored messages, optionally restricted by a query, and builds a
# JournalMessage from each returned row.
#
# query::  passed to query_to_sql to obtain the WHERE clause and its bind
#          parameters; defaults to nil.
# limit::  maximum number of rows, coerced with to_i (default 100).
# offset:: number of rows to skip, coerced with to_i (default 0).
#
# NOTE(review): the branch that selects between the filtered and unfiltered
# SELECT, and the iteration over the result rows, are not visible in this
# excerpt. Neither SELECT has an ORDER BY.
56 def find(query=nil, limit=100, offset=0)
# Filtered variant: the condition text and parameters come from query_to_sql.
58 sql, params = query_to_sql(query)
# `%` binds tighter than `+`, so only the LIMIT/OFFSET literal is formatted;
# the generated WHERE text is concatenated untouched.
59 sql = 'SELECT * FROM journal WHERE ' + sql + ' LIMIT %d OFFSET %d' % [limit.to_i, offset.to_i]
# Unfiltered variant.
61 sql = 'SELECT * FROM journal LIMIT %d OFFSET %d' % [limit.to_i, offset.to_i]
64 res = @conn.exec_params(sql, params)
# The timestamp column arrives as text and is parsed as date, time and UTC offset.
# NOTE(review): the format has no fractional-seconds field; confirm that
# stored timestamps never carry one.
66 timestamp = DateTime.strptime(row['timestamp'], '%Y-%m-%d %H:%M:%S%z')
67 JournalMessage.new(id: row['id'], timestamp: timestamp,
68 topic: row['topic'], payload: JSON.parse(row['payload']))
72 # returns the number of messages that match the query
# NOTE(review): the method header and the branch choosing between the two
# statements below are not visible in this excerpt.
# Filtered variant: the WHERE text and bind parameters come from query_to_sql.
75 sql, params = query_to_sql(query)
76 sql = 'SELECT COUNT(*) FROM journal WHERE ' + sql
# Unfiltered variant: counts every row in the table.
78 sql = 'SELECT COUNT(*) FROM journal'
81 res = @conn.exec_params(sql, params)
# Deletes rows from the journal table.
# NOTE(review): the method header and the branch choosing between the two
# statements below are not visible in this excerpt.
# Filtered variant: only rows matching the generated WHERE clause are deleted.
87 sql, params = query_to_sql(query)
88 sql = 'DELETE FROM journal WHERE ' + sql
# Unfiltered variant: deletes every row in the table.
90 sql = 'DELETE FROM journal;'
93 res = @conn.exec_params(sql, params)
# Best-effort drop of the journal table. The inline `rescue nil` swallows any
# StandardError raised by the statement, so failures of every kind are hidden.
97 @conn.exec('DROP TABLE journal;') rescue nil
# Builds the SQL condition text for a query object, using positional
# placeholders ("$1", "$2", ...) for the values.
#
# query:: object exposing +id+ and +topic+ (collections), +timestamp+
#         (indexed by :from / :to) and +payload+ (key/value pairs).
#
# NOTE(review): the end of the method is outside this excerpt; callers
# destructure the result as `sql, params`.
100 def query_to_sql(query)
# Helper returning the positional placeholder for a value.
# NOTE(review): presumably the value is appended to `params` on a line not
# visible here, so params.length is the new placeholder index. Confirm.
102 placeholder = Proc.new do |value|
104 '$%d' % [params.length]
# Top-level structure: the groups collected in :list are joined with AND.
106 sql = {op: 'AND', list: []}
108 # ID query OR condition
109 unless query.id.empty?
112 list: query.id.map { |id|
113 'id = ' + placeholder.call(id)
118 # Topic query OR condition
119 unless query.topic.empty?
122 list: query.topic.map { |topic|
# '*' in a topic becomes the SQL wildcard '%'; ILIKE matches case-insensitively.
# NOTE(review): literal '%' and '_' in a topic are also LIKE wildcards and
# are not escaped here.
123 'topic ILIKE ' + placeholder.call(topic.gsub('*', '%'))
128 # Timestamp range query AND condition
129 if query.timestamp[:from] or query.timestamp[:to]
# Both bounds are inclusive and each one is optional.
131 if query.timestamp[:from]
132 list << 'timestamp >= ' + placeholder.call(query.timestamp[:from])
134 if query.timestamp[:to]
135 list << 'timestamp <= ' + placeholder.call(query.timestamp[:to])
144 unless query.payload.empty?
146 query.payload.each_pair do |key, value|
# The key is split on '.' into segments, one per selector step.
148 k = key.to_s.split('.')
# Each segment is escaped with escape_string and inlined into the SQL text.
# In PostgreSQL '->>' extracts a field as text and '->' as JSON.
# NOTE(review): the loop and the branch choosing between the two operators
# are not visible; presumably '->>' is used for the last segment only.
151 selector += '->>\'%s\'' % [@conn.escape_string(k[i])]
153 selector += '->\'%s\'' % [@conn.escape_string(k[i])]
# The extracted field is compared against a bound parameter.
156 list << selector + ' = ' + placeholder.call(value)
# Render every group as "(cond OP cond ...)" using the group's own operator,
# then join the groups with the top-level operator.
164 sql = sql[:list].map { |stmt|
165 '(' + stmt[:list].join(' %s ' % [stmt[:op]]) + ')'
166 }.join(' %s ' % [sql[:op]])