5 # :title: journal backend for postgresql
# Journal storage backend that keeps messages in a PostgreSQL table named
# `journal`; specializes AbstractStorage.
19 class PostgresStorage < AbstractStorage
# Opens the PostgreSQL connection and records the server version and whether
# JSONB can be used.
#
# opts - options Hash; :uri is the connection URI and falls back to
#        'postgresql://localhost/rbot_journal' when not given.
#
# Raises StorageError with a "version too old" message (see the note on the
# raise below regarding its guard).
22 def initialize(opts={})
23 @uri = opts[:uri] || 'postgresql://localhost/rbot_journal'
24 @conn = PG.connect(@uri)
25 @version = @conn.exec('SHOW server_version;')[0]['server_version']
# Pad a bare "major.minor" version (e.g. "9.4") to "9.4.0" so the numeric form
# computed below always has three components.
27 @version.gsub!(/^(\d+\.\d+)$/, '\1.0')
28 log 'journal storage: postgresql connected to version: ' + @version
# Concatenate the first three dot-separated components into one integer,
# e.g. "9.4.1" -> 941.
# NOTE(review): a server_version carrying extra text after the number would not
# match the padding regex above and to_i would stop at the first non-digit —
# verify what the server actually reports.
30 version = @version.split('.')[0,3].join.to_i
# NOTE(review): the condition guarding this raise is not visible in this
# excerpt; presumably it only fires for servers older than 9.3 — confirm.
32 raise StorageError.new(
33 'PostgreSQL Version too old: %s, supported: >= 9.3' % [@version])
# JSONB is enabled when the numeric version is at least 940 (9.4.0 in the
# concatenated form above); otherwise a hint to upgrade is logged.
35 @jsonb = (version >= 940)
36 log 'journal storage: no jsonb support, consider upgrading postgres' unless @jsonb
44 CREATE TABLE IF NOT EXISTS journal
47 timestamp TIMESTAMP WITH TIME ZONE NOT NULL,
48 payload %s NOT NULL)' % [@jsonb ? 'JSONB' : 'JSON'])
# Stores one message as a row of the journal table using a parameterized
# statement. The INSERT names no columns, so the four bound values (id, topic,
# timestamp, payload serialized with JSON.generate) depend on the table's
# column order.
52 @conn.exec_params('INSERT INTO journal VALUES ($1, $2, $3, $4);',
53 [m.id, m.topic, m.timestamp, JSON.generate(m.payload)])
# Looks up journal rows matching a query and builds JournalMessage objects
# from them.
#
# query  - query object handed to query_to_sql to produce the WHERE clause
# limit  - maximum number of rows (default 100); coerced with to_i
# offset - number of rows to skip (default 0); coerced with to_i
56 def find(query, limit=100, offset=0)
57 sql, params = query_to_sql(query)
# `%` binds tighter than `+`, so the format is applied only to the
# ' LIMIT %d OFFSET %d' literal; the WHERE text is concatenated untouched.
# The visible statement has no ORDER BY.
58 sql = 'SELECT * FROM journal WHERE ' + sql + ' LIMIT %d OFFSET %d' % [limit.to_i, offset.to_i]
59 res = @conn.exec_params(sql, params)
# NOTE(review): the iteration over `res` that binds `row` is not visible in
# this excerpt.
# The timestamp column text is parsed with a seconds-resolution format plus a
# UTC offset — assumes the returned text has no fractional seconds; TODO confirm.
61 timestamp = DateTime.strptime(row['timestamp'], '%Y-%m-%d %H:%M:%S%z')
62 JournalMessage.new(id: row['id'], timestamp: timestamp,
63 topic: row['topic'], payload: JSON.parse(row['payload']))
67 # returns the number of messages that match the query
# The WHERE clause comes from query_to_sql, the same translation find uses,
# but without LIMIT/OFFSET.
# NOTE(review): how the count is read out of `res` is not visible in this
# excerpt.
69 sql, params = query_to_sql(query)
70 sql = 'SELECT COUNT(*) FROM journal WHERE ' + sql
71 res = @conn.exec_params(sql, params)
# Drops the journal table. The `rescue nil` modifier swallows any
# StandardError raised by the statement, so a failed drop is silently ignored.
76 @conn.exec('DROP TABLE journal;') rescue nil
# Translates a query object into SQL condition text with positional
# placeholders. Reads query.id, query.topic, query.timestamp (:from / :to)
# and query.payload. Callers destructure the result as `sql, params` and put
# the sql part after `WHERE`.
#
# NOTE(review): several lines of this method are not visible in this excerpt
# (among them the definition of `params` and the final return) — confirm
# details against the full file.
79 def query_to_sql(query)
# Helper that yields the next positional placeholder ("$N"), where N is the
# current length of `params`; presumably the value is appended to `params` on
# a line not shown here — confirm.
81 placeholder = Proc.new do |value|
83 '$%d' % [params.length]
# Condition groups are collected here; the groups themselves are combined
# with AND when rendered at the end of the method.
85 sql = {op: 'AND', list: []}
87 # ID query OR condition
# Each requested id becomes its own `id = $N` comparison.
88 unless query.id.empty?
91 list: query.id.map { |id|
92 'id = ' + placeholder.call(id)
97 # Topic query OR condition
# `*` in a topic pattern is rewritten to the SQL `%` wildcard and compared
# with ILIKE.
98 unless query.topic.empty?
101 list: query.topic.map { |topic|
102 'topic ILIKE ' + placeholder.call(topic.gsub('*', '%'))
107 # Timestamp range query AND condition
# Either bound may be given on its own; both comparisons are inclusive.
108 if query.timestamp[:from] or query.timestamp[:to]
110 if query.timestamp[:from]
111 list << 'timestamp >= ' + placeholder.call(query.timestamp[:from])
113 if query.timestamp[:to]
114 list << 'timestamp <= ' + placeholder.call(query.timestamp[:to])
# Payload query: each key is split on '.' into path segments that are appended
# to a JSON selector; the compared value is bound through a placeholder.
123 unless query.payload.empty?
125 query.payload.each_pair do |key, value|
127 k = key.to_s.split('.')
# Path segments are interpolated into the SQL text (they are not bound
# parameters), so each one goes through escape_string first.
# NOTE(review): the condition choosing between `->>` and `->` is not visible
# here; presumably `->>` is used for the final segment — confirm.
130 selector += '->>\'%s\'' % [@conn.escape_string(k[i])]
132 selector += '->\'%s\'' % [@conn.escape_string(k[i])]
135 list << selector + ' = ' + placeholder.call(value)
# Render every group as a parenthesised list joined by the group's own
# operator, then join the groups with the top-level operator.
# NOTE(review): with no groups this yields an empty string — check how callers
# that append it after `WHERE` behave for an empty query.
143 sql = sql[:list].map { |stmt|
144 '(' + stmt[:list].join(' %s ' % [stmt[:op]]) + ')'
145 }.join(' %s ' % [sql[:op]])