]> git.netwichtig.de Git - user/henk/code/ruby/rbot.git/blob - lib/rbot/journal/postgres.rb
Merge pull request #4 from ahpook/rename_karma
[user/henk/code/ruby/rbot.git] / lib / rbot / journal / postgres.rb
1 # encoding: UTF-8
2 #-- vim:sw=2:et
3 #++
4 #
5 # :title: journal backend for postgresql
6
7 require 'pg'
8 require 'json'
9
10 # wraps the postgres driver in a single thread
11 class PGWrapper
12   def initialize(uri)
13     @uri = uri
14     @queue = Queue.new
15     run_thread
16   end
17
18   def run_thread
19     Thread.new do
20       @conn = PG.connect(@uri)
21       while message = @queue.pop
22         return_queue = message.shift
23         begin
24           result = @conn.send(*message)
25           return_queue << [:result, result]
26         rescue Exception => e
27           return_queue << [:exception, e]
28         end
29       end
30       @conn.finish
31     end
32   end
33
34   def run_in_thread(*args)
35     rq = Queue.new
36     @queue << [rq, *args]
37     type, result = rq.pop
38     if type == :exception
39       raise result
40     else
41       result
42     end
43   end
44
45   public
46
47   def destroy
48     @queue << nil
49   end
50
51   def exec(query)
52     run_in_thread(:exec, query)
53   end
54
55   def exec_params(query, params)
56     run_in_thread(:exec_params, query, params)
57   end
58
59   def escape_string(string)
60     @conn.escape_string(string)
61   end
62 end
63
64 # as a replacement for CREATE INDEX IF NOT EXIST that is not in postgres.
65 # define function to be able to create an index in case it doesnt exist:
66 # source: http://stackoverflow.com/a/26012880
67 CREATE_INDEX = <<-EOT
68 CREATE OR REPLACE FUNCTION create_index(table_name text, index_name text, column_name text) RETURNS void AS $$ 
69 declare 
70    l_count integer;
71 begin
72   select count(*)
73      into l_count
74   from pg_indexes
75   where schemaname = 'public'
76     and tablename = lower(table_name)
77     and indexname = lower(index_name);
78
79   if l_count = 0 then 
80      execute 'create index ' || index_name || ' on ' || table_name || '(' || column_name || ')';
81   end if;
82 end;
83 $$ LANGUAGE plpgsql;
84 EOT
85
86 module Irc
87 class Bot
88 module Journal
89
90   module Storage
91
92     class PostgresStorage < AbstractStorage
93       attr_reader :conn
94
95       def initialize(opts={})
96         @uri = opts[:uri] || 'postgresql://localhost/rbot'
97         @conn = PGWrapper.new(@uri)
98         @conn.exec('set client_min_messages = warning')
99         @conn.exec(CREATE_INDEX)
100         @version = @conn.exec('SHOW server_version;')[0]['server_version']
101
102         @version.gsub!(/^(\d+\.\d+)$/, '\1.0')
103         log 'journal storage: postgresql connected to version: ' + @version
104         
105         version = @version.split('.')[0,3].join.to_i
106         if version < 930
107           raise StorageError.new(
108             'PostgreSQL Version too old: %s, supported: >= 9.3' % [@version])
109         end
110         @jsonb = (version >= 940)
111         log 'journal storage: no jsonb support, consider upgrading postgres' unless @jsonb
112         log 'journal storage: postgres backend is using JSONB :)' if @jsonb
113
114         drop if opts[:drop]
115         create_table
116         create_index('topic_index', 'topic')
117         create_index('timestamp_index', 'timestamp')
118       end
119
120       def create_table
121         @conn.exec('
122           CREATE TABLE IF NOT EXISTS journal
123             (id UUID PRIMARY KEY,
124              topic TEXT NOT NULL,
125              timestamp TIMESTAMP WITH TIME ZONE NOT NULL,
126              payload %s NOT NULL)' % [@jsonb ? 'JSONB' : 'JSON'])
127       end
128
129       def create_index(index_name, column_name)
130         debug 'journal postges backend: create index %s for %s' % [
131           index_name, column_name]
132         @conn.exec_params('SELECT create_index($1, $2, $3)', [
133           'journal', index_name, column_name])
134       end
135
136       def create_payload_index(key)
137         index_name = 'idx_payload_' + key.gsub('.', '_')
138         column = sql_payload_selector(key)
139         create_index(index_name, column)
140       end
141
142       def ensure_index(key)
143         create_payload_index(key)
144       end
145
146       def insert(m)
147         @conn.exec_params('INSERT INTO journal VALUES ($1, $2, $3, $4);',
148           [m.id, m.topic, m.timestamp, JSON.generate(m.payload)])
149       end
150
151       def find(query=nil, limit=100, offset=0, &block)
152         def to_message(row)
153           timestamp = DateTime.strptime(row['timestamp'], '%Y-%m-%d %H:%M:%S%z')
154           JournalMessage.new(id: row['id'], timestamp: timestamp,
155             topic: row['topic'], payload: JSON.parse(row['payload']))
156         end
157
158         if query
159           sql, params = query_to_sql(query)
160           sql = 'SELECT * FROM journal WHERE ' + sql + ' LIMIT %d OFFSET %d' % [limit.to_i, offset.to_i]
161         else
162           sql = 'SELECT * FROM journal LIMIT %d OFFSET %d' % [limit.to_i, offset.to_i]
163           params = []
164         end
165         res = @conn.exec_params(sql, params)
166         if block_given?
167           res.each { |row| block.call(to_message(row)) }
168         else
169           res.map { |row| to_message(row) }
170         end
171       end
172
173       # returns the number of messages that match the query
174       def count(query=nil)
175         if query
176           sql, params = query_to_sql(query)
177           sql = 'SELECT COUNT(*) FROM journal WHERE ' + sql
178         else
179           sql = 'SELECT COUNT(*) FROM journal'
180           params = []
181         end
182         res = @conn.exec_params(sql, params)
183         res[0]['count'].to_i
184       end
185
186       def remove(query=nil)
187         if query
188           sql, params = query_to_sql(query)
189           sql = 'DELETE FROM journal WHERE ' + sql
190         else
191           sql = 'DELETE FROM journal;'
192           params = []
193         end
194         res = @conn.exec_params(sql, params)
195       end
196
197       def drop
198         @conn.exec('DROP TABLE journal;') rescue nil
199       end
200
201       def sql_payload_selector(key)
202         selector = 'payload'
203         k = key.to_s.split('.')
204         k.each_index { |i|
205           if i >= k.length-1
206             selector += '->>\'%s\'' % [@conn.escape_string(k[i])]
207           else
208             selector += '->\'%s\'' % [@conn.escape_string(k[i])]
209           end
210         }
211         selector
212       end
213
214       def query_to_sql(query)
215         params = []
216         placeholder = Proc.new do |value|
217           params << value
218           '$%d' % [params.length]
219         end
220         sql = {op: 'AND', list: []}
221
222         # ID query OR condition
223         unless query.id.empty?
224           sql[:list] << {
225             op: 'OR',
226             list: query.id.map { |id| 
227               'id = ' + placeholder.call(id)
228             }
229           }
230         end
231
232         # Topic query OR condition
233         unless query.topic.empty?
234           sql[:list] << {
235             op: 'OR',
236             list: query.topic.map { |topic| 
237               'topic ILIKE ' + placeholder.call(topic.gsub('*', '%'))
238             }
239           }
240         end
241
242         # Timestamp range query AND condition
243         if query.timestamp[:from] or query.timestamp[:to]
244           list = []
245           if query.timestamp[:from]
246             list << 'timestamp >= ' + placeholder.call(query.timestamp[:from])
247           end
248           if query.timestamp[:to]
249             list << 'timestamp <= ' + placeholder.call(query.timestamp[:to])
250           end
251           sql[:list] << {
252             op: 'AND',
253             list: list
254           }
255         end
256
257         # Payload query
258         unless query.payload.empty?
259           list = []
260           query.payload.each_pair do |key, value|
261             selector = sql_payload_selector(key)
262             list << selector + ' = ' + placeholder.call(value)
263           end
264           sql[:list] << {
265             op: 'OR',
266             list: list
267           }
268         end
269
270         sql = sql[:list].map { |stmt|
271           '(' + stmt[:list].join(' %s ' % [stmt[:op]]) + ')'
272         }.join(' %s ' % [sql[:op]])
273
274         [sql, params]
275       end
276     end
277   end
278 end # Journal
279 end # Bot
280 end # Irc