end
def self.create(topic, payload, opt={})
+ # cleanup payload to only contain strings
JournalMessage.new(
id: opt[:id] || SecureRandom.uuid,
timestamp: opt[:timestamp] || Time.now,
end
# creates/ensures a index exists on the payload specified by key
- def ensure_index(key)
+ def ensure_payload_index(key)
end
# returns a array of message instances that match the query
class JournalBroker
+ attr_reader :storage
class Subscription
attr_reader :topic
attr_reader :block
end
def publish(topic, payload)
+ debug 'journal publish message in %s: %s' % [topic, payload.inspect]
@queue << JournalMessage::create(topic, payload)
+ nil
end
# Subscribe to receive messages from a topic.
end
def count(query=nil)
+ unless query.is_a? Query
+ query = Query.define(query)
+ end
@storage.count(query)
end
def remove(query=nil)
+ unless query.is_a? Query
+ query = Query.define(query)
+ end
@storage.remove(query)
end
+ def ensure_payload_index(key)
+ @storage.ensure_payload_index(key)
+ end
+
end
end # Journal