]> git.netwichtig.de Git - user/henk/code/ruby/rbot.git/blob - lib/rbot/journal/postgres.rb
57c539a72ba40a4e612a45ddeabe46d19d4c3b07
[user/henk/code/ruby/rbot.git] / lib / rbot / journal / postgres.rb
1 # encoding: UTF-8
2 #-- vim:sw=2:et
3 #++
4 #
5 # :title: journal backend for postgresql
6
7 require 'pg'
8 require 'json'
9
10 module Irc
11 class Bot
12 module Journal
13
14   class Query
15   end
16
17   module Storage
18
19     class PostgresStorage < AbstractStorage
20       attr_reader :conn
21
22       def initialize(opts={})
23         @uri = opts[:uri] || 'postgresql://localhost/rbot_journal'
24         @conn = PG.connect(@uri)
25         @version = @conn.exec('SHOW server_version;')[0]['server_version']
26
27         @version.gsub!(/^(\d+\.\d+)$/, '\1.0')
28         log 'journal storage: postgresql connected to version: ' + @version
29         
30         version = @version.split('.')[0,3].join.to_i
31         if version < 930
32           raise StorageError.new(
33             'PostgreSQL Version too old: %s, supported: >= 9.3' % [@version])
34         end
35         @jsonb = (version >= 940)
36         log 'journal storage: no jsonb support, consider upgrading postgres' unless @jsonb
37
38         drop if opts[:drop]
39         create_table
40       end
41
42       def create_table
43         @conn.exec('
44           CREATE TABLE IF NOT EXISTS journal
45             (id UUID PRIMARY KEY,
46              topic TEXT NOT NULL,
47              timestamp TIMESTAMP WITH TIME ZONE NOT NULL,
48              payload %s NOT NULL)' % [@jsonb ? 'JSONB' : 'JSON'])
49       end
50
51       def insert(m)
52         @conn.exec_params('INSERT INTO journal VALUES ($1, $2, $3, $4);',
53           [m.id, m.topic, m.timestamp, JSON.generate(m.payload)])
54       end
55
56       def find(query, limit=100, offset=0)
57         sql, params = query_to_sql(query)
58         sql = 'SELECT * FROM journal WHERE ' + sql + ' LIMIT %d OFFSET %d' % [limit.to_i, offset.to_i]
59         res = @conn.exec_params(sql, params)
60         res.map do |row|
61           timestamp = DateTime.strptime(row['timestamp'], '%Y-%m-%d %H:%M:%S%z')
62           JournalMessage.new(id: row['id'], timestamp: timestamp,
63             topic: row['topic'], payload: JSON.parse(row['payload']))
64         end
65       end
66
67       # returns the number of messages that match the query
68       def count(query)
69         sql, params = query_to_sql(query)
70         sql = 'SELECT COUNT(*) FROM journal WHERE ' + sql
71         res = @conn.exec_params(sql, params)
72         res[0]['count'].to_i
73       end
74
75       def drop
76         @conn.exec('DROP TABLE journal;') rescue nil
77       end
78
79       def query_to_sql(query)
80         params = []
81         placeholder = Proc.new do |value|
82           params << value
83           '$%d' % [params.length]
84         end
85         sql = {op: 'AND', list: []}
86
87         # ID query OR condition
88         unless query.id.empty?
89           sql[:list] << {
90             op: 'OR',
91             list: query.id.map { |id| 
92               'id = ' + placeholder.call(id)
93             }
94           }
95         end
96
97         # Topic query OR condition
98         unless query.topic.empty?
99           sql[:list] << {
100             op: 'OR',
101             list: query.topic.map { |topic| 
102               'topic ILIKE ' + placeholder.call(topic.gsub('*', '%'))
103             }
104           }
105         end
106
107         # Timestamp range query AND condition
108         if query.timestamp[:from] or query.timestamp[:to]
109           list = []
110           if query.timestamp[:from]
111             list << 'timestamp >= ' + placeholder.call(query.timestamp[:from])
112           end
113           if query.timestamp[:to]
114             list << 'timestamp <= ' + placeholder.call(query.timestamp[:to])
115           end
116           sql[:list] << {
117             op: 'AND',
118             list: list
119           }
120         end
121
122         # Payload query
123         unless query.payload.empty?
124           list = []
125           query.payload.each_pair do |key, value|
126             selector = 'payload'
127             k = key.to_s.split('.')
128             k.each_index { |i|
129               if i >= k.length-1
130                 selector += '->>\'%s\'' % [@conn.escape_string(k[i])]
131               else
132                 selector += '->\'%s\'' % [@conn.escape_string(k[i])]
133               end
134             }
135             list << selector + ' = ' + placeholder.call(value)
136           end
137           sql[:list] << {
138             op: 'OR',
139             list: list
140           }
141         end
142
143         sql = sql[:list].map { |stmt|
144           '(' + stmt[:list].join(' %s ' % [stmt[:op]]) + ')'
145         }.join(' %s ' % [sql[:op]])
146
147         [sql, params]
148       end
149     end
150   end
151 end # Journal
152 end # Bot
153 end # Irc