Bunny: Quality of Service or Prefetch

Up until now I haven’t really addressed subscription in Bunny, and in particular how it should be used together with the AMQP Basic.Qos method. This method implements the AMQP notion of Quality of Service.

The Basic.Qos method is crucial to the successful operation of a Bunny consumer because it controls the flow of messages from the server to the client. The Bunny method Client#qos wraps Basic.Qos, making it simple to call once a Bunny client is instantiated. At this time, RabbitMQ v1.6.0 only supports the :prefetch_count argument to that method.

Here is an example:


my_client.qos(:prefetch_count => 1)

or you could just do


my_client.qos

because :prefetch_count => 1 is the default.

Using the :prefetch_count option means that if you turn on acknowledgements in your subscribe call using the :ack option like so


my_queue.subscribe(:ack => true)

the server will send the client no more than :prefetch_count unacknowledged messages, and will then wait until those messages have been acknowledged with the following method


my_queue.ack()

before it sends any more.
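
To make the prefetch window concrete, here is a minimal sketch in plain Ruby. It is a toy model, not Bunny or RabbitMQ code: ToyBroker and its methods are names I have made up for illustration, and the only behaviour it assumes is the one described above (the server stops sending once :prefetch_count messages are unacknowledged, and each ack frees one slot).

```ruby
# Toy model of prefetch-limited delivery. Not Bunny or RabbitMQ code:
# ToyBroker and its methods are hypothetical names for illustration.
class ToyBroker
  attr_reader :delivered

  def initialize(messages, prefetch_count)
    @pending = messages.dup          # messages still held in the queue
    @prefetch_count = prefetch_count
    @unacked = 0                     # messages sent but not yet acknowledged
    @delivered = []                  # everything pushed to the client so far
  end

  # Push messages to the client until the prefetch window is full.
  def deliver
    while @unacked < @prefetch_count && !@pending.empty?
      @delivered << @pending.shift
      @unacked += 1
    end
  end

  # An acknowledgement frees one slot, so one more message can be sent.
  def ack
    raise 'nothing to acknowledge' if @unacked.zero?

    @unacked -= 1
    deliver
  end
end

broker = ToyBroker.new(%w[m1 m2 m3], 1)
broker.deliver
puts broker.delivered.inspect   # only the first message has been sent
broker.ack
puts broker.delivered.inspect   # the ack released the second message
```

With a window of 1 the client never has more than one message waiting for it, which is what keeps the channel clear for other calls.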

When a Bunny client subscribes to a queue, what it is doing under the covers is sending the AMQP Basic.Consume method to the server to register as a consumer on a particular queue. By default the server will try to send as many messages as it can down the communication channel. If your Bunny client can consume all of the messages before it tries to do anything else, everything works well. However, if you call another Bunny method before all of the messages on the queue have been consumed, you will see errors raised.

The errors occur because Bunny receives messages out of sequence. Most of the time, when you call a Bunny method, a reply is expected from the server to confirm that the call was successful. If you don’t use Client#qos, this is what can happen:

  1. Client subscribes to queue.
  2. Server sends as many messages as it can.
  3. Client calls another Bunny method in the subscribe block, or after the subscribe method has completed, leaving messages still to be consumed.
  4. The reply to that method gets queued up behind all of the incoming messages sent as a response to the subscribe method.
  5. Client reads the next message expecting the reply, but receives one delivered by the subscribe instead.
  6. Client raises error because the received message is not the reply that was expected.
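
The steps above can be sketched in plain Ruby. This is a toy model that treats the channel as a single ordered stream of frames; it is not how Bunny is implemented internally, and simulate and the frame names are hypothetical. The window argument stands in for :prefetch_count, with nil meaning that Client#qos was never called.

```ruby
# Toy model of one AMQP channel as a single ordered stream of frames.
# Not Bunny internals: simulate and the frame names are hypothetical.
# window is how many deliveries the server may send before waiting
# for an ack; nil means no limit (Client#qos was not called).
def simulate(queue_size, window)
  in_flight = window ? [window, queue_size].min : queue_size
  stream = Array.new(in_flight) { |i| [:deliver, "msg #{i + 1}"] }

  # The client handles the first delivery and, inside the subscribe
  # block, makes a synchronous call. The server's reply joins the end
  # of the stream, behind any deliveries already sent.
  stream.shift
  stream.push([:reply, 'message_count ok'])

  # The client now reads the next frame and expects it to be the reply.
  frame_type, = stream.shift
  frame_type == :reply ? :ok : :out_of_sequence
end

puts simulate(10, nil)  # no prefetch limit
puts simulate(10, 1)    # prefetch_count of 1
```

In this model only a window of 1 guarantees that the reply is the next frame the client reads, which is why the default of :prefetch_count => 1 suits this style of consumer.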

The way to avoid these out-of-sequence problems is to do something like this:


#========== Example consumer.rb ============

require 'bunny'

b = Bunny.new()
b.start

# Set Quality of Service
b.qos

q = b.queue
10.times { q.publish('Hey Ho') }

cnt = 0

q.subscribe(:ack => true) do |msg|
  cnt += 1
  msg_cnt = q.message_count
  puts '************************'
  puts "Message: #{cnt} - #{msg}"
  puts "#{msg_cnt} message(s) left in the queue"
  puts '************************'

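  if msg_cnt < 1
    # Last message: unsubscribe, then ack explicitly, because break
    # skips the automatic ack that subscribe sends after the block.
    q.unsubscribe
    q.ack
    break
  end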

end

#============= Example end ==============

Ideally you don’t want to call the Queue#message_count method for every message in a large queue, but the code illustrates my point. By setting the :prefetch_count to 1, the subscribe block can call q.message_count and get a synchronous reply from the server. Once the block has finished, the message is acknowledged (subscribe does this for you when :ack => true is given), which triggers the server to release the next set of messages, 1 in this case.

I hope that this will help Bunny users to get subscription working more to their liking.


6 thoughts on “Bunny: Quality of Service or Prefetch”

  1. ~J

    I seem to have run into either an error in this example, or mis-communication on how the myqueue.ack is supposed to work. I’ve set up an application that does the following:

    b = Bunny.new
    b.start
    b.qos(:prefetch => 1)
    q = b.queue("myqueue")
    q.subscribe(:ack => true, :connect_timeout => 30) do |msg|
      puts msg
      q.ack
    end

    Problem is that doesn’t work. I get a delivery tag error. Removing q.ack from my example allows it to process properly.

    Any explanation or clarification to this?

    1. Chris Duncan Post author

      I apologise for the mis-communication. The subscribe method will do the ack for you if you specify :ack => true as an argument, however, the ack will happen after your block has been processed. So, in your example, if you have an ack in your block, two acks will be fired. The first one in the block will succeed, the second automatic one will fail.

      I have updated my example to work with Bunny v0.6.0. It has an ack in the block because I want to unsubscribe from the queue and break out of the subscribe loop. If the ack is not present, the code will break the subscribe loop, but the last message would not be acknowledged and therefore would be re-queued.

      Regards,

      Chris

  2. ~J

    As a follow up, processing 1 message from the queue and then ack’ing receives this message:

    [#]

    I’ve tried doing an q.ack(:delivery_tag => 1), which doesn’t work.

    Also a minor note, I mis-typed :connect_timeout, in my code it is :timeout.

    1. Chris Duncan Post author

      This error message occurs when you try to send an acknowledgement but the queue delivery_tag has not been set. If you perform two acks one after the other you will see this message.
