]> git.netwichtig.de Git - user/henk/code/ruby/rbot.git/blob - lib/rbot/journal/postgres.rb
52f5fb368f0b8c848c01085eddcdef3aaa4f267c
[user/henk/code/ruby/rbot.git] / lib / rbot / journal / postgres.rb
1 # encoding: UTF-8
2 #-- vim:sw=2:et
3 #++
4 #
5 # :title: journal backend for postgresql
6
7 require 'pg'
8 require 'json'
9
10 module Irc
11 class Bot
12 module Journal
13
14   module Storage
15
16     class PostgresStorage < AbstractStorage
17       attr_reader :conn
18
19       def initialize(opts={})
20         @uri = opts[:uri] || 'postgresql://localhost/rbot_journal'
21         @conn = PG.connect(@uri)
22         @conn.exec('set client_min_messages = warning')
23         @version = @conn.exec('SHOW server_version;')[0]['server_version']
24
25         @version.gsub!(/^(\d+\.\d+)$/, '\1.0')
26         log 'journal storage: postgresql connected to version: ' + @version
27         
28         version = @version.split('.')[0,3].join.to_i
29         if version < 930
30           raise StorageError.new(
31             'PostgreSQL Version too old: %s, supported: >= 9.3' % [@version])
32         end
33         @jsonb = (version >= 940)
34         log 'journal storage: no jsonb support, consider upgrading postgres' unless @jsonb
35
36         drop if opts[:drop]
37         create_table
38       end
39
40       def create_table
41         @conn.exec('
42           CREATE TABLE IF NOT EXISTS journal
43             (id UUID PRIMARY KEY,
44              topic TEXT NOT NULL,
45              timestamp TIMESTAMP WITH TIME ZONE NOT NULL,
46              payload %s NOT NULL)' % [@jsonb ? 'JSONB' : 'JSON'])
47       end
48
49       def insert(m)
50         @conn.exec_params('INSERT INTO journal VALUES ($1, $2, $3, $4);',
51           [m.id, m.topic, m.timestamp, JSON.generate(m.payload)])
52       end
53
54       def find(query=nil, limit=100, offset=0)
55         if query
56           sql, params = query_to_sql(query)
57           sql = 'SELECT * FROM journal WHERE ' + sql + ' LIMIT %d OFFSET %d' % [limit.to_i, offset.to_i]
58         else
59           sql = 'SELECT * FROM journal LIMIT %d OFFSET %d' % [limit.to_i, offset.to_i]
60           params = []
61         end
62         res = @conn.exec_params(sql, params)
63         res.map do |row|
64           timestamp = DateTime.strptime(row['timestamp'], '%Y-%m-%d %H:%M:%S%z')
65           JournalMessage.new(id: row['id'], timestamp: timestamp,
66             topic: row['topic'], payload: JSON.parse(row['payload']))
67         end
68       end
69
70       # returns the number of messages that match the query
71       def count(query=nil)
72         if query
73           sql, params = query_to_sql(query)
74           sql = 'SELECT COUNT(*) FROM journal WHERE ' + sql
75         else
76           sql = 'SELECT COUNT(*) FROM journal'
77           params = []
78         end
79         res = @conn.exec_params(sql, params)
80         res[0]['count'].to_i
81       end
82
83       def remove(query=nil)
84         if query
85           sql, params = query_to_sql(query)
86           sql = 'DELETE FROM journal WHERE ' + sql
87         else
88           sql = 'DELETE FROM journal;'
89           params = []
90         end
91         res = @conn.exec_params(sql, params)
92       end
93
94       def drop
95         @conn.exec('DROP TABLE journal;') rescue nil
96       end
97
98       def query_to_sql(query)
99         params = []
100         placeholder = Proc.new do |value|
101           params << value
102           '$%d' % [params.length]
103         end
104         sql = {op: 'AND', list: []}
105
106         # ID query OR condition
107         unless query.id.empty?
108           sql[:list] << {
109             op: 'OR',
110             list: query.id.map { |id| 
111               'id = ' + placeholder.call(id)
112             }
113           }
114         end
115
116         # Topic query OR condition
117         unless query.topic.empty?
118           sql[:list] << {
119             op: 'OR',
120             list: query.topic.map { |topic| 
121               'topic ILIKE ' + placeholder.call(topic.gsub('*', '%'))
122             }
123           }
124         end
125
126         # Timestamp range query AND condition
127         if query.timestamp[:from] or query.timestamp[:to]
128           list = []
129           if query.timestamp[:from]
130             list << 'timestamp >= ' + placeholder.call(query.timestamp[:from])
131           end
132           if query.timestamp[:to]
133             list << 'timestamp <= ' + placeholder.call(query.timestamp[:to])
134           end
135           sql[:list] << {
136             op: 'AND',
137             list: list
138           }
139         end
140
141         # Payload query
142         unless query.payload.empty?
143           list = []
144           query.payload.each_pair do |key, value|
145             selector = 'payload'
146             k = key.to_s.split('.')
147             k.each_index { |i|
148               if i >= k.length-1
149                 selector += '->>\'%s\'' % [@conn.escape_string(k[i])]
150               else
151                 selector += '->\'%s\'' % [@conn.escape_string(k[i])]
152               end
153             }
154             list << selector + ' = ' + placeholder.call(value)
155           end
156           sql[:list] << {
157             op: 'OR',
158             list: list
159           }
160         end
161
162         sql = sql[:list].map { |stmt|
163           '(' + stmt[:list].join(' %s ' % [stmt[:op]]) + ')'
164         }.join(' %s ' % [sql[:op]])
165
166         [sql, params]
167       end
168     end
169   end
170 end # Journal
171 end # Bot
172 end # Irc