Bunny: Quality of Service or Prefetch

Until now I have not shown how subscription in Bunny should be used together with the AMQP Basic.Qos method, which implements the AMQP notion of Quality of Service.

The Basic.Qos method is crucial to the successful operation of a Bunny consumer because it controls the flow of messages from the server to the client. The Bunny method Client#qos wraps Basic.Qos, making it simple to call once a Bunny client is instantiated. At this time, RabbitMQ v1.6.0 supports only the :prefetch_count argument to that method.

Here is an example


my_client.qos(:prefetch_count => 1)

or you could just do


my_client.qos

because :prefetch_count => 1 is the default.

Using the :prefetch_count option means that if you turn on acknowledgements in your subscribe call using the :ack option like so


my_queue.subscribe(:ack => true)

the server will only send the number of messages specified in the :prefetch_count option to the client and then will wait until those messages have been acknowledged with the following method


my_queue.ack()

before it sends any more.
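
To make the prefetch window concrete, here is a minimal sketch in plain Ruby. It is a toy model, not Bunny or RabbitMQ code: the ToyBroker class and its methods are hypothetical names invented for this illustration. It only shows the rule described above, namely that the server stops delivering once the number of unacknowledged messages reaches :prefetch_count and resumes when an acknowledgement arrives.

```ruby
# Toy model of prefetch flow control. This is NOT Bunny or RabbitMQ code;
# ToyBroker and its methods are hypothetical names used for illustration.
class ToyBroker
  attr_reader :delivered

  def initialize(messages, prefetch_count)
    @pending = messages.dup          # messages still waiting on the "server"
    @prefetch_count = prefetch_count
    @unacked = 0                     # delivered but not yet acknowledged
    @delivered = []                  # everything pushed to the "client" so far
  end

  # Push messages to the client until the prefetch window is full.
  def deliver
    while @unacked < @prefetch_count && !@pending.empty?
      @delivered << @pending.shift
      @unacked += 1
    end
  end

  # Acknowledge one message, which frees one slot in the window.
  def ack
    @unacked -= 1 if @unacked > 0
    deliver
  end
end

broker = ToyBroker.new(%w[m1 m2 m3], 1)
broker.deliver
puts broker.delivered.inspect   # only the first message has been sent
broker.ack
puts broker.delivered.inspect   # the ack released the second message
```

With a window of 1 the client never has more than one message in flight, which is what makes it safe to talk to the server between messages.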

When a Bunny client subscribes to a queue, what it is doing under the covers is sending the AMQP Basic.Consume method to the server to register as a consumer on a particular queue. By default the server will try to send as many messages as it can down the communication channel. If your Bunny client can consume all of the messages before it tries to do anything else, everything works well. However, if you call another Bunny method before all of the messages on the queue have been consumed, you will see errors raised.

The errors occur because Bunny receives messages out of sequence. Most Bunny method calls expect a reply from the server to confirm that the call was successful. If you don’t use Client#qos, this is what can happen:

  1. Client subscribes to queue.
  2. Server sends as many messages as it can.
  3. Client calls another Bunny method in the subscribe block, or after the subscribe method has completed, leaving messages still to be consumed.
  4. The reply to that method gets queued up behind all of the incoming messages sent as a response to the subscribe method.
  5. Client reads the next message expecting the reply, but receives one sent for the subscribe instead.
  6. Client raises an error because the received message is not the reply that was expected.
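
The sequence above can be sketched with a toy model of the frames arriving on one connection. This is plain Ruby and not Bunny code: the method name and the :delivery and :reply symbols are hypothetical, and the model assumes the client reads frames strictly in arrival order.

```ruby
# Toy model of one connection's inbound frames. Not Bunny code; the
# method name and frame symbols are hypothetical.
#
# The client reads frames strictly in arrival order. After calling a
# synchronous method it expects the next frame to be that method's reply.
def next_frame_after_sync_call(prefetch_count, queued_messages)
  # With no limit (nil) the server sends everything at once; otherwise it
  # sends at most prefetch_count messages before waiting for an ack.
  sent = prefetch_count ? [prefetch_count, queued_messages].min : queued_messages
  inbound = Array.new(sent, :delivery)

  inbound.shift          # the subscribe block receives the first delivery
  inbound.push(:reply)   # the block makes a synchronous call; its reply queues up last
  inbound.shift          # the client reads the next frame, expecting :reply
end

puts next_frame_after_sync_call(nil, 10).inspect  # no qos: a delivery arrives instead of the reply
puts next_frame_after_sync_call(1, 10).inspect    # prefetch of 1: the reply is next in line
```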

The way to avoid these out-of-sequence problems is to do something like this:


#========== Example consumer.rb ============

require 'bunny'

b = Bunny.new()
b.start

# Set Quality of Service
b.qos

q = b.queue
10.times { q.publish('Hey Ho') }

cnt = 0

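q.subscribe(:ack => true) do |msg|
  cnt += 1
  # Synchronous call to the server; safe here because only one
  # unacknowledged message is in flight at a time
  msg_cnt = q.message_count
  puts '************************'
  puts "Message: #{cnt} - #{msg}"
  puts "#{msg_cnt} message(s) left in the queue"
  puts '************************'

  # Queue is empty: cancel the consumer, acknowledge the last
  # message and leave the subscribe loop
  if msg_cnt < 1
    q.unsubscribe
    q.ack
    break
  end

end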

#============= Example end ==============

Ideally you don’t want to call the Queue#message_count method for every message in a large queue, but the code illustrates my point. By setting the :prefetch_count to 1 the subscribe block can call q.message_count and get a synchronous reply from the server. Afterwards q.ack is called which acknowledges receipt of the message and triggers the server to release the next set of messages – 1 in this case.

I hope that this will help Bunny users to get subscription working more to their liking.

Bunny Howto: Subscribe to a queue

Bunny provides a simple way for a consumer to subscribe to queues. Here is an example –


require 'bunny'

my_client = Bunny.new
my_client.start
my_q = my_client.queue('my_queue')

ret = my_q.subscribe(:timeout => 5) { |msg| puts msg }
my_q.unsubscribe if ret == :timed_out

The Queue#subscribe method takes an optional timeout value and a mandatory code block.

The :timeout argument specifies how many seconds to wait for the next message to arrive on the queue before breaking out of the subscribe loop.

If a message arrives on the queue before the timeout period has elapsed, the message is processed and the timeout is reset to wait for the next message. If you do not provide a :timeout argument, a default timeout value of zero is used, which means that the subscribe never times out: it continues to process any received messages until the process is halted, for example by Ctrl-C.
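
The reset-on-every-message behaviour can be illustrated with Ruby's standard Timeout module and an in-process Queue standing in for the server. This is only a sketch of the idea and makes no claim about how Bunny implements it internally; consume_with_timeout is a hypothetical helper, and the zero-means-forever default is not modelled here.

```ruby
require 'thread'
require 'timeout'

# Sketch of a "wait up to N seconds for the next message" loop. This is
# an illustration built on Ruby's standard library, not Bunny's source;
# consume_with_timeout is a hypothetical helper.
def consume_with_timeout(inbox, seconds)
  loop do
    begin
      # Each iteration starts a fresh timer, so the wait is reset
      # whenever a message arrives in time.
      msg = Timeout.timeout(seconds) { inbox.pop }
    rescue Timeout::Error
      return :timed_out
    end
    yield msg
  end
end

inbox = Queue.new
%w[first second].each { |m| inbox << m }

received = []
result = consume_with_timeout(inbox, 0.2) { |msg| received << msg }
puts received.inspect
puts result.inspect
```

Both queued messages are handed to the block, and the loop only gives up once a full timeout period passes with nothing arriving.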

The Queue#unsubscribe method cancels the consumer and tells the server to stop sending messages to that consumer. This does not affect messages that have already been delivered.

Both the Queue#subscribe and Queue#unsubscribe methods can take an optional :consumer_tag argument to identify the consumer. The server will generate a value for this if it is not passed explicitly.

The Queue#subscribe method also takes another two optional parameters –

  1. :ack => true or false – If set to true (default value is false), this argument tells the server that you will explicitly acknowledge receipt of messages (in Bunny this is done with the Queue#ack method).
  2. :header => true or false – If set to true (default value is false), when a message is received a hash containing :header, :payload and :delivery_details is passed to the code block. Otherwise, only the payload, which contains the message itself, is passed to the code block.
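
To show the difference the :header option makes to the code block, here is a toy dispatcher in plain Ruby. It is not Bunny code: dispatch is a hypothetical helper, and the values stored under :header and :delivery_details are placeholders rather than what the server really sends. Only the three hash keys come from the description above.

```ruby
# Toy dispatcher showing the two block-argument shapes described above.
# Not Bunny code: dispatch is a hypothetical helper, and the header and
# delivery values are placeholders.
def dispatch(message, header)
  if header
    yield message              # full hash: :header, :payload, :delivery_details
  else
    yield message[:payload]    # just the message body
  end
end

message = {
  :header           => { :content_type => 'text/plain' },  # placeholder value
  :payload          => 'Hey Ho',
  :delivery_details => { :delivery_tag => 1 }               # placeholder value
}

dispatch(message, false) { |msg| puts msg }            # block receives the payload
dispatch(message, true)  { |msg| puts msg[:payload] }  # block receives the hash
```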

I am aware that there are problems with the timeout functionality in the Ruby 1.8.6 version of timeout.rb. However, I believe that most, if not all, of these are solved in Ruby v1.9.1 and beyond. If the timeout option proves problematic for you under Ruby 1.8.6, let me know and I will explore alternative implementations.

To date I have not received any feedback one way or the other. Either people are using it and it has worked so far, people have had problems with it and have not told me about them, or no-one is using the Bunny subscribe functionality.

Above all, I hope that you enjoy using Bunny.