diff options
author | Matthias Hecker <apoc@geekosphere.org> | 2015-06-20 19:25:39 +0200 |
---|---|---|
committer | Matthias Hecker <apoc@geekosphere.org> | 2015-06-20 19:25:39 +0200 |
commit | 85bfb8405528b2da203291b5671f9077d5b62742 (patch) | |
tree | 1bb24a408ecf4b3cd383ced9818a91438884c651 /lib/rbot | |
parent | 257ba78e8564964b78f839cce45350b824c2f410 (diff) |
journal: start with core botmodule, api changes
Diffstat (limited to 'lib/rbot')
-rw-r--r-- | lib/rbot/core/journal.rb | 52 | ||||
-rw-r--r-- | lib/rbot/ircbot.rb | 13 | ||||
-rw-r--r-- | lib/rbot/journal.rb | 132 | ||||
-rw-r--r-- | lib/rbot/journal/mongo.rb | 19 | ||||
-rw-r--r-- | lib/rbot/journal/postgres.rb | 17 |
5 files changed, 167 insertions, 66 deletions
diff --git a/lib/rbot/core/journal.rb b/lib/rbot/core/journal.rb new file mode 100644 index 00000000..f8a88620 --- /dev/null +++ b/lib/rbot/core/journal.rb @@ -0,0 +1,52 @@ +#-- vim:sw=2:et +#++ +# +# :title: rbot journal management from IRC +# +# Author:: Matthias Hecker (apoc@geekosphere.org) + +require 'rbot/journal' + +class JournalModule < CoreBotModule + + attr_reader :broker + + include Irc::Bot::Journal + + Config.register Config::StringValue.new('journal.storage', + :default => nil, + :requires_rescan => true, + :desc => 'storage engine used by the journal') + Config.register Config::StringValue.new('journal.storage.uri', + :default => nil, + :requires_rescan => true, + :desc => 'storage database uri') + + def initialize + super + storage = nil + name = @bot.config['journal.storage'] + uri = @bot.config['journal.storage.uri'] + if name + storage = Storage.create(name, uri) + end + debug 'journal broker starting up...' + @broker = JournalBroker.new(storage: storage) + end + + def cleanup + super + debug 'journal broker shutting down...' + @broker.shutdown + @broker = nil + end + + def help(plugin, topic='') + 'journal' + end + +end + +journal = JournalModule.new +journal.priority = -2 + diff --git a/lib/rbot/ircbot.rb b/lib/rbot/ircbot.rb index caabc15d..46e4faaa 100644 --- a/lib/rbot/ircbot.rb +++ b/lib/rbot/ircbot.rb @@ -156,7 +156,6 @@ require 'rbot/registry' require 'rbot/plugins' require 'rbot/message' require 'rbot/language' -require 'rbot/journal' module Irc @@ -205,9 +204,6 @@ class Bot # web service attr_accessor :webservice - # persistent message queue - attr_accessor :journal - # server we are connected to # TODO multiserver def server @@ -230,6 +226,13 @@ class Bot myself.channels end + # returns the journal + def journal + if @plugins['journal'] + @plugins['journal'].broker + end + end + # nick wanted by the bot. This defaults to the irc.nick config value, # but may be overridden by a manual !nick command def wanted_nick @@ -550,8 +553,6 @@ class Bot log_session_start - @journal = Journal::JournalBroker.new(bot: self) - if $daemonize log "Redirecting standard input/output/error" [$stdin, $stdout, $stderr].each do |fd| diff --git a/lib/rbot/journal.rb b/lib/rbot/journal.rb index 0b4324fe..c5bfcfea 100644 --- a/lib/rbot/journal.rb +++ b/lib/rbot/journal.rb @@ -3,6 +3,8 @@ #++ # # :title: rbot's persistent message queue +# +# Author:: Matthias Hecker (apoc@geekosphere.org) require 'thread' require 'securerandom' @@ -22,19 +24,8 @@ module Journal =end - Config.register Config::StringValue.new('journal.storage', - :default => nil, - :requires_restart => true, - :desc => 'storage engine used by the journal') - Config.register Config::StringValue.new('journal.storage.uri', - :default => nil, - :requires_restart => true, - :desc => 'storage database uri') - class InvalidJournalMessage < StandardError end - class ConsumeInterrupt < StandardError - end class StorageError < StandardError end @@ -61,6 +52,7 @@ module Journal end end + # Access payload value by key. def get(pkey, default=:exception) # IDENTITY = Object.new instead of :ex..? value = pkey.split('.').reduce(@payload) do |hash, key| if hash.has_key?(key) or hash.has_key?(key.to_sym) @@ -75,6 +67,11 @@ module Journal end end + # Access payload value by key alias for get(key, nil). + def [](key) + get(key, nil) + end + def ==(other) (@id == other.id) rescue false end @@ -104,7 +101,7 @@ module Journal end # returns a array of message instances that match the query - def find(query=nil, limit=100, offset=0) + def find(query=nil, limit=100, offset=0, &block) end # returns the number of messages that match the query @@ -176,10 +173,20 @@ module Journal attr_reader :payload def initialize(query) - @id = query[:id] - @topic = query[:topic] - @timestamp = query[:timestamp] - @payload = query[:payload] + @id = query[:id] || [] + @id = [@id] if @id.is_a? String + @topic = query[:topic] || [] + @topic = [@topic] if @topic.is_a? String + @timestamp = { + from: nil, to: nil + } + if query[:timestamp] and query[:timestamp][:from] + @timestamp[:from] = query[:timestamp][:from] + end + if query[:timestamp] and query[:timestamp][:to] + @timestamp[:to] = query[:timestamp][:to] + end + @payload = query[:payload] || {} end # returns true if the given message matches the query @@ -270,11 +277,11 @@ module Journal class JournalBroker class Subscription - attr_reader :query + attr_reader :topic attr_reader :block - def initialize(broker, query, block) + def initialize(broker, topic, block) @broker = broker - @query = query + @topic = topic @block = block end def cancel @@ -285,36 +292,27 @@ module Journal def initialize(opts={}) # overrides the internal consumer with a block @consumer = opts[:consumer] - @bot = opts[:bot] # storage backend - if @bot - @storage = opts[:storage] || Storage.create( - @bot.config['journal.storage'], @bot.config['journal.storage.uri']) - else - @storage = opts[:storage] - end + @storage = opts[:storage] unless @storage warning 'journal broker: no storage set up, won\'t persist messages' end @queue = Queue.new # consumer thread: @thread = Thread.new do - loop do + while message = @queue.pop begin - consume @queue.pop + consume message # pop(true) ... rescue ThreadError => e - rescue ConsumeInterrupt => e - error 'journal broker: stop thread, consume interrupt raised' - break rescue Exception => e error 'journal broker: exception in consumer thread' error $! end end end - # TODO: this is a first naive implementation, later we do the - # message/query matching for incoming messages more efficiently @subscriptions = [] + # lookup-table for subscriptions by their topic + @topic_subs = {} end def consume(message) @@ -322,8 +320,8 @@ module Journal @consumer.call(message) if @consumer # notify subscribers - @subscriptions.each do |s| - if s.query.matches? message + if @topic_subs.has_key? message.topic + @topic_subs[message.topic].each do |s| s.block.call(message) end end @@ -335,36 +333,68 @@ module Journal true if @storage end - def join - @thread.join - end - def shutdown - @thread.raise ConsumeInterrupt.new + log 'journal shutdown' + @subscriptions.clear + @topic_subs.clear + @queue << nil + @thread.join + @thread = nil end def publish(topic, payload) - @queue.push JournalMessage::create(topic, payload) + @queue << JournalMessage::create(topic, payload) end - # subscribe to messages that match the given query - def subscribe(query, &block) + # Subscribe to receive messages from a topic. + # + # You can use this method to subscribe to messages that + # are published within a specified topic. You must provide + # a receiving block to receive messages one-by-one. + # The method returns an instance of Subscription that can + # be used to cancel the subscription by invoking cancel + # on it. + # + # journal.subscribe('irclog') do |message| + # # received irclog messages... + # end + # + def subscribe(topic=nil, &block) raise ArgumentError.new unless block_given? - s = Subscription.new(self, query, block) + s = Subscription.new(self, topic, block) @subscriptions << s + unless @topic_subs.has_key? topic + @topic_subs[topic] = [] + end + @topic_subs[topic] << s s end - def unsubscribe(subscription) - @subscriptions.delete subscription + def unsubscribe(s) + if @topic_subs.has_key? s.topic + @topic_subs[s.topic].delete(s) + end + @subscriptions.delete s end - def find(query=nil, limit=100, offset=0, &block) + # Find and return persisted messages by a query. + # + # This method will either return all messages or call the provided + # block for each message. It will filter the messages by the + # provided Query instance. Limit and offset might be used to + # constrain the result. + # The query might also be a hash or proc that is passed to + # Query.define first. + # + # @param query [Query] + # @param limit [Integer] how many items to return + # @param offset [Integer] relative offset in results + def find(query, limit=100, offset=0, &block) + unless query.is_a? Query + query = Query.define(query) + end if block_given? - begin - res = @storage.find(query, limit, offset) - block.call(res) - end until res.length > 0 + @storage.find(query, limit, offset, &block) else @storage.find(query, limit, offset) end diff --git a/lib/rbot/journal/mongo.rb b/lib/rbot/journal/mongo.rb index 24e9cfcc..2e735587 100644 --- a/lib/rbot/journal/mongo.rb +++ b/lib/rbot/journal/mongo.rb @@ -25,6 +25,7 @@ module Journal drop if opts[:drop] @collection.indexes.create_one({topic: 1}) + @collection.indexes.create_one({timestamp: 1}) end def ensure_index(key) @@ -40,10 +41,20 @@ module Journal }) end - def find(query=nil, limit=100, offset=0) - query_cursor(query).skip(offset).limit(limit).map do |document| - JournalMessage.new(id: document['_id'], timestamp: document['timestamp'].localtime, - topic: document['topic'], payload: document['payload'].to_h) + def find(query=nil, limit=100, offset=0, &block) + def to_message(document) + JournalMessage.new(id: document['_id'], + timestamp: document['timestamp'].localtime, + topic: document['topic'], + payload: document['payload'].to_h) + end + + cursor = query_cursor(query).skip(offset).limit(limit) + + if block_given? + cursor.each { |document| block.call(to_message(document)) } + else + cursor.map { |document| to_message(document) } end end diff --git a/lib/rbot/journal/postgres.rb b/lib/rbot/journal/postgres.rb index e63aefee..62590110 100644 --- a/lib/rbot/journal/postgres.rb +++ b/lib/rbot/journal/postgres.rb @@ -59,6 +59,7 @@ module Journal drop if opts[:drop] create_table create_index('topic_index', 'topic') + create_index('timestamp_index', 'timestamp') end def create_table @@ -92,7 +93,13 @@ module Journal [m.id, m.topic, m.timestamp, JSON.generate(m.payload)]) end - def find(query=nil, limit=100, offset=0) + def find(query=nil, limit=100, offset=0, &block) + def to_message(row) + timestamp = DateTime.strptime(row['timestamp'], '%Y-%m-%d %H:%M:%S%z') + JournalMessage.new(id: row['id'], timestamp: timestamp, + topic: row['topic'], payload: JSON.parse(row['payload'])) + end + if query sql, params = query_to_sql(query) sql = 'SELECT * FROM journal WHERE ' + sql + ' LIMIT %d OFFSET %d' % [limit.to_i, offset.to_i] @@ -101,10 +108,10 @@ module Journal params = [] end res = @conn.exec_params(sql, params) - res.map do |row| - timestamp = DateTime.strptime(row['timestamp'], '%Y-%m-%d %H:%M:%S%z') - JournalMessage.new(id: row['id'], timestamp: timestamp, - topic: row['topic'], payload: JSON.parse(row['payload'])) + if block_given? + res.each { |row| block.call(to_message(row)) } + else + res.map { |row| to_message(row) } end end |