Tag Archives: Bunny

Bunny: v0.7.1 Release

Bunny v0.7.1 is now available from GitHub. It contains a number of minor changes. Perhaps the most notable change is a deprecation message that is output to the console when the Queue#publish method is used.

The Queue#publish method is a convenience method that enables messages to be routed to a queue via the default direct exchange which has an empty string (“”) as its name. Apparently, this method has lead developers to mistakenly believe that messages are directly delivered to queues. This is most certainly not the case (Please see this documentation for an explanation).

Therefore, in order to avoid this confusion it has been decided to deprecate the Queue#publish method. If you find the method useful and would not like to see it deprecated, please raise the issue on GitHub or in the ruby-amqp Google Group.

Bunny: v0.5.4 Release Candidate 2

Bunny v0.5.4 rc2 is now available from GitHub. The code is now frozen as far as new features and enhancements are concerned. Of course, if you find any critical bugs please contact me as soon as you can so that the fix can be implemented.

The changes included in the upcoming v0.5.4 release can be viewed in the Bunny wiki.

I intend to release the new gem on 5th  October 2009. If anyone needs more time to evaluate the changes please let me know and I will consider deferring the release.

Bunny: Publishing with Merb and Rack

The following article was written by Chris McCauley who kindly gave his permission for it to be published here.

I discovered one big gotcha while integrating Bunny with Merb and wanted to share the solution.

The background to this is that my company uses clusters of Merb and Rails processes to implement data validation services. Until recently our solution for monitoring the Merb part of the infrastructure was pretty manual, reactive and time consuming. I wanted a simple way to gather stats from Merb processes in the cluster and correlate them with activity on the host server and to do this over time. I also wanted email alerts to be sent when a request fails to meet its response target.

I thought that message queuing might offer a good solution, allowing the aggregation of the request data and the production of reports to be done with the minimum impact on the actual Merb processes. The idea was that each process in the cluster would publish the timing information for requests to a queue and a separate profiler process would handle the graphing and alert functionality.

I chose Bunny as the interface to RabbitMQ because it was synchronous. Although we use EventMachine to great effect elsewhere in our solution, it was a poor fit with the Merb architecture so the tmm1-amqp gem wasn’t really an option.

I figured that this would be a simple implementation – Merb uses Rack internally and provides access to the Rack framework via a “rackup” file. This provides a way to insert custom code into the request processing chain – in effect adding your own middle-ware into Merb. Profiling and monitoring is a middleware service so integrating Bunny and Rack here was the obvious choice.

The structure of a rackup file is very simple. The basic rackup file that Merb uses is…

# use PathPrefix Middleware if :path_prefix is set in Merb::Config
if prefix = ::Merb::Config[:path_prefix]
  use Merb::Rack::PathPrefix, prefix
end

# this is our main merb application
run Merb::Rack::Application.new

… this passes the inbound request straight to Merb for processing. To add custom middle-ware into the chain, you simple require your code and “use” it…


require 'bunny_profiler'

# use PathPrefix Middleware if :path_prefix is set in Merb::Config
if prefix = ::Merb::Config[:path_prefix]
  use Merb::Rack::PathPrefix, prefix
end

#  an interface to RabbitMQ via Bunny
use BunnyProfiler

# this is our main merb application
run Merb::Rack::Application.new

Obviously the “use” can be conditional, running in test environments but not production environments, on Tuesdays but not Thursdays etc.

Each rack file is simple, apart from the constructor it has only one method called ‘call’ which gets invoked in response to an inbound request. Since the call method gets a copy of all of the request headers, I figured that code as simple as this would work…


def call(env)
  status, headers, body = nil, nil, nil

  request_path  = env["REQUEST_URI"]
  request_input = env["rack.input"].read

  #  Call the next Rack app in sequence.
  #  Given our rackup file, this really means
  #  call our application and get it to
  #  process the request
  tm = Benchmark.measure {
    status, headers, body = @app.call(env)
  }

  #  Note that the pid of the sending process
  #  is part of the message
  #
  @queue.publish("#{Process.pid}” + “|" +
                            request_path    + "|" +
                            request_input   + "|" +
                            (tm.real).to_s)

  # This is the response required by Rack
  [status, headers, body]
end

The constructor is even simpler, we just set up a queue …


def initialize(app)
  @app = app

  @bunny = Bunny.new

  @bunny.start

  @queue = @bunny.queue('clavis.requests')
end

In the call method we measure how long the main application code takes to process the request and then to send this to RabbitMQ. Each process in the cluster sends to the same queue and a single client process aggregates the data, produces graphs and sends any alerts required.

When I deployed this solution in the test environment, all was fine, graphs got produced, email alerts got sent. When I deployed it to one of our production servers, all was again fine – for nearly a full day anyway.

Then one-by-one each server in the cluster locked-up. When I checked the server log files, it became obvious that under heavy load, the calls to publish messages to the queue were failing. A quick check of the RabbitMQ log showed the following;

=ERROR REPORT==== 14-Aug-2009::16:00:34 ===

connection <0.18759.12> (running), channel 1 – error:

{amqp,command_invalid,

“expected content header for class 60, got non content header frame instead”,

none}

This isn’t a particularly user-friendly error message. After some digging, I knew that this was somehow related to multiple messages being interleaved on the same channel causing RabbitMQ to close the socket connection. It seemed like a threading issues but I couldn’t see how my code could be wrong – it’s copied from the Bunny sample code! I didn’t want to give up on the solution; the combination of Bunny, Rabbit and Rack just seemed to be such a great approach.

I had noticed one odd thing, RabbitMQ was seeing only one socket connection from all the servers in the cluster but I knew that each server was sending data – the messages being sent include the PID of the sending process. Perhaps Bunny was using a single connection to RabbitMQ and I needed to use some additional logic to handle that situation? I figured that there was nothing else for it but to try and contact the Bunny maintainer, Chris Duncan

Chris was a great help. He confirmed that there’s nothing in Bunny which could cause the same connection to be reused by all the servers but perhaps I could look at the use of different channels for each server as well as turning on the Bunny debug log.

Odd as it may seem, it was the second suggestion – turning on the log file which provided the big clue. Since turning on logging is part of the initialisation step (for me anyway), I changed the initialize method to be …


def initialize(app)
  @app = app

  @bunny = Bunny.new

  @bunny.logfile = "/var/log/bunny-#{Process.pid}.log"

  @bunny.logging = true

  @bunny.start

  @queue = @bunny.queue('clavis.requests')
end

… and restarted all of the servers in the test cluster. My idea was that I would try and reproduce the problem in the test environment by sending as many requests at the same time as possible and by brute-force, make the problem reoccur.

I didn’t get that far, I noticed that only one log file was getting created instead of the twelve that I was expecting. That really looked like only one Merb process in the cluster was initialising Bunny – but I knew (from the reports getting produced) that each process was sending data. This was a contradiction – how could only one process have initialised Bunny yet all of the processes be sending messages?

The explanation was that only one process was actually initialising Bunny and because of the name of the log file I knew which one it was – the Merb spawner process. The spawner process was opening the socket connection to RabbitMQ via Bunny, then forking the child processes and since children inherit the socket connections, each process was sending on the same channel and on the same socket. Sooner or later two messages would get interleaved and RabbitMQ would drop the connection.

The solution was simple, I moved all of the initialisation code out of the constructor and introduced a new start_bunny method which did essentially the same thing and called this like so …


def call(env)
  status, headers, body = nil, nil, nil

  request_path  = env["REQUEST_URI"]
  request_input = env["rack.input"].read

  tm = Benchmark.measure {
    status, headers, body = @app.call(env)
  }

  start_bunny unless @queue

  @queue.publish("#{Process.pid}” + “|" +
                            request_path    + "|" +
                            request_input   + "|" +
                            (tm.real).to_s)

  [status, headers, body]
end

Now each process in the cluster initialises Bunny as required. Since the socket connections are distinct, it’s safe to use the default channel. Testing showed multiple connection requests going from Bunny to RabbitMQ – something which Chris had been expecting and multiple log files being created – which I had expected. More importantly, ten thousand requests later, the problem hasn’t reoccurred.

Bunny: Ruby IRB auto-completion issue

It was brought to my attention by Sho Fukamachi that a repeatable error occurs when trying to use IRB auto-completion with Bunny in some versions of Ruby 1.8.6 and 1.8.7. The problem does not occur in Ruby 1.9.1 and seems to have been fixed in Ruby 1.8.6 p287.

In your IRB session, if you see something like the following  –


q.me/Users/chrisduncan/ruby186/lib/ruby/1.8/irb/completion.rb:159: can't convert Symbol
into String (TypeError) ...

It appears to be a bug in irb/completion.rb. I applied the following patch to fix it –

Change the line:


/^(IRB|SLex|RubyLex|RubyToken)/ =~ name

To:


/^(IRB|SLex|RubyLex|RubyToken)/ =~ name.to_s

Personally, I would advise installing an officially patched version of Ruby to remedy the issue. However, as always, the choice is yours.

Update (21/07/2011)

As a result of further investigation there is a remedy to this issue that has been implemented in Bunny. It will be included in the next release of the gem, however, if you need to get a patched version more quickly, both the 0.7.x-stable and master branches on GitHub include the patch.

Bunny: Future development

Bunny has now reached a state where it makes most of the core AMQP functionality available. It is being used in the real world and seems to be doing a reasonable job for its users.

Up until now I’ve been doing things with Bunny that I wanted to do, with some occasional help from interested parties (for which I am very grateful). I have no formal change management procedure (except for the use of the remarkable Git), no release candidates etc. If anybody thinks that a more formal structure is required then I’m open to suggestions.

What I would like is for the ‘Bunny community’ – if there is one – to drive its future development. I want to get a better feel for where and how Bunny is being used with a view to improving it further. So if you have examples of ‘real world’ Bunny usage, if you require features that don’t already exist, if you have encountered problems that you want fixed or if you have suggestions as to how Bunny can be enhanced, please let me know. You are also encouraged to fork and patch to your heart’s content.

Version 1.7 of RabbitMQ is due out quite soon and I expect Bunny to be compatible with it. For the time being I’m going to concentrate on polishing Bunny as best I can.

Finally, I’d like to say that I’m enjoying working on this project immensely and would like to thank the numerous people who have made, and continue to make, software and services available free of charge. Without their generosity I’m sure that I would never have started this project.

Bunny Howto: Subscribe to a queue

Bunny provides a simple way for a consumer to subscribe to queues. Here is an example –


require 'bunny'

my_client = Bunny.new
my_client.start
my_q = my_client.queue('my_queue')

ret = my_q.subscribe(:timeout => 5) { |msg| puts msg }
my_q.unsubscribe if ret == :timed_out

The Queue#subscribe method takes an optional timeout value and a mandatory code block.

The :timeout argument specifies how many seconds to wait for the next message to arrive on the queue before breaking out of the subscribe loop.

If a message arrives on the queue before the timeout period has elapsed, the message is processed and the timeout is reset to wait for the next message. If you do not provide a :timeout argument then a default timeout value of zero is used. This means that the subscribe will continue to process any received messages indefinitely until the process is halted by Ctrl-C for example.

The Queue#unsubscribe method cancels the consumer and tells the server to stop sending messages to that consumer. This does not affect messages that have already been delivered.

Both the Queue#subscribe and Queue#unsubscribe methods can take an optional :consumer_tag argument to identify the consumer. The server will generate a value for this if it is not passed explicitly.

The Queue#subscribe method also takes another two optional parameters –

  1. :ack => true or false – If set to true (default value is false), this argument tells the server that you will explicitly acknowledge receipt of messages (in Bunny this is done with the Queue#ack method).
  2. :header => true or false – If set to true (default value is false), when a message is received a hash containing :header, :payload and :delivery_details is passed to the code block. Otherwise, only the payload, which contains the message itself, is passed to the code block.

I am aware that there are problems with the timeout functionality in the Ruby 1.8.6 version of timeout.rb, however, I believe that most, if not all of these are solved in Ruby v1.9.1 and beyond. If the timeout option proves problematic for you under Ruby 1.8.6, let me know and I will explore alternative implementations.

To date I have not received any information one way or the other. Either people are using it and it has worked so far; people have had problems with it and have not told me about them; or no-one is using the Bunny subscribe functionality.

Above all, I hope that you enjoy using Bunny.