]> git.netwichtig.de Git - user/henk/code/ruby/rbot.git/blob - lib/rbot/journal/postgres.rb
journal: simplified irc logging
[user/henk/code/ruby/rbot.git] / lib / rbot / journal / postgres.rb
1 # encoding: UTF-8
2 #-- vim:sw=2:et
3 #++
4 #
5 # :title: journal backend for postgresql
6
7 require 'pg'
8 require 'json'
9
10 # as a replacement for CREATE INDEX IF NOT EXIST that is not in postgres.
11 # define function to be able to create an index in case it doesnt exist:
12 # source: http://stackoverflow.com/a/26012880
13 CREATE_INDEX = <<-EOT
14 CREATE OR REPLACE FUNCTION create_index(table_name text, index_name text, column_name text) RETURNS void AS $$ 
15 declare 
16    l_count integer;
17 begin
18   select count(*)
19      into l_count
20   from pg_indexes
21   where schemaname = 'public'
22     and tablename = lower(table_name)
23     and indexname = lower(index_name);
24
25   if l_count = 0 then 
26      execute 'create index ' || index_name || ' on ' || table_name || '(' || column_name || ')';
27   end if;
28 end;
29 $$ LANGUAGE plpgsql;
30 EOT
31
32 module Irc
33 class Bot
34 module Journal
35
36   module Storage
37
38     class PostgresStorage < AbstractStorage
39       attr_reader :conn
40
41       def initialize(opts={})
42         @uri = opts[:uri] || 'postgresql://localhost/rbot'
43         @conn = PG.connect(@uri)
44         @conn.exec('set client_min_messages = warning')
45         @conn.exec(CREATE_INDEX)
46         @version = @conn.exec('SHOW server_version;')[0]['server_version']
47
48         @version.gsub!(/^(\d+\.\d+)$/, '\1.0')
49         log 'journal storage: postgresql connected to version: ' + @version
50         
51         version = @version.split('.')[0,3].join.to_i
52         if version < 930
53           raise StorageError.new(
54             'PostgreSQL Version too old: %s, supported: >= 9.3' % [@version])
55         end
56         @jsonb = (version >= 940)
57         log 'journal storage: no jsonb support, consider upgrading postgres' unless @jsonb
58         log 'journal storage: postgres backend is using JSONB :)' if @jsonb
59
60         drop if opts[:drop]
61         create_table
62         create_index('topic_index', 'topic')
63         create_index('timestamp_index', 'timestamp')
64       end
65
66       def create_table
67         @conn.exec('
68           CREATE TABLE IF NOT EXISTS journal
69             (id UUID PRIMARY KEY,
70              topic TEXT NOT NULL,
71              timestamp TIMESTAMP WITH TIME ZONE NOT NULL,
72              payload %s NOT NULL)' % [@jsonb ? 'JSONB' : 'JSON'])
73       end
74
75       def create_index(index_name, column_name)
76         debug 'journal postges backend: create index %s for %s' % [
77           index_name, column_name]
78         @conn.exec_params('SELECT create_index($1, $2, $3)', [
79           'journal', index_name, column_name])
80       end
81
82       def create_payload_index(key)
83         index_name = 'idx_payload_' + key.gsub('.', '_')
84         column = sql_payload_selector(key)
85         create_index(index_name, column)
86       end
87
88       def ensure_index(key)
89         create_payload_index(key)
90       end
91
92       def insert(m)
93         @conn.exec_params('INSERT INTO journal VALUES ($1, $2, $3, $4);',
94           [m.id, m.topic, m.timestamp, JSON.generate(m.payload)])
95       end
96
97       def find(query=nil, limit=100, offset=0, &block)
98         def to_message(row)
99           timestamp = DateTime.strptime(row['timestamp'], '%Y-%m-%d %H:%M:%S%z')
100           JournalMessage.new(id: row['id'], timestamp: timestamp,
101             topic: row['topic'], payload: JSON.parse(row['payload']))
102         end
103
104         if query
105           sql, params = query_to_sql(query)
106           sql = 'SELECT * FROM journal WHERE ' + sql + ' LIMIT %d OFFSET %d' % [limit.to_i, offset.to_i]
107         else
108           sql = 'SELECT * FROM journal LIMIT %d OFFSET %d' % [limit.to_i, offset.to_i]
109           params = []
110         end
111         res = @conn.exec_params(sql, params)
112         if block_given?
113           res.each { |row| block.call(to_message(row)) }
114         else
115           res.map { |row| to_message(row) }
116         end
117       end
118
119       # returns the number of messages that match the query
120       def count(query=nil)
121         if query
122           sql, params = query_to_sql(query)
123           sql = 'SELECT COUNT(*) FROM journal WHERE ' + sql
124         else
125           sql = 'SELECT COUNT(*) FROM journal'
126           params = []
127         end
128         res = @conn.exec_params(sql, params)
129         res[0]['count'].to_i
130       end
131
132       def remove(query=nil)
133         if query
134           sql, params = query_to_sql(query)
135           sql = 'DELETE FROM journal WHERE ' + sql
136         else
137           sql = 'DELETE FROM journal;'
138           params = []
139         end
140         res = @conn.exec_params(sql, params)
141       end
142
143       def drop
144         @conn.exec('DROP TABLE journal;') rescue nil
145       end
146
147       def sql_payload_selector(key)
148         selector = 'payload'
149         k = key.to_s.split('.')
150         k.each_index { |i|
151           if i >= k.length-1
152             selector += '->>\'%s\'' % [@conn.escape_string(k[i])]
153           else
154             selector += '->\'%s\'' % [@conn.escape_string(k[i])]
155           end
156         }
157         selector
158       end
159
160       def query_to_sql(query)
161         params = []
162         placeholder = Proc.new do |value|
163           params << value
164           '$%d' % [params.length]
165         end
166         sql = {op: 'AND', list: []}
167
168         # ID query OR condition
169         unless query.id.empty?
170           sql[:list] << {
171             op: 'OR',
172             list: query.id.map { |id| 
173               'id = ' + placeholder.call(id)
174             }
175           }
176         end
177
178         # Topic query OR condition
179         unless query.topic.empty?
180           sql[:list] << {
181             op: 'OR',
182             list: query.topic.map { |topic| 
183               'topic ILIKE ' + placeholder.call(topic.gsub('*', '%'))
184             }
185           }
186         end
187
188         # Timestamp range query AND condition
189         if query.timestamp[:from] or query.timestamp[:to]
190           list = []
191           if query.timestamp[:from]
192             list << 'timestamp >= ' + placeholder.call(query.timestamp[:from])
193           end
194           if query.timestamp[:to]
195             list << 'timestamp <= ' + placeholder.call(query.timestamp[:to])
196           end
197           sql[:list] << {
198             op: 'AND',
199             list: list
200           }
201         end
202
203         # Payload query
204         unless query.payload.empty?
205           list = []
206           query.payload.each_pair do |key, value|
207             selector = sql_payload_selector(key)
208             list << selector + ' = ' + placeholder.call(value)
209           end
210           sql[:list] << {
211             op: 'OR',
212             list: list
213           }
214         end
215
216         sql = sql[:list].map { |stmt|
217           '(' + stmt[:list].join(' %s ' % [stmt[:op]]) + ')'
218         }.join(' %s ' % [sql[:op]])
219
220         [sql, params]
221       end
222     end
223   end
224 end # Journal
225 end # Bot
226 end # Irc