# :title: journal backend for postgresql
require 'pg'
+require 'json'
+
+# wraps the postgres driver in a single thread
+class PGWrapper
+ def initialize(uri)
+ @uri = uri
+ @queue = Queue.new
+ run_thread
+ end
+
+ def run_thread
+ Thread.new do
+ @conn = PG.connect(@uri)
+ while message = @queue.pop
+ return_queue = message.shift
+ begin
+ result = @conn.send(*message)
+ return_queue << [:result, result]
+ rescue Exception => e
+ return_queue << [:exception, e]
+ end
+ end
+ @conn.finish
+ end
+ end
+
+ def run_in_thread(*args)
+ rq = Queue.new
+ @queue << [rq, *args]
+ type, result = rq.pop
+ if type == :exception
+ raise result
+ else
+ result
+ end
+ end
+
+ public
+
+ def destroy
+ @queue << nil
+ end
+
+ def exec(query)
+ run_in_thread(:exec, query)
+ end
+
+ def exec_params(query, params)
+ run_in_thread(:exec_params, query, params)
+ end
+
+ def escape_string(string)
+ @conn.escape_string(string)
+ end
+end
+
+# as a replacement for CREATE INDEX IF NOT EXIST that is not in postgres.
+# define function to be able to create an index in case it doesnt exist:
+# source: http://stackoverflow.com/a/26012880
+CREATE_INDEX = <<-EOT
+CREATE OR REPLACE FUNCTION create_index(table_name text, index_name text, column_name text) RETURNS void AS $$
+declare
+ l_count integer;
+begin
+ select count(*)
+ into l_count
+ from pg_indexes
+ where schemaname = 'public'
+ and tablename = lower(table_name)
+ and indexname = lower(index_name);
+
+ if l_count = 0 then
+ execute 'create index ' || index_name || ' on ' || table_name || '(' || column_name || ')';
+ end if;
+end;
+$$ LANGUAGE plpgsql;
+EOT
module Irc
class Bot
module Journal
+
module Storage
+
class PostgresStorage < AbstractStorage
+ attr_reader :conn
+
def initialize(opts={})
- @uri = opts[:uri] || 'postgresql://localhost/rbot_journal'
- @conn = PG.connect(@uri)
+ @uri = opts[:uri] || 'postgresql://localhost/rbot'
+ @conn = PGWrapper.new(@uri)
+ @conn.exec('set client_min_messages = warning')
+ @conn.exec(CREATE_INDEX)
@version = @conn.exec('SHOW server_version;')[0]['server_version']
@version.gsub!(/^(\d+\.\d+)$/, '\1.0')
'PostgreSQL Version too old: %s, supported: >= 9.3' % [@version])
end
@jsonb = (version >= 940)
- log 'journal storage: no jsonb support, consider upgrading postgres'
+ log 'journal storage: no jsonb support, consider upgrading postgres' unless @jsonb
+ log 'journal storage: postgres backend is using JSONB :)' if @jsonb
+ drop if opts[:drop]
create_table
+ create_index('topic_index', 'topic')
+ create_index('timestamp_index', 'timestamp')
end
def create_table
+ @conn.exec('
+ CREATE TABLE IF NOT EXISTS journal
+ (id UUID PRIMARY KEY,
+ topic TEXT NOT NULL,
+ timestamp TIMESTAMP WITH TIME ZONE NOT NULL,
+ payload %s NOT NULL)' % [@jsonb ? 'JSONB' : 'JSON'])
+ end
+
+ def create_index(index_name, column_name)
+ debug 'journal postges backend: create index %s for %s' % [
+ index_name, column_name]
+ @conn.exec_params('SELECT create_index($1, $2, $3)', [
+ 'journal', index_name, column_name])
+ end
+
+ def create_payload_index(key)
+ index_name = 'idx_payload_' + key.gsub('.', '_')
+ column = sql_payload_selector(key)
+ create_index(index_name, column)
+ end
+
+ def ensure_index(key)
+ create_payload_index(key)
+ end
+
+ def insert(m)
+ @conn.exec_params('INSERT INTO journal VALUES ($1, $2, $3, $4);',
+ [m.id, m.topic, m.timestamp, JSON.generate(m.payload)])
+ end
+
+ def find(query=nil, limit=100, offset=0, &block)
+ def to_message(row)
+ timestamp = DateTime.strptime(row['timestamp'], '%Y-%m-%d %H:%M:%S%z')
+ JournalMessage.new(id: row['id'], timestamp: timestamp,
+ topic: row['topic'], payload: JSON.parse(row['payload']))
+ end
+
+ if query
+ sql, params = query_to_sql(query)
+ sql = 'SELECT * FROM journal WHERE ' + sql + ' LIMIT %d OFFSET %d' % [limit.to_i, offset.to_i]
+ else
+ sql = 'SELECT * FROM journal LIMIT %d OFFSET %d' % [limit.to_i, offset.to_i]
+ params = []
+ end
+ res = @conn.exec_params(sql, params)
+ if block_given?
+ res.each { |row| block.call(to_message(row)) }
+ else
+ res.map { |row| to_message(row) }
+ end
+ end
+
+ # returns the number of messages that match the query
+ def count(query=nil)
+ if query
+ sql, params = query_to_sql(query)
+ sql = 'SELECT COUNT(*) FROM journal WHERE ' + sql
+ else
+ sql = 'SELECT COUNT(*) FROM journal'
+ params = []
+ end
+ res = @conn.exec_params(sql, params)
+ res[0]['count'].to_i
+ end
+
+ def remove(query=nil)
+ if query
+ sql, params = query_to_sql(query)
+ sql = 'DELETE FROM journal WHERE ' + sql
+ else
+ sql = 'DELETE FROM journal;'
+ params = []
+ end
+ res = @conn.exec_params(sql, params)
+ end
+
+ def drop
+ @conn.exec('DROP TABLE journal;') rescue nil
+ end
+
+ def sql_payload_selector(key)
+ selector = 'payload'
+ k = key.to_s.split('.')
+ k.each_index { |i|
+ if i >= k.length-1
+ selector += '->>\'%s\'' % [@conn.escape_string(k[i])]
+ else
+ selector += '->\'%s\'' % [@conn.escape_string(k[i])]
+ end
+ }
+ selector
+ end
+
+ def query_to_sql(query)
+ params = []
+ placeholder = Proc.new do |value|
+ params << value
+ '$%d' % [params.length]
+ end
+ sql = {op: 'AND', list: []}
+
+ # ID query OR condition
+ unless query.id.empty?
+ sql[:list] << {
+ op: 'OR',
+ list: query.id.map { |id|
+ 'id = ' + placeholder.call(id)
+ }
+ }
+ end
+
+ # Topic query OR condition
+ unless query.topic.empty?
+ sql[:list] << {
+ op: 'OR',
+ list: query.topic.map { |topic|
+ 'topic ILIKE ' + placeholder.call(topic.gsub('*', '%'))
+ }
+ }
+ end
+
+ # Timestamp range query AND condition
+ if query.timestamp[:from] or query.timestamp[:to]
+ list = []
+ if query.timestamp[:from]
+ list << 'timestamp >= ' + placeholder.call(query.timestamp[:from])
+ end
+ if query.timestamp[:to]
+ list << 'timestamp <= ' + placeholder.call(query.timestamp[:to])
+ end
+ sql[:list] << {
+ op: 'AND',
+ list: list
+ }
+ end
+
+ # Payload query
+ unless query.payload.empty?
+ list = []
+ query.payload.each_pair do |key, value|
+ selector = sql_payload_selector(key)
+ list << selector + ' = ' + placeholder.call(value)
+ end
+ sql[:list] << {
+ op: 'OR',
+ list: list
+ }
+ end
+
+ sql = sql[:list].map { |stmt|
+ '(' + stmt[:list].join(' %s ' % [stmt[:op]]) + ')'
+ }.join(' %s ' % [sql[:op]])
+
+ [sql, params]
end
end
end