X-Git-Url: https://git.netwichtig.de/gitweb/?a=blobdiff_plain;f=lib%2Frbot%2Fjournal%2Fpostgres.rb;h=7b333158642d55bbc339ab9ff5ebeb337fe273a8;hb=56e4713c5c0498838ed77a409e44fbc3251acde2;hp=85514fb20f1a50c860242b81d7385eb71096b807;hpb=830c8d7d6be5d0bda2df05ff5a2383362e39f8d4;p=user%2Fhenk%2Fcode%2Fruby%2Frbot.git diff --git a/lib/rbot/journal/postgres.rb b/lib/rbot/journal/postgres.rb index 85514fb2..7b333158 100644 --- a/lib/rbot/journal/postgres.rb +++ b/lib/rbot/journal/postgres.rb @@ -5,15 +5,98 @@ # :title: journal backend for postgresql require 'pg' +require 'json' + +# wraps the postgres driver in a single thread +class PGWrapper + def initialize(uri) + @uri = uri + @queue = Queue.new + run_thread + end + + def run_thread + Thread.new do + @conn = PG.connect(@uri) + while message = @queue.pop + return_queue = message.shift + begin + result = @conn.send(*message) + return_queue << [:result, result] + rescue Exception => e + return_queue << [:exception, e] + end + end + @conn.finish + end + end + + def run_in_thread(*args) + rq = Queue.new + @queue << [rq, *args] + type, result = rq.pop + if type == :exception + raise result + else + result + end + end + + public + + def destroy + @queue << nil + end + + def exec(query) + run_in_thread(:exec, query) + end + + def exec_params(query, params) + run_in_thread(:exec_params, query, params) + end + + def escape_string(string) + @conn.escape_string(string) + end +end + +# as a replacement for CREATE INDEX IF NOT EXIST that is not in postgres. +# define function to be able to create an index in case it doesnt exist: +# source: http://stackoverflow.com/a/26012880 +CREATE_INDEX = <<-EOT +CREATE OR REPLACE FUNCTION create_index(table_name text, index_name text, column_name text) RETURNS void AS $$ +declare + l_count integer; +begin + select count(*) + into l_count + from pg_indexes + where schemaname = 'public' + and tablename = lower(table_name) + and indexname = lower(index_name); + + if l_count = 0 then + execute 'create index ' || index_name || ' on ' || table_name || '(' || column_name || ')'; + end if; +end; +$$ LANGUAGE plpgsql; +EOT module Irc class Bot module Journal + module Storage + class PostgresStorage < AbstractStorage + attr_reader :conn + def initialize(opts={}) - @uri = opts[:uri] || 'postgresql://localhost/rbot_journal' - @conn = PG.connect(@uri) + @uri = opts[:uri] || 'postgresql://localhost/rbot' + @conn = PGWrapper.new(@uri) + @conn.exec('set client_min_messages = warning') + @conn.exec(CREATE_INDEX) @version = @conn.exec('SHOW server_version;')[0]['server_version'] @version.gsub!(/^(\d+\.\d+)$/, '\1.0') @@ -25,12 +108,170 @@ module Journal 'PostgreSQL Version too old: %s, supported: >= 9.3' % [@version]) end @jsonb = (version >= 940) - log 'journal storage: no jsonb support, consider upgrading postgres' + log 'journal storage: no jsonb support, consider upgrading postgres' unless @jsonb + log 'journal storage: postgres backend is using JSONB :)' if @jsonb + drop if opts[:drop] create_table + create_index('topic_index', 'topic') + create_index('timestamp_index', 'timestamp') end def create_table + @conn.exec(' + CREATE TABLE IF NOT EXISTS journal + (id UUID PRIMARY KEY, + topic TEXT NOT NULL, + timestamp TIMESTAMP WITH TIME ZONE NOT NULL, + payload %s NOT NULL)' % [@jsonb ? 'JSONB' : 'JSON']) + end + + def create_index(index_name, column_name) + debug 'journal postges backend: create index %s for %s' % [ + index_name, column_name] + @conn.exec_params('SELECT create_index($1, $2, $3)', [ + 'journal', index_name, column_name]) + end + + def create_payload_index(key) + index_name = 'idx_payload_' + key.gsub('.', '_') + column = sql_payload_selector(key) + create_index(index_name, column) + end + + def ensure_index(key) + create_payload_index(key) + end + + def insert(m) + @conn.exec_params('INSERT INTO journal VALUES ($1, $2, $3, $4);', + [m.id, m.topic, m.timestamp, JSON.generate(m.payload)]) + end + + def find(query=nil, limit=100, offset=0, &block) + def to_message(row) + timestamp = DateTime.strptime(row['timestamp'], '%Y-%m-%d %H:%M:%S%z') + JournalMessage.new(id: row['id'], timestamp: timestamp, + topic: row['topic'], payload: JSON.parse(row['payload'])) + end + + if query + sql, params = query_to_sql(query) + sql = 'SELECT * FROM journal WHERE ' + sql + ' LIMIT %d OFFSET %d' % [limit.to_i, offset.to_i] + else + sql = 'SELECT * FROM journal LIMIT %d OFFSET %d' % [limit.to_i, offset.to_i] + params = [] + end + res = @conn.exec_params(sql, params) + if block_given? + res.each { |row| block.call(to_message(row)) } + else + res.map { |row| to_message(row) } + end + end + + # returns the number of messages that match the query + def count(query=nil) + if query + sql, params = query_to_sql(query) + sql = 'SELECT COUNT(*) FROM journal WHERE ' + sql + else + sql = 'SELECT COUNT(*) FROM journal' + params = [] + end + res = @conn.exec_params(sql, params) + res[0]['count'].to_i + end + + def remove(query=nil) + if query + sql, params = query_to_sql(query) + sql = 'DELETE FROM journal WHERE ' + sql + else + sql = 'DELETE FROM journal;' + params = [] + end + res = @conn.exec_params(sql, params) + end + + def drop + @conn.exec('DROP TABLE journal;') rescue nil + end + + def sql_payload_selector(key) + selector = 'payload' + k = key.to_s.split('.') + k.each_index { |i| + if i >= k.length-1 + selector += '->>\'%s\'' % [@conn.escape_string(k[i])] + else + selector += '->\'%s\'' % [@conn.escape_string(k[i])] + end + } + selector + end + + def query_to_sql(query) + params = [] + placeholder = Proc.new do |value| + params << value + '$%d' % [params.length] + end + sql = {op: 'AND', list: []} + + # ID query OR condition + unless query.id.empty? + sql[:list] << { + op: 'OR', + list: query.id.map { |id| + 'id = ' + placeholder.call(id) + } + } + end + + # Topic query OR condition + unless query.topic.empty? + sql[:list] << { + op: 'OR', + list: query.topic.map { |topic| + 'topic ILIKE ' + placeholder.call(topic.gsub('*', '%')) + } + } + end + + # Timestamp range query AND condition + if query.timestamp[:from] or query.timestamp[:to] + list = [] + if query.timestamp[:from] + list << 'timestamp >= ' + placeholder.call(query.timestamp[:from]) + end + if query.timestamp[:to] + list << 'timestamp <= ' + placeholder.call(query.timestamp[:to]) + end + sql[:list] << { + op: 'AND', + list: list + } + end + + # Payload query + unless query.payload.empty? + list = [] + query.payload.each_pair do |key, value| + selector = sql_payload_selector(key) + list << selector + ' = ' + placeholder.call(value) + end + sql[:list] << { + op: 'OR', + list: list + } + end + + sql = sql[:list].map { |stmt| + '(' + stmt[:list].join(' %s ' % [stmt[:op]]) + ')' + }.join(' %s ' % [sql[:op]]) + + [sql, params] end end end