X-Git-Url: https://git.netwichtig.de/gitweb/?a=blobdiff_plain;f=lib%2Frbot%2Fjournal.rb;h=981ff6e47eaab41fa496ed929d614615772bd1e8;hb=a19f7bfb97e5f36e6b282fcc0982584838e86a0a;hp=5045f9d53563953e4df5288d9e6392b88db028f7;hpb=6ead2df0ba73243c0d1805324b0fe64d85c08bac;p=user%2Fhenk%2Fcode%2Fruby%2Frbot.git diff --git a/lib/rbot/journal.rb b/lib/rbot/journal.rb index 5045f9d5..981ff6e4 100644 --- a/lib/rbot/journal.rb +++ b/lib/rbot/journal.rb @@ -3,6 +3,8 @@ #++ # # :title: rbot's persistent message queue +# +# Author:: Matthias Hecker (apoc@geekosphere.org) require 'thread' require 'securerandom' @@ -22,19 +24,8 @@ module Journal =end - Config.register Config::StringValue.new('journal.storage', - :default => nil, - :requires_restart => true, - :desc => 'storage engine used by the journal') - Config.register Config::StringValue.new('journal.storage.uri', - :default => nil, - :requires_restart => true, - :desc => 'storage database uri') - class InvalidJournalMessage < StandardError end - class ConsumeInterrupt < StandardError - end class StorageError < StandardError end @@ -61,6 +52,7 @@ module Journal end end + # Access payload value by key. def get(pkey, default=:exception) # IDENTITY = Object.new instead of :ex..? value = pkey.split('.').reduce(@payload) do |hash, key| if hash.has_key?(key) or hash.has_key?(key.to_sym) @@ -75,11 +67,17 @@ module Journal end end + # Access payload value by key alias for get(key, nil). + def [](key) + get(key, nil) + end + def ==(other) (@id == other.id) rescue false end def self.create(topic, payload, opt={}) + # cleanup payload to only contain strings JournalMessage.new( id: opt[:id] || SecureRandom.uuid, timestamp: opt[:timestamp] || Time.now, @@ -100,11 +98,11 @@ module Journal end # creates/ensures a index exists on the payload specified by key - def ensure_index(key) + def ensure_payload_index(key) end # returns a array of message instances that match the query - def find(query=nil, limit=100, offset=0) + def find(query=nil, limit=100, offset=0, &block) end # returns the number of messages that match the query @@ -126,7 +124,7 @@ module Journal end def self.create(name, uri) - warning 'load journal storage adapter: ' + name + log 'load journal storage adapter: ' + name load File.join(File.dirname(__FILE__), 'journal', name + '.rb') cls = AbstractStorage.get_impl.first cls.new(uri: uri) @@ -176,10 +174,20 @@ module Journal attr_reader :payload def initialize(query) - @id = query[:id] - @topic = query[:topic] - @timestamp = query[:timestamp] - @payload = query[:payload] + @id = query[:id] || [] + @id = [@id] if @id.is_a? String + @topic = query[:topic] || [] + @topic = [@topic] if @topic.is_a? String + @timestamp = { + from: nil, to: nil + } + if query[:timestamp] and query[:timestamp][:from] + @timestamp[:from] = query[:timestamp][:from] + end + if query[:timestamp] and query[:timestamp][:to] + @timestamp[:to] = query[:timestamp][:to] + end + @payload = query[:payload] || {} end # returns true if the given message matches the query @@ -269,12 +277,13 @@ module Journal class JournalBroker + attr_reader :storage class Subscription - attr_reader :query + attr_reader :topic attr_reader :block - def initialize(broker, query, block) + def initialize(broker, topic, block) @broker = broker - @query = query + @topic = topic @block = block end def cancel @@ -285,36 +294,27 @@ module Journal def initialize(opts={}) # overrides the internal consumer with a block @consumer = opts[:consumer] - @bot = opts[:bot] # storage backend - if @bot - @storage = opts[:storage] || Storage.create( - @bot.config['journal.storage'], @bot.config['journal.storage.uri']) - else - @storage = opts[:storage] - end + @storage = opts[:storage] unless @storage warning 'journal broker: no storage set up, won\'t persist messages' end @queue = Queue.new # consumer thread: @thread = Thread.new do - loop do + while message = @queue.pop begin - consume @queue.pop + consume message # pop(true) ... rescue ThreadError => e - rescue ConsumeInterrupt => e - error 'journal broker: stop thread, consume interrupt raised' - break rescue Exception => e error 'journal broker: exception in consumer thread' error $! end end end - # TODO: this is a first naive implementation, later we do the - # message/query matching for incoming messages more efficiently @subscriptions = [] + # lookup-table for subscriptions by their topic + @topic_subs = {} end def consume(message) @@ -322,8 +322,8 @@ module Journal @consumer.call(message) if @consumer # notify subscribers - @subscriptions.each do |s| - if s.query.matches? message + if @topic_subs.has_key? message.topic + @topic_subs[message.topic].each do |s| s.block.call(message) end end @@ -335,49 +335,93 @@ module Journal true if @storage end - def join - @thread.join - end - def shutdown - @thread.raise ConsumeInterrupt.new + log 'journal shutdown' + @subscriptions.clear + @topic_subs.clear + @queue << nil + @thread.join + @thread = nil end def publish(topic, payload) - @queue.push JournalMessage::create(topic, payload) + debug 'journal publish message in %s: %s' % [topic, payload.inspect] + @queue << JournalMessage::create(topic, payload) + nil end - # subscribe to messages that match the given query - def subscribe(query, &block) + # Subscribe to receive messages from a topic. + # + # You can use this method to subscribe to messages that + # are published within a specified topic. You must provide + # a receiving block to receive messages one-by-one. + # The method returns an instance of Subscription that can + # be used to cancel the subscription by invoking cancel + # on it. + # + # journal.subscribe('irclog') do |message| + # # received irclog messages... + # end + # + def subscribe(topic=nil, &block) raise ArgumentError.new unless block_given? - s = Subscription.new(self, query, block) + s = Subscription.new(self, topic, block) @subscriptions << s + unless @topic_subs.has_key? topic + @topic_subs[topic] = [] + end + @topic_subs[topic] << s s end - def unsubscribe(subscription) - @subscriptions.delete subscription + def unsubscribe(s) + if @topic_subs.has_key? s.topic + @topic_subs[s.topic].delete(s) + end + @subscriptions.delete s end - def find(query=nil, limit=100, offset=0, &block) + # Find and return persisted messages by a query. + # + # This method will either return all messages or call the provided + # block for each message. It will filter the messages by the + # provided Query instance. Limit and offset might be used to + # constrain the result. + # The query might also be a hash or proc that is passed to + # Query.define first. + # + # @param query [Query] + # @param limit [Integer] how many items to return + # @param offset [Integer] relative offset in results + def find(query, limit=100, offset=0, &block) + unless query.is_a? Query + query = Query.define(query) + end if block_given? - begin - res = @storage.find(query, limit, offset) - block.call(res) - end until res.length > 0 + @storage.find(query, limit, offset, &block) else @storage.find(query, limit, offset) end end def count(query=nil) + unless query.is_a? Query + query = Query.define(query) + end @storage.count(query) end def remove(query=nil) + unless query.is_a? Query + query = Query.define(query) + end @storage.remove(query) end + def ensure_payload_index(key) + @storage.ensure_payload_index(key) + end + end end # Journal