]> git.netwichtig.de Git - user/henk/code/ruby/rbot.git/blobdiff - lib/rbot/journal.rb
test: fix shadowed test method
[user/henk/code/ruby/rbot.git] / lib / rbot / journal.rb
index 5045f9d53563953e4df5288d9e6392b88db028f7..981ff6e47eaab41fa496ed929d614615772bd1e8 100644 (file)
@@ -3,6 +3,8 @@
 #++
 #
 # :title: rbot's persistent message queue
+#
+# Author:: Matthias Hecker (apoc@geekosphere.org)
 
 require 'thread'
 require 'securerandom'
@@ -22,19 +24,8 @@ module Journal
 
 =end
 
-  Config.register Config::StringValue.new('journal.storage',
-    :default => nil,
-    :requires_restart => true,
-    :desc => 'storage engine used by the journal')
-  Config.register Config::StringValue.new('journal.storage.uri',
-    :default => nil,
-    :requires_restart => true,
-    :desc => 'storage database uri')
-
   class InvalidJournalMessage < StandardError
   end
-  class ConsumeInterrupt < StandardError
-  end
   class StorageError < StandardError
   end
 
@@ -61,6 +52,7 @@ module Journal
       end
     end
 
+    # Access payload value by key.
     def get(pkey, default=:exception) # IDENTITY = Object.new instead of :ex..?
       value = pkey.split('.').reduce(@payload) do |hash, key|
         if hash.has_key?(key) or hash.has_key?(key.to_sym)
@@ -75,11 +67,17 @@ module Journal
       end
     end
 
+    # Access payload value by key alias for get(key, nil).
+    def [](key)
+      get(key, nil)
+    end
+
     def ==(other)
       (@id == other.id) rescue false
     end
 
     def self.create(topic, payload, opt={})
+      # cleanup payload to only contain strings
       JournalMessage.new(
         id: opt[:id] || SecureRandom.uuid,
         timestamp: opt[:timestamp] || Time.now,
@@ -100,11 +98,11 @@ module Journal
       end
 
       # creates/ensures a index exists on the payload specified by key
-      def ensure_index(key)
+      def ensure_payload_index(key)
       end
 
       # returns a array of message instances that match the query
-      def find(query=nil, limit=100, offset=0)
+      def find(query=nil, limit=100, offset=0, &block)
       end
 
       # returns the number of messages that match the query
@@ -126,7 +124,7 @@ module Journal
     end
 
     def self.create(name, uri)
-      warning 'load journal storage adapter: ' + name
+      log 'load journal storage adapter: ' + name
       load File.join(File.dirname(__FILE__), 'journal', name + '.rb')
       cls = AbstractStorage.get_impl.first
       cls.new(uri: uri)
@@ -176,10 +174,20 @@ module Journal
     attr_reader :payload
 
     def initialize(query)
-      @id = query[:id]
-      @topic = query[:topic]
-      @timestamp = query[:timestamp]
-      @payload = query[:payload]
+      @id = query[:id] || []
+      @id = [@id] if @id.is_a? String
+      @topic = query[:topic] || []
+      @topic = [@topic] if @topic.is_a? String
+      @timestamp = {
+        from: nil, to: nil
+      }
+      if query[:timestamp] and query[:timestamp][:from]
+        @timestamp[:from] = query[:timestamp][:from]
+      end
+      if query[:timestamp] and query[:timestamp][:to]
+        @timestamp[:to] = query[:timestamp][:to]
+      end
+      @payload = query[:payload] || {}
     end
 
     # returns true if the given message matches the query
@@ -269,12 +277,13 @@ module Journal
 
 
   class JournalBroker
+    attr_reader :storage
     class Subscription
-      attr_reader :query
+      attr_reader :topic
       attr_reader :block
-      def initialize(broker, query, block)
+      def initialize(broker, topic, block)
         @broker = broker
-        @query = query
+        @topic = topic
         @block = block
       end
       def cancel
@@ -285,36 +294,27 @@ module Journal
     def initialize(opts={})
       # overrides the internal consumer with a block
       @consumer = opts[:consumer]
-      @bot = opts[:bot]
       # storage backend
-      if @bot
-        @storage = opts[:storage] || Storage.create(
-            @bot.config['journal.storage'], @bot.config['journal.storage.uri'])
-      else
-        @storage = opts[:storage]
-      end
+      @storage = opts[:storage]
       unless @storage
         warning 'journal broker: no storage set up, won\'t persist messages'
       end
       @queue = Queue.new
       # consumer thread:
       @thread = Thread.new do
-        loop do
+        while message = @queue.pop
           begin
-            consume @queue.pop
+            consume message
           # pop(true) ... rescue ThreadError => e
-          rescue ConsumeInterrupt => e
-            error 'journal broker: stop thread, consume interrupt raised'
-            break
           rescue Exception => e
             error 'journal broker: exception in consumer thread'
             error $!
           end
         end
       end
-      # TODO: this is a first naive implementation, later we do the
-      #       message/query matching for incoming messages more efficiently
       @subscriptions = []
+      # lookup-table for subscriptions by their topic
+      @topic_subs = {}
     end
 
     def consume(message)
@@ -322,8 +322,8 @@ module Journal
       @consumer.call(message) if @consumer
 
       # notify subscribers
-      @subscriptions.each do |s|
-        if s.query.matches? message
+      if @topic_subs.has_key? message.topic
+        @topic_subs[message.topic].each do |s|
           s.block.call(message)
         end
       end
@@ -335,49 +335,93 @@ module Journal
       true if @storage
     end
 
-    def join
-      @thread.join
-    end
-
     def shutdown
-      @thread.raise ConsumeInterrupt.new
+      log 'journal shutdown'
+      @subscriptions.clear
+      @topic_subs.clear
+      @queue << nil
+      @thread.join
+      @thread = nil
     end
 
     def publish(topic, payload)
-      @queue.push JournalMessage::create(topic, payload)
+      debug 'journal publish message in %s: %s' % [topic, payload.inspect]
+      @queue << JournalMessage::create(topic, payload)
+      nil
     end
 
-    # subscribe to messages that match the given query
-    def subscribe(query, &block)
+    # Subscribe to receive messages from a topic.
+    #
+    # You can use this method to subscribe to messages that
+    # are published within a specified topic. You must provide
+    # a receiving block to receive messages one-by-one.
+    # The method returns an instance of Subscription that can
+    # be used to cancel the subscription by invoking cancel
+    # on it.
+    #
+    #   journal.subscribe('irclog') do |message|
+    #     # received irclog messages...
+    #   end
+    #
+    def subscribe(topic=nil, &block)
       raise ArgumentError.new unless block_given?
-      s = Subscription.new(self, query, block)
+      s = Subscription.new(self, topic, block)
       @subscriptions << s
+      unless @topic_subs.has_key? topic
+        @topic_subs[topic] = []
+      end
+      @topic_subs[topic] << s
       s
     end
 
-    def unsubscribe(subscription)
-      @subscriptions.delete subscription
+    def unsubscribe(s)
+      if @topic_subs.has_key? s.topic
+        @topic_subs[s.topic].delete(s)
+      end
+      @subscriptions.delete s
     end
 
-    def find(query=nil, limit=100, offset=0, &block)
+    # Find and return persisted messages by a query.
+    #
+    # This method will either return all messages or call the provided
+    # block for each message. It will filter the messages by the
+    # provided Query instance. Limit and offset might be used to
+    # constrain the result.
+    # The query might also be a hash or proc that is passed to
+    # Query.define first.
+    #
+    # @param query [Query] 
+    # @param limit [Integer] how many items to return
+    # @param offset [Integer] relative offset in results
+    def find(query, limit=100, offset=0, &block)
+      unless query.is_a? Query
+        query = Query.define(query)
+      end
       if block_given?
-        begin
-          res = @storage.find(query, limit, offset)
-          block.call(res)
-        end until res.length > 0
+        @storage.find(query, limit, offset, &block)
       else
         @storage.find(query, limit, offset)
       end
     end
 
     def count(query=nil)
+      unless query.is_a? Query
+        query = Query.define(query)
+      end
       @storage.count(query)
     end
 
     def remove(query=nil)
+      unless query.is_a? Query
+        query = Query.define(query)
+      end
       @storage.remove(query)
     end
 
+    def ensure_payload_index(key)
+      @storage.ensure_payload_index(key)
+    end
+
   end
 
 end # Journal