5 # :title: journal backend for postgresql
10 # wraps the postgres driver in a single thread
# Worker loop: opens the connection described by @uri, then serves requests
# popped from @queue for as long as the popped message is truthy.
20 @conn = PG.connect(@uri)
21 while message = @queue.pop
# The first element of a message is the queue the reply is pushed onto; what
# remains is passed to @conn.send, i.e. a method name followed by its arguments.
22 return_queue = message.shift
24 result = @conn.send(*message)
# Replies are tagged pairs: [:result, value] on success ...
25 return_queue << [:result, result]
# ... or [:exception, e] on failure (presumably from a rescue clause that
# binds `e` — TODO confirm which exception classes it catches).
27 return_queue << [:exception, e]
# Entry point used by exec and exec_params: callers pass the connection
# method name first, followed by that method's arguments.
# NOTE(review): presumably hands the call to the worker loop through @queue
# and waits for the tagged reply — confirm against the method body.
34 def run_in_thread(*args)
# Delegates to run_in_thread with :exec and the query text.
52 run_in_thread(:exec, query)
# Parameterized counterpart of exec: delegates to run_in_thread with
# :exec_params, the query text and its bind values.
# query  - SQL text; callers in this file use $1, $2, ... placeholders
# params - array of values bound to those placeholders
55 def exec_params(query, params)
56 run_in_thread(:exec_params, query, params)
# Escapes `string` with the connection's own escape_string and returns the
# result.
# NOTE(review): unlike exec/exec_params this calls @conn directly instead of
# going through run_in_thread — confirm that is safe for a connection that is
# otherwise only driven from the worker loop, and that @conn is already set
# when this is first called.
59 def escape_string(string)
60 @conn.escape_string(string)
64 # as a replacement for CREATE INDEX IF NOT EXISTS, which PostgreSQL versions before 9.5 do not support.
65 # define a function that creates an index only if it doesn't exist yet:
66 # source: http://stackoverflow.com/a/26012880
68 CREATE OR REPLACE FUNCTION create_index(table_name text, index_name text, column_name text) RETURNS void AS $$
75 where schemaname = 'public'
76 and tablename = lower(table_name)
77 and indexname = lower(index_name);
80 execute 'create index ' || index_name || ' on ' || table_name || '(' || column_name || ')';
92 class PostgresStorage < AbstractStorage
# Connects to PostgreSQL, records the server version and decides whether the
# payload column can use JSONB.
# opts[:uri] - connection URI; defaults to 'postgresql://localhost/rbot'
# Raises StorageError with a "Version too old" message; per that message the
# minimum supported server is 9.3.
95 def initialize(opts={})
96 @uri = opts[:uri] || 'postgresql://localhost/rbot'
97 @conn = PGWrapper.new(@uri)
98 @conn.exec('set client_min_messages = warning')
# Runs the SQL held in CREATE_INDEX (presumably the create_index() function
# definition that create_index below calls — TODO confirm).
99 @conn.exec(CREATE_INDEX)
100 @version = @conn.exec('SHOW server_version;')[0]['server_version']
# Pad a bare two-part version ("X.Y") to three parts ("X.Y.0").
102 @version.gsub!(/^(\d+\.\d+)$/, '\1.0')
103 log 'journal storage: postgresql connected to version: ' + @version
# Collapse the first three dot-separated parts into one integer by joining
# their digits, e.g. "9.4.1" -> 941.
# NOTE(review): digit concatenation is lossy — "9.3.10" becomes 9310, which
# satisfies the `>= 940` JSONB check below; version strings carrying extra
# text after the number are not normalized by the gsub! above either. Verify
# against the server versions this must support.
105 version = @version.split('.')[0,3].join.to_i
107 raise StorageError.new(
108 'PostgreSQL Version too old: %s, supported: >= 9.3' % [@version])
# 940 corresponds to 9.4.0 in the digit-joined scheme above.
110 @jsonb = (version >= 940)
111 log 'journal storage: no jsonb support, consider upgrading postgres' unless @jsonb
112 log 'journal storage: postgres backend is using JSONB :)' if @jsonb
# Make sure indexes exist on the two plain columns that query_to_sql filters
# on (topic and timestamp).
116 create_index('topic_index', 'topic')
117 create_index('timestamp_index', 'timestamp')
122 CREATE TABLE IF NOT EXISTS journal
123 (id UUID PRIMARY KEY,
125 timestamp TIMESTAMP WITH TIME ZONE NOT NULL,
126 payload %s NOT NULL)' % [@jsonb ? 'JSONB' : 'JSON'])
# Creates an index on the journal table by calling the SQL function
# create_index(table_name, index_name, column_name).
# index_name  - name of the index
# column_name - column name or selector expression to index
#               (create_payload_index passes a JSON payload selector)
# NOTE(review): 'postges' in the debug message below is a typo for 'postgres'.
# NOTE(review): the SQL function concatenates its arguments into a
# `create index` statement, so both values must come from trusted input.
129 def create_index(index_name, column_name)
130 debug 'journal postges backend: create index %s for %s' % [
131 index_name, column_name]
132 @conn.exec_params('SELECT create_index($1, $2, $3)', [
133 'journal', index_name, column_name])
# Creates an index over one payload field.
# key - dotted payload path, e.g. "a.b"; the index is named "idx_payload_"
#       followed by the key with dots replaced by underscores
#       ("idx_payload_a_b"), and indexes the selector expression returned by
#       sql_payload_selector(key).
136 def create_payload_index(key)
137 index_name = 'idx_payload_' + key.gsub('.', '_')
138 column = sql_payload_selector(key)
139 create_index(index_name, column)
# Ensures an index exists for the given payload key; simply forwards to
# create_payload_index.
142 def ensure_index(key)
143 create_payload_index(key)
# Stores message `m` as one journal row. The values are bound positionally as
# id, topic, timestamp and the payload serialized with JSON.generate; no
# column list is given, so this relies on the table's column order.
147 @conn.exec_params('INSERT INTO journal VALUES ($1, $2, $3, $4);',
148 [m.id, m.topic, m.timestamp, JSON.generate(m.payload)])
# Looks up journal rows and converts each one to a JournalMessage.
# query  - query object translated by query_to_sql; defaults to nil
# limit  - maximum number of rows (coerced with to_i), default 100
# offset - number of rows to skip (coerced with to_i), default 0
# block  - optional; receives each converted message
151 def find(query=nil, limit=100, offset=0, &block)
# Row -> JournalMessage conversion (used as to_message below): the timestamp
# text is parsed with a fixed format and the payload with JSON.parse.
# NOTE(review): the format has no fractional-seconds part — confirm the
# server never returns timestamps with microseconds.
153 timestamp = DateTime.strptime(row['timestamp'], '%Y-%m-%d %H:%M:%S%z')
154 JournalMessage.new(id: row['id'], timestamp: timestamp,
155 topic: row['topic'], payload: JSON.parse(row['payload']))
159 sql, params = query_to_sql(query)
# `%` binds tighter than `+`, so only the ' LIMIT %d OFFSET %d' fragment is
# formatted; the WHERE text produced by query_to_sql is appended untouched.
# NOTE(review): there is no ORDER BY, so the row order under LIMIT/OFFSET is
# not defined by this statement.
160 sql = 'SELECT * FROM journal WHERE ' + sql + ' LIMIT %d OFFSET %d' % [limit.to_i, offset.to_i]
# Variant without a WHERE clause.
162 sql = 'SELECT * FROM journal LIMIT %d OFFSET %d' % [limit.to_i, offset.to_i]
165 res = @conn.exec_params(sql, params)
# Either hand every converted message to the block ...
167 res.each { |row| block.call(to_message(row)) }
# ... or collect the converted messages into an array.
169 res.map { |row| to_message(row) }
173 # returns the number of messages that match the query
# The WHERE text and bind values come from query_to_sql.
176 sql, params = query_to_sql(query)
177 sql = 'SELECT COUNT(*) FROM journal WHERE ' + sql
# Variant without a WHERE clause: counts every row.
179 sql = 'SELECT COUNT(*) FROM journal'
182 res = @conn.exec_params(sql, params)
# Deletes journal rows.
# query - query object translated by query_to_sql; defaults to nil
186 def remove(query=nil)
188 sql, params = query_to_sql(query)
189 sql = 'DELETE FROM journal WHERE ' + sql
# Variant without a WHERE clause: deletes every row in the table.
191 sql = 'DELETE FROM journal;'
194 res = @conn.exec_params(sql, params)
# Drops the journal table.
# NOTE(review): the `rescue nil` modifier swallows every StandardError, not
# only "table does not exist"; consider DROP TABLE IF EXISTS instead.
198 @conn.exec('DROP TABLE journal;') rescue nil
# Builds a JSON selector expression for a dotted payload key.
# key - payload path; converted with to_s and split on '.'
# Each path segment is escaped with @conn.escape_string before it is quoted
# into the selector.
201 def sql_payload_selector(key)
203 k = key.to_s.split('.')
# `->>` extracts the field as text (presumably used for the last segment —
# TODO confirm against the surrounding condition) ...
206 selector += '->>\'%s\'' % [@conn.escape_string(k[i])]
# ... while `->` keeps the field as JSON so further segments can be chained.
208 selector += '->\'%s\'' % [@conn.escape_string(k[i])]
# Translates a query object into a WHERE-clause fragment plus bind values;
# callers destructure the result as `sql, params`.
# query - object responding to id, topic, timestamp (a hash with :from and
#         :to) and payload (key/value pairs)
214 def query_to_sql(query)
# Helper that yields the "$N" placeholder for a value, N being the current
# size of params (the value is presumably appended to params just before —
# TODO confirm).
216 placeholder = Proc.new do |value|
218 '$%d' % [params.length]
# Top level: a list of condition groups that are ANDed together.
220 sql = {op: 'AND', list: []}
222 # ID query OR condition
223 unless query.id.empty?
226 list: query.id.map { |id|
227 'id = ' + placeholder.call(id)
232 # Topic query OR condition
# '*' in a topic becomes the SQL wildcard '%'; ILIKE matches
# case-insensitively.
# NOTE(review): literal '%' and '_' in a topic are not escaped and therefore
# also act as wildcards.
233 unless query.topic.empty?
236 list: query.topic.map { |topic|
237 'topic ILIKE ' + placeholder.call(topic.gsub('*', '%'))
242 # Timestamp range query AND condition
243 if query.timestamp[:from] or query.timestamp[:to]
245 if query.timestamp[:from]
246 list << 'timestamp >= ' + placeholder.call(query.timestamp[:from])
248 if query.timestamp[:to]
249 list << 'timestamp <= ' + placeholder.call(query.timestamp[:to])
# Payload conditions: each key is turned into a JSON selector and compared
# for equality with its bound value.
258 unless query.payload.empty?
260 query.payload.each_pair do |key, value|
261 selector = sql_payload_selector(key)
262 list << selector + ' = ' + placeholder.call(value)
# Render: every group becomes "(a OP b ...)", and the groups are joined with
# the top-level operator.
270 sql = sql[:list].map { |stmt|
271 '(' + stmt[:list].join(' %s ' % [stmt[:op]]) + ')'
272 }.join(' %s ' % [sql[:op]])