diff options
-rw-r--r-- | lib/rbot/journal.rb | 29 | ||||
-rw-r--r-- | test/test_journal.rb | 11 |
2 files changed, 33 insertions, 7 deletions
diff --git a/lib/rbot/journal.rb b/lib/rbot/journal.rb index 4cab11c8..09ee7369 100644 --- a/lib/rbot/journal.rb +++ b/lib/rbot/journal.rb @@ -209,8 +209,20 @@ module Journal end - class JournalBroker + class Subscription + attr_reader :query + attr_reader :block + def initialize(broker, query, block) + @broker = broker + @query = query + @block = block + end + def cancel + @broker.unsubscribe(self) + end + end + def initialize(opts={}) # overrides the internal consumer with a block @consumer = opts[:consumer] @@ -240,9 +252,9 @@ module Journal @consumer.call(message) if @consumer # notify subscribers - @subscriptions.each do |query, block| - if query.matches? message - block.call(message) + @subscriptions.each do |s| + if s.query.matches? message + s.block.call(message) end end end @@ -255,7 +267,6 @@ module Journal @thread.raise ConsumeInterrupt.new end - def publish(topic, payload) @queue.push JournalMessage::create(topic, payload) end @@ -263,7 +274,13 @@ module Journal # subscribe to messages that match the given query def subscribe(query, &block) raise ArgumentError.new unless block_given? - @subscriptions << [query, block] + s = Subscription.new(self, query, block) + @subscriptions << s + s + end + + def unsubscribe(subscription) + @subscriptions.delete subscription end end diff --git a/test/test_journal.rb b/test/test_journal.rb index d7a70a7c..cee8da0f 100644 --- a/test/test_journal.rb +++ b/test/test_journal.rb @@ -160,7 +160,7 @@ class JournalBrokerTest < Test::Unit::TestCase journal = JournalBroker.new # subscribe to messages: - journal.subscribe(Query.define { topic 'foo' }) do |message| + sub = journal.subscribe(Query.define { topic 'foo' }) do |message| received << message end @@ -172,6 +172,15 @@ class JournalBrokerTest < Test::Unit::TestCase # wait for messages to be consumed: sleep 0.1 assert_equal(2, received.length) + + received.clear + + journal.publish 'foo', {} + sleep 0.1 + sub.cancel + journal.publish 'foo', {} + sleep 0.1 + assert_equal(1, received.length) end end |