+ def unsubscribe(s)
+ if @topic_subs.has_key? s.topic
+ @topic_subs[s.topic].delete(s)
+ end
+ @subscriptions.delete s
+ end
+
+ # Find and return persisted messages by a query.
+ #
+ # This method will either return all messages or call the provided
+ # block for each message. It will filter the messages by the
+ # provided Query instance. Limit and offset might be used to
+ # constrain the result.
+ # The query might also be a hash or proc that is passed to
+ # Query.define first.
+ #
+ # @param query [Query]
+ # @param limit [Integer] how many items to return
+ # @param offset [Integer] relative offset in results
+ def find(query, limit=100, offset=0, &block)
+ unless query.is_a? Query
+ query = Query.define(query)
+ end
+ if block_given?
+ @storage.find(query, limit, offset, &block)
+ else
+ @storage.find(query, limit, offset)
+ end
+ end
+
+ def count(query=nil)
+ unless query.is_a? Query
+ query = Query.define(query)
+ end
+ @storage.count(query)
+ end
+
+ def remove(query=nil)
+ unless query.is_a? Query
+ query = Query.define(query)
+ end
+ @storage.remove(query)
+ end
+
+ def ensure_payload_index(key)
+ @storage.ensure_payload_index(key)