]> git.netwichtig.de Git - user/henk/code/ruby/rbot.git/blob - lib/rbot/journal/postgres.rb
65c67eb94c5ce2e7ca8384e989f366ed98820463
[user/henk/code/ruby/rbot.git] / lib / rbot / journal / postgres.rb
1 # encoding: UTF-8
2 #-- vim:sw=2:et
3 #++
4 #
5 # :title: journal backend for postgresql
6
7 require 'pg'
8 require 'json'
9
10 module Irc
11 class Bot
12 module Journal
13
14   class Query
15   end
16
17   module Storage
18
19     class PostgresStorage < AbstractStorage
20       attr_reader :conn
21
22       def initialize(opts={})
23         @uri = opts[:uri] || 'postgresql://localhost/rbot_journal'
24         @conn = PG.connect(@uri)
25         @version = @conn.exec('SHOW server_version;')[0]['server_version']
26
27         @version.gsub!(/^(\d+\.\d+)$/, '\1.0')
28         log 'journal storage: postgresql connected to version: ' + @version
29         
30         version = @version.split('.')[0,3].join.to_i
31         if version < 930
32           raise StorageError.new(
33             'PostgreSQL Version too old: %s, supported: >= 9.3' % [@version])
34         end
35         @jsonb = (version >= 940)
36         log 'journal storage: no jsonb support, consider upgrading postgres' unless @jsonb
37
38         drop if opts[:drop]
39         create_table
40       end
41
42       def create_table
43         @conn.exec('
44           CREATE TABLE IF NOT EXISTS journal
45             (id UUID PRIMARY KEY,
46              topic TEXT NOT NULL,
47              timestamp TIMESTAMP WITH TIME ZONE NOT NULL,
48              payload %s NOT NULL)' % [@jsonb ? 'JSONB' : 'JSON'])
49       end
50
51       def insert(m)
52         @conn.exec_params('INSERT INTO journal VALUES ($1, $2, $3, $4);',
53           [m.id, m.topic, m.timestamp, JSON.generate(m.payload)])
54       end
55
56       def find(query=nil, limit=100, offset=0)
57         if query
58           sql, params = query_to_sql(query)
59           sql = 'SELECT * FROM journal WHERE ' + sql + ' LIMIT %d OFFSET %d' % [limit.to_i, offset.to_i]
60         else
61           sql = 'SELECT * FROM journal LIMIT %d OFFSET %d' % [limit.to_i, offset.to_i]
62           params = []
63         end
64         res = @conn.exec_params(sql, params)
65         res.map do |row|
66           timestamp = DateTime.strptime(row['timestamp'], '%Y-%m-%d %H:%M:%S%z')
67           JournalMessage.new(id: row['id'], timestamp: timestamp,
68             topic: row['topic'], payload: JSON.parse(row['payload']))
69         end
70       end
71
72       # returns the number of messages that match the query
73       def count(query=nil)
74         if query
75           sql, params = query_to_sql(query)
76           sql = 'SELECT COUNT(*) FROM journal WHERE ' + sql
77         else
78           sql = 'SELECT COUNT(*) FROM journal'
79           params = []
80         end
81         res = @conn.exec_params(sql, params)
82         res[0]['count'].to_i
83       end
84
85       def remove(query=nil)
86         if query
87           sql, params = query_to_sql(query)
88           sql = 'DELETE FROM journal WHERE ' + sql
89         else
90           sql = 'DELETE FROM journal;'
91           params = []
92         end
93         res = @conn.exec_params(sql, params)
94       end
95
96       def drop
97         @conn.exec('DROP TABLE journal;') rescue nil
98       end
99
100       def query_to_sql(query)
101         params = []
102         placeholder = Proc.new do |value|
103           params << value
104           '$%d' % [params.length]
105         end
106         sql = {op: 'AND', list: []}
107
108         # ID query OR condition
109         unless query.id.empty?
110           sql[:list] << {
111             op: 'OR',
112             list: query.id.map { |id| 
113               'id = ' + placeholder.call(id)
114             }
115           }
116         end
117
118         # Topic query OR condition
119         unless query.topic.empty?
120           sql[:list] << {
121             op: 'OR',
122             list: query.topic.map { |topic| 
123               'topic ILIKE ' + placeholder.call(topic.gsub('*', '%'))
124             }
125           }
126         end
127
128         # Timestamp range query AND condition
129         if query.timestamp[:from] or query.timestamp[:to]
130           list = []
131           if query.timestamp[:from]
132             list << 'timestamp >= ' + placeholder.call(query.timestamp[:from])
133           end
134           if query.timestamp[:to]
135             list << 'timestamp <= ' + placeholder.call(query.timestamp[:to])
136           end
137           sql[:list] << {
138             op: 'AND',
139             list: list
140           }
141         end
142
143         # Payload query
144         unless query.payload.empty?
145           list = []
146           query.payload.each_pair do |key, value|
147             selector = 'payload'
148             k = key.to_s.split('.')
149             k.each_index { |i|
150               if i >= k.length-1
151                 selector += '->>\'%s\'' % [@conn.escape_string(k[i])]
152               else
153                 selector += '->\'%s\'' % [@conn.escape_string(k[i])]
154               end
155             }
156             list << selector + ' = ' + placeholder.call(value)
157           end
158           sql[:list] << {
159             op: 'OR',
160             list: list
161           }
162         end
163
164         sql = sql[:list].map { |stmt|
165           '(' + stmt[:list].join(' %s ' % [stmt[:op]]) + ')'
166         }.join(' %s ' % [sql[:op]])
167
168         [sql, params]
169       end
170     end
171   end
172 end # Journal
173 end # Bot
174 end # Irc