5 # :title: journal backend for postgresql
10 # as a replacement for CREATE INDEX IF NOT EXISTS, which is not available in postgres.
11 # define a function that creates an index only if it doesn't exist yet:
12 # source: http://stackoverflow.com/a/26012880
14 CREATE OR REPLACE FUNCTION create_index(table_name text, index_name text, column_name text) RETURNS void AS $$
21 where schemaname = 'public'
22 and tablename = lower(table_name)
23 and indexname = lower(index_name);
26 execute 'create index ' || index_name || ' on ' || table_name || '(' || column_name || ')';
38 class PostgresStorage < AbstractStorage
# Connects to PostgreSQL and prepares the journal storage.
#
# opts[:uri] - connection URI; falls back to 'postgresql://localhost/rbot'.
#
# Raises StorageError when the server is too old (the message states that
# 9.3 or newer is supported).
# NOTE(review): the condition guarding the raise is not visible next to it;
# confirm the exact threshold.
41 def initialize(opts={})
42 @uri = opts[:uri] || 'postgresql://localhost/rbot'
43 @conn = PG.connect(@uri)
# only report warnings and above back to the client
44 @conn.exec('set client_min_messages = warning')
# presumably installs the SQL helper function create_index(...) -- verify
# against the CREATE_INDEX constant
45 @conn.exec(CREATE_INDEX)
46 @version = @conn.exec('SHOW server_version;')[0]['server_version']
# normalize two-part versions to three parts, e.g. "9.4" -> "9.4.0"
48 @version.gsub!(/^(\d+\.\d+)$/, '\1.0')
49 log 'journal storage: postgresql connected to version: ' + @version
# concatenate the first three dot-separated parts into an integer,
# e.g. "9.4.1" -> 941
# NOTE(review): this only compares sensibly for plain single-digit
# components; a decorated string such as "12.3 (Ubuntu 12.3-1)" does not
# match the regex above and yields 123 here -- verify against the servers
# you need to support.
51 version = @version.split('.')[0,3].join.to_i
53 raise StorageError.new(
54 'PostgreSQL Version too old: %s, supported: >= 9.3' % [@version])
# JSONB is used when the computed version number is at least 940
56 @jsonb = (version >= 940)
57 log 'journal storage: no jsonb support, consider upgrading postgres' unless @jsonb
58 log 'journal storage: postgres backend is using JSONB :)' if @jsonb
# indexes on the topic and timestamp columns, created through #create_index
62 create_index('topic_index', 'topic')
63 create_index('timestamp_index', 'timestamp')
68 CREATE TABLE IF NOT EXISTS journal
71 timestamp TIMESTAMP WITH TIME ZONE NOT NULL,
72 payload %s NOT NULL)' % [@jsonb ? 'JSONB' : 'JSON'])
# Creates an index on the journal table by calling the SQL function
# create_index(table_name, index_name, column_name).
#
# index_name  - name of the index to create.
# column_name - column (or column expression, see #create_payload_index)
#               the index is built on.
#
# The three values are sent as bound parameters ($1..$3) with the table
# name fixed to 'journal'.
75 def create_index(index_name, column_name)
76 debug 'journal postges backend: create index %s for %s' % [
77 index_name, column_name]
78 @conn.exec_params('SELECT create_index($1, $2, $3)', [
79 'journal', index_name, column_name])
# Creates an index for a key inside the payload column.
#
# key - payload key; dots in it are replaced by underscores to form the
#       index name ('idx_payload_' + key), and the same key is turned into
#       a payload selector expression by #sql_payload_selector.
82 def create_payload_index(key)
83 index_name = 'idx_payload_' + key.gsub('.', '_')
# the selector expression is passed where #create_index expects a column name
84 column = sql_payload_selector(key)
85 create_index(index_name, column)
89 create_payload_index(key)
93 @conn.exec_params('INSERT INTO journal VALUES ($1, $2, $3, $4);',
94 [m.id, m.topic, m.timestamp, JSON.generate(m.payload)])
# Looks up journal messages.
#
# query  - optional query object, translated with #query_to_sql.
# limit  - maximum number of rows (coerced with to_i), default 100.
# offset - number of rows to skip (coerced with to_i), default 0.
# block  - optional block that receives each message.
#
# NOTE(review): the branch conditions are not visible here; presumably the
# WHERE variant is used when a query is given, and messages are yielded to
# the block when one is passed, otherwise returned as an array -- confirm.
97 def find(query=nil, limit=100, offset=0, &block)
# row -> JournalMessage conversion: parse the timestamp text and the JSON payload
99 timestamp = DateTime.strptime(row['timestamp'], '%Y-%m-%d %H:%M:%S%z')
100 JournalMessage.new(id: row['id'], timestamp: timestamp,
101 topic: row['topic'], payload: JSON.parse(row['payload']))
105 sql, params = query_to_sql(query)
# `%` binds tighter than `+`, so only the ' LIMIT %d OFFSET %d' literal is
# formatted; the generated condition string is appended untouched
106 sql = 'SELECT * FROM journal WHERE ' + sql + ' LIMIT %d OFFSET %d' % [limit.to_i, offset.to_i]
# unfiltered variant: no WHERE clause
108 sql = 'SELECT * FROM journal LIMIT %d OFFSET %d' % [limit.to_i, offset.to_i]
111 res = @conn.exec_params(sql, params)
# hand each converted message to the block ...
113 res.each { |row| block.call(to_message(row)) }
# ... or collect the converted messages into an array
115 res.map { |row| to_message(row) }
119 # returns the number of messages that match the query
122 sql, params = query_to_sql(query)
123 sql = 'SELECT COUNT(*) FROM journal WHERE ' + sql
125 sql = 'SELECT COUNT(*) FROM journal'
128 res = @conn.exec_params(sql, params)
# Deletes messages from the journal table.
#
# query - optional query object, translated with #query_to_sql.
#
# NOTE(review): the branch condition is not visible here; presumably the
# WHERE variant runs when a query is given and the unconditional DELETE
# (all rows) otherwise -- confirm.
132 def remove(query=nil)
134 sql, params = query_to_sql(query)
135 sql = 'DELETE FROM journal WHERE ' + sql
# unfiltered variant: removes every row
137 sql = 'DELETE FROM journal;'
140 res = @conn.exec_params(sql, params)
144 @conn.exec('DROP TABLE journal;') rescue nil
# Builds the SQL expression that selects a (possibly nested) key from the
# payload column.
#
# key - dotted key path; it is converted to a string and split on '.'.
#
# Each path segment is escaped with @conn.escape_string and appended as a
# quoted JSON accessor.
# NOTE(review): the condition choosing between the two accessors is not
# visible here; presumably `->>` is used for the final segment and `->`
# for the segments before it -- confirm.
147 def sql_payload_selector(key)
149 k = key.to_s.split('.')
# `->>` accessor for segment i
152 selector += '->>\'%s\'' % [@conn.escape_string(k[i])]
# `->` accessor for segment i
154 selector += '->\'%s\'' % [@conn.escape_string(k[i])]
# Translates a query object into an SQL condition string with positional
# parameters.
#
# query - object exposing id, topic, timestamp ([:from] / [:to]) and
#         payload, as read below.
#
# Conditions are collected as groups, each with its own operator; every
# group is wrapped in parentheses and the groups are combined with the
# top-level operator 'AND'.
# NOTE(review): the return statement is not visible here; callers
# destructure the result as `sql, params`, so presumably a two-element
# array is returned -- confirm.
160 def query_to_sql(query)
# yields the positional placeholder '$N', N being the current params length
# NOTE(review): presumably the value is appended to params first -- confirm
162 placeholder = Proc.new do |value|
164 '$%d' % [params.length]
166 sql = {op: 'AND', list: []}
168 # ID query OR condition
169 unless query.id.empty?
172 list: query.id.map { |id|
173 'id = ' + placeholder.call(id)
178 # Topic query OR condition
179 unless query.topic.empty?
182 list: query.topic.map { |topic|
# '*' in a topic is mapped to the SQL wildcard '%'; ILIKE matches case-insensitively
183 'topic ILIKE ' + placeholder.call(topic.gsub('*', '%'))
188 # Timestamp range query AND condition
189 if query.timestamp[:from] or query.timestamp[:to]
191 if query.timestamp[:from]
192 list << 'timestamp >= ' + placeholder.call(query.timestamp[:from])
194 if query.timestamp[:to]
195 list << 'timestamp <= ' + placeholder.call(query.timestamp[:to])
# payload conditions: one equality test per key/value pair
204 unless query.payload.empty?
206 query.payload.each_pair do |key, value|
207 selector = sql_payload_selector(key)
208 list << selector + ' = ' + placeholder.call(value)
# join each group's statements with the group's operator, parenthesize it,
# then join the groups with the top-level operator
216 sql = sql[:list].map { |stmt|
217 '(' + stmt[:list].join(' %s ' % [stmt[:op]]) + ')'
218 }.join(' %s ' % [sql[:op]])