]> git.netwichtig.de Git - user/henk/code/ruby/rbot.git/blobdiff - lib/rbot/journal/postgres.rb
chucknorris: fix loading
[user/henk/code/ruby/rbot.git] / lib / rbot / journal / postgres.rb
index 85514fb20f1a50c860242b81d7385eb71096b807..7b333158642d55bbc339ab9ff5ebeb337fe273a8 100644 (file)
@@ -5,15 +5,98 @@
 # :title: journal backend for postgresql
 
 require 'pg'
+require 'json'
+
+# wraps the postgres driver in a single thread
+class PGWrapper
+  def initialize(uri)
+    @uri = uri
+    @queue = Queue.new
+    run_thread
+  end
+
+  def run_thread
+    Thread.new do
+      @conn = PG.connect(@uri)
+      while message = @queue.pop
+        return_queue = message.shift
+        begin
+          result = @conn.send(*message)
+          return_queue << [:result, result]
+        rescue Exception => e
+          return_queue << [:exception, e]
+        end
+      end
+      @conn.finish
+    end
+  end
+
+  def run_in_thread(*args)
+    rq = Queue.new
+    @queue << [rq, *args]
+    type, result = rq.pop
+    if type == :exception
+      raise result
+    else
+      result
+    end
+  end
+
+  public
+
+  def destroy
+    @queue << nil
+  end
+
+  def exec(query)
+    run_in_thread(:exec, query)
+  end
+
+  def exec_params(query, params)
+    run_in_thread(:exec_params, query, params)
+  end
+
+  def escape_string(string)
+    @conn.escape_string(string)
+  end
+end
+
+# as a replacement for CREATE INDEX IF NOT EXIST that is not in postgres.
+# define function to be able to create an index in case it doesnt exist:
+# source: http://stackoverflow.com/a/26012880
+CREATE_INDEX = <<-EOT
+CREATE OR REPLACE FUNCTION create_index(table_name text, index_name text, column_name text) RETURNS void AS $$ 
+declare 
+   l_count integer;
+begin
+  select count(*)
+     into l_count
+  from pg_indexes
+  where schemaname = 'public'
+    and tablename = lower(table_name)
+    and indexname = lower(index_name);
+
+  if l_count = 0 then 
+     execute 'create index ' || index_name || ' on ' || table_name || '(' || column_name || ')';
+  end if;
+end;
+$$ LANGUAGE plpgsql;
+EOT
 
 module Irc
 class Bot
 module Journal
+
   module Storage
+
     class PostgresStorage < AbstractStorage
+      attr_reader :conn
+
       def initialize(opts={})
-        @uri = opts[:uri] || 'postgresql://localhost/rbot_journal'
-        @conn = PG.connect(@uri)
+        @uri = opts[:uri] || 'postgresql://localhost/rbot'
+        @conn = PGWrapper.new(@uri)
+        @conn.exec('set client_min_messages = warning')
+        @conn.exec(CREATE_INDEX)
         @version = @conn.exec('SHOW server_version;')[0]['server_version']
 
         @version.gsub!(/^(\d+\.\d+)$/, '\1.0')
@@ -25,12 +108,170 @@ module Journal
             'PostgreSQL Version too old: %s, supported: >= 9.3' % [@version])
         end
         @jsonb = (version >= 940)
-        log 'journal storage: no jsonb support, consider upgrading postgres'
+        log 'journal storage: no jsonb support, consider upgrading postgres' unless @jsonb
+        log 'journal storage: postgres backend is using JSONB :)' if @jsonb
 
+        drop if opts[:drop]
         create_table
+        create_index('topic_index', 'topic')
+        create_index('timestamp_index', 'timestamp')
       end
 
       def create_table
+        @conn.exec('
+          CREATE TABLE IF NOT EXISTS journal
+            (id UUID PRIMARY KEY,
+             topic TEXT NOT NULL,
+             timestamp TIMESTAMP WITH TIME ZONE NOT NULL,
+             payload %s NOT NULL)' % [@jsonb ? 'JSONB' : 'JSON'])
+      end
+
+      def create_index(index_name, column_name)
+        debug 'journal postges backend: create index %s for %s' % [
+          index_name, column_name]
+        @conn.exec_params('SELECT create_index($1, $2, $3)', [
+          'journal', index_name, column_name])
+      end
+
+      def create_payload_index(key)
+        index_name = 'idx_payload_' + key.gsub('.', '_')
+        column = sql_payload_selector(key)
+        create_index(index_name, column)
+      end
+
+      def ensure_index(key)
+        create_payload_index(key)
+      end
+
+      def insert(m)
+        @conn.exec_params('INSERT INTO journal VALUES ($1, $2, $3, $4);',
+          [m.id, m.topic, m.timestamp, JSON.generate(m.payload)])
+      end
+
+      def find(query=nil, limit=100, offset=0, &block)
+        def to_message(row)
+          timestamp = DateTime.strptime(row['timestamp'], '%Y-%m-%d %H:%M:%S%z')
+          JournalMessage.new(id: row['id'], timestamp: timestamp,
+            topic: row['topic'], payload: JSON.parse(row['payload']))
+        end
+
+        if query
+          sql, params = query_to_sql(query)
+          sql = 'SELECT * FROM journal WHERE ' + sql + ' LIMIT %d OFFSET %d' % [limit.to_i, offset.to_i]
+        else
+          sql = 'SELECT * FROM journal LIMIT %d OFFSET %d' % [limit.to_i, offset.to_i]
+          params = []
+        end
+        res = @conn.exec_params(sql, params)
+        if block_given?
+          res.each { |row| block.call(to_message(row)) }
+        else
+          res.map { |row| to_message(row) }
+        end
+      end
+
+      # returns the number of messages that match the query
+      def count(query=nil)
+        if query
+          sql, params = query_to_sql(query)
+          sql = 'SELECT COUNT(*) FROM journal WHERE ' + sql
+        else
+          sql = 'SELECT COUNT(*) FROM journal'
+          params = []
+        end
+        res = @conn.exec_params(sql, params)
+        res[0]['count'].to_i
+      end
+
+      def remove(query=nil)
+        if query
+          sql, params = query_to_sql(query)
+          sql = 'DELETE FROM journal WHERE ' + sql
+        else
+          sql = 'DELETE FROM journal;'
+          params = []
+        end
+        res = @conn.exec_params(sql, params)
+      end
+
+      def drop
+        @conn.exec('DROP TABLE journal;') rescue nil
+      end
+
+      def sql_payload_selector(key)
+        selector = 'payload'
+        k = key.to_s.split('.')
+        k.each_index { |i|
+          if i >= k.length-1
+            selector += '->>\'%s\'' % [@conn.escape_string(k[i])]
+          else
+            selector += '->\'%s\'' % [@conn.escape_string(k[i])]
+          end
+        }
+        selector
+      end
+
+      def query_to_sql(query)
+        params = []
+        placeholder = Proc.new do |value|
+          params << value
+          '$%d' % [params.length]
+        end
+        sql = {op: 'AND', list: []}
+
+        # ID query OR condition
+        unless query.id.empty?
+          sql[:list] << {
+            op: 'OR',
+            list: query.id.map { |id| 
+              'id = ' + placeholder.call(id)
+            }
+          }
+        end
+
+        # Topic query OR condition
+        unless query.topic.empty?
+          sql[:list] << {
+            op: 'OR',
+            list: query.topic.map { |topic| 
+              'topic ILIKE ' + placeholder.call(topic.gsub('*', '%'))
+            }
+          }
+        end
+
+        # Timestamp range query AND condition
+        if query.timestamp[:from] or query.timestamp[:to]
+          list = []
+          if query.timestamp[:from]
+            list << 'timestamp >= ' + placeholder.call(query.timestamp[:from])
+          end
+          if query.timestamp[:to]
+            list << 'timestamp <= ' + placeholder.call(query.timestamp[:to])
+          end
+          sql[:list] << {
+            op: 'AND',
+            list: list
+          }
+        end
+
+        # Payload query
+        unless query.payload.empty?
+          list = []
+          query.payload.each_pair do |key, value|
+            selector = sql_payload_selector(key)
+            list << selector + ' = ' + placeholder.call(value)
+          end
+          sql[:list] << {
+            op: 'OR',
+            list: list
+          }
+        end
+
+        sql = sql[:list].map { |stmt|
+          '(' + stmt[:list].join(' %s ' % [stmt[:op]]) + ')'
+        }.join(' %s ' % [sql[:op]])
+
+        [sql, params]
       end
     end
   end