]> git.netwichtig.de Git - user/henk/code/ruby/rbot.git/blob - lib/rbot/journal/postgres.rb
journal: start with core botmodule, api changes
[user/henk/code/ruby/rbot.git] / lib / rbot / journal / postgres.rb
1 # encoding: UTF-8
2 #-- vim:sw=2:et
3 #++
4 #
5 # :title: journal backend for postgresql
6
7 require 'pg'
8 require 'json'
9
10 # as a replacement for CREATE INDEX IF NOT EXIST that is not in postgres.
11 # define function to be able to create an index in case it doesnt exist:
12 # source: http://stackoverflow.com/a/26012880
13 CREATE_INDEX = <<-EOT
14 CREATE OR REPLACE FUNCTION create_index(table_name text, index_name text, column_name text) RETURNS void AS $$ 
15 declare 
16    l_count integer;
17 begin
18   select count(*)
19      into l_count
20   from pg_indexes
21   where schemaname = 'public'
22     and tablename = lower(table_name)
23     and indexname = lower(index_name);
24
25   if l_count = 0 then 
26      execute 'create index ' || index_name || ' on ' || table_name || '(' || column_name || ')';
27   end if;
28 end;
29 $$ LANGUAGE plpgsql;
30 EOT
31
32 module Irc
33 class Bot
34 module Journal
35
36   module Storage
37
38     class PostgresStorage < AbstractStorage
39       attr_reader :conn
40
41       def initialize(opts={})
42         @uri = opts[:uri] || 'postgresql://localhost/rbot'
43         @conn = PG.connect(@uri)
44         @conn.exec('set client_min_messages = warning')
45         @conn.exec(CREATE_INDEX)
46         @version = @conn.exec('SHOW server_version;')[0]['server_version']
47
48         @version.gsub!(/^(\d+\.\d+)$/, '\1.0')
49         log 'journal storage: postgresql connected to version: ' + @version
50         
51         version = @version.split('.')[0,3].join.to_i
52         if version < 930
53           raise StorageError.new(
54             'PostgreSQL Version too old: %s, supported: >= 9.3' % [@version])
55         end
56         @jsonb = (version >= 940)
57         log 'journal storage: no jsonb support, consider upgrading postgres' unless @jsonb
58
59         drop if opts[:drop]
60         create_table
61         create_index('topic_index', 'topic')
62         create_index('timestamp_index', 'timestamp')
63       end
64
65       def create_table
66         @conn.exec('
67           CREATE TABLE IF NOT EXISTS journal
68             (id UUID PRIMARY KEY,
69              topic TEXT NOT NULL,
70              timestamp TIMESTAMP WITH TIME ZONE NOT NULL,
71              payload %s NOT NULL)' % [@jsonb ? 'JSONB' : 'JSON'])
72       end
73
74       def create_index(index_name, column_name)
75         debug 'journal postges backend: create index %s for %s' % [
76           index_name, column_name]
77         @conn.exec_params('SELECT create_index($1, $2, $3)', [
78           'journal', index_name, column_name])
79       end
80
81       def create_payload_index(key)
82         index_name = 'idx_payload_' + key.gsub('.', '_')
83         column = sql_payload_selector(key)
84         create_index(index_name, column)
85       end
86
87       def ensure_index(key)
88         create_payload_index(key)
89       end
90
91       def insert(m)
92         @conn.exec_params('INSERT INTO journal VALUES ($1, $2, $3, $4);',
93           [m.id, m.topic, m.timestamp, JSON.generate(m.payload)])
94       end
95
96       def find(query=nil, limit=100, offset=0, &block)
97         def to_message(row)
98           timestamp = DateTime.strptime(row['timestamp'], '%Y-%m-%d %H:%M:%S%z')
99           JournalMessage.new(id: row['id'], timestamp: timestamp,
100             topic: row['topic'], payload: JSON.parse(row['payload']))
101         end
102
103         if query
104           sql, params = query_to_sql(query)
105           sql = 'SELECT * FROM journal WHERE ' + sql + ' LIMIT %d OFFSET %d' % [limit.to_i, offset.to_i]
106         else
107           sql = 'SELECT * FROM journal LIMIT %d OFFSET %d' % [limit.to_i, offset.to_i]
108           params = []
109         end
110         res = @conn.exec_params(sql, params)
111         if block_given?
112           res.each { |row| block.call(to_message(row)) }
113         else
114           res.map { |row| to_message(row) }
115         end
116       end
117
118       # returns the number of messages that match the query
119       def count(query=nil)
120         if query
121           sql, params = query_to_sql(query)
122           sql = 'SELECT COUNT(*) FROM journal WHERE ' + sql
123         else
124           sql = 'SELECT COUNT(*) FROM journal'
125           params = []
126         end
127         res = @conn.exec_params(sql, params)
128         res[0]['count'].to_i
129       end
130
131       def remove(query=nil)
132         if query
133           sql, params = query_to_sql(query)
134           sql = 'DELETE FROM journal WHERE ' + sql
135         else
136           sql = 'DELETE FROM journal;'
137           params = []
138         end
139         res = @conn.exec_params(sql, params)
140       end
141
142       def drop
143         @conn.exec('DROP TABLE journal;') rescue nil
144       end
145
146       def sql_payload_selector(key)
147         selector = 'payload'
148         k = key.to_s.split('.')
149         k.each_index { |i|
150           if i >= k.length-1
151             selector += '->>\'%s\'' % [@conn.escape_string(k[i])]
152           else
153             selector += '->\'%s\'' % [@conn.escape_string(k[i])]
154           end
155         }
156         selector
157       end
158
159       def query_to_sql(query)
160         params = []
161         placeholder = Proc.new do |value|
162           params << value
163           '$%d' % [params.length]
164         end
165         sql = {op: 'AND', list: []}
166
167         # ID query OR condition
168         unless query.id.empty?
169           sql[:list] << {
170             op: 'OR',
171             list: query.id.map { |id| 
172               'id = ' + placeholder.call(id)
173             }
174           }
175         end
176
177         # Topic query OR condition
178         unless query.topic.empty?
179           sql[:list] << {
180             op: 'OR',
181             list: query.topic.map { |topic| 
182               'topic ILIKE ' + placeholder.call(topic.gsub('*', '%'))
183             }
184           }
185         end
186
187         # Timestamp range query AND condition
188         if query.timestamp[:from] or query.timestamp[:to]
189           list = []
190           if query.timestamp[:from]
191             list << 'timestamp >= ' + placeholder.call(query.timestamp[:from])
192           end
193           if query.timestamp[:to]
194             list << 'timestamp <= ' + placeholder.call(query.timestamp[:to])
195           end
196           sql[:list] << {
197             op: 'AND',
198             list: list
199           }
200         end
201
202         # Payload query
203         unless query.payload.empty?
204           list = []
205           query.payload.each_pair do |key, value|
206             selector = sql_payload_selector(key)
207             list << selector + ' = ' + placeholder.call(value)
208           end
209           sql[:list] << {
210             op: 'OR',
211             list: list
212           }
213         end
214
215         sql = sql[:list].map { |stmt|
216           '(' + stmt[:list].join(' %s ' % [stmt[:op]]) + ')'
217         }.join(' %s ' % [sql[:op]])
218
219         [sql, params]
220       end
221     end
222   end
223 end # Journal
224 end # Bot
225 end # Irc