Thursday, June 18, 2009

A Sinfonia on Messaging with txAMQP, Part III

A Sinfonia on Messaging:
  1. The Voice of Business
  2. The Voice of Architecture
  3. A RabbitMQ and txAMQP Interlude

Before we play our third voice in this three-part invention, we need to do some finger exercises. In particular, let's take a look at the concepts and tools we'll be using to implement and run our kilt store messaging scenario.


The RabbitMQ FAQ has this to say about messaging:

Unlike databases which manage data at rest, messaging is used to manage data in motion. Use messaging to communicate between and scale applications, within your enterprise, across the web, or in the cloud.
Paraphrasing Wikipedia's entry on AMQP:
The AMQ protocol is for managing the flow of messages across an enterprise's business systems. It is middleware to provide a point of rendezvous between backend systems, such as data stores and services, and front end systems such as end user applications.

AMQP Essentials

AMQP is a protocol for middleware servers ("servers" is used in the most general sense, here... anything that is capable of running a service) -- servers that accept, route, and buffer messages. The AMQP specification defines messaging server LEGO blocks that can be combined in various ways and numbers, achieving any manner of messaging goals, whose final forms are as diverse as the combinations of the components.

For the visually inclined, below is a simple diagram of the AMQ protocol. I've put multiple virtual hosts ("virtual hosts" in the AMQP sense, not Apache!) in the diagram to indicate support for multiple server "segments" (domains in the most general sense). There could just as easily be multiple exchanges and queues in each virtual host, as well. Likewise for publishers and consumers.

I highly recommend reading the spec: it is exceedingly clear at both intuitive and practical levels. To better understand the diagram above, be sure to read the definition of terms at the beginning as well as the subsections in 2.1 about the message queue and the exchange. Don't miss the message life-cycle section either -- you'll be reminded of circuitry diagrams and electronics kits, which is what AMQP really boils down to :-)

The Advanced Message Queuing Protocol specification states that the protocol can be used to create exchanges and message queues, chain them together, and do all of this dynamically. Any piece of code that has access to an API for your AMQP server can connect to it and communicate with other code -- using or creating simple messaging patterns, deeply complex ones, and everything in between.
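To make the exchange/queue/binding vocabulary a bit more concrete, here is a toy, in-memory model written in plain Python 3. It is only a sketch for intuition: ToyBroker and its method names are made up for this illustration (they loosely echo AMQP method names), it models nothing but exact-match routing of the kind a direct exchange does, and it has nothing to do with how RabbitMQ is actually implemented.

```python
from collections import defaultdict, deque


class ToyBroker:
    """Hypothetical in-memory stand-in for a single AMQP virtual host."""

    def __init__(self):
        self.queues = {}                   # queue name -> buffered messages
        self.bindings = defaultdict(list)  # (exchange, routing key) -> queue names

    def queue_declare(self, queue):
        # Create the queue if it does not exist yet.
        self.queues.setdefault(queue, deque())

    def queue_bind(self, queue, exchange, routing_key):
        # Tell the exchange which queue wants messages with this key.
        self.bindings[(exchange, routing_key)].append(queue)

    def publish(self, exchange, routing_key, body):
        # Exact-match routing: copy the message into every queue bound
        # to this exchange with the same routing key. Unroutable
        # messages are silently dropped in this toy.
        for queue in self.bindings.get((exchange, routing_key), []):
            self.queues[queue].append(body)

    def get(self, queue):
        # Pop the oldest buffered message, or None if the queue is empty.
        pending = self.queues[queue]
        return pending.popleft() if pending else None


broker = ToyBroker()
broker.queue_declare("orders")
broker.queue_declare("audit")
broker.queue_bind("orders", "kilts", "order.new")
broker.queue_bind("audit", "kilts", "order.new")

broker.publish("kilts", "order.new", "one Black Watch kilt")
broker.publish("kilts", "order.cancelled", "no queue is bound to this key")
```

Both queues end up holding a copy of the first message, while the second message matches no binding and goes nowhere. The publisher never names a queue; it only knows the exchange and the routing key, which is what lets you rewire things on the server side without touching publisher code.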

RabbitMQ Quickstart
RabbitMQ is a messaging system written in Erlang, but in particular, it is an implementation of AMQP. The RabbitMQ web site provides documentation on installing and administering the messaging server. I run mine on Ubuntu, but since I've got a custom Erlang install, I didn't install the package (I dumped the source in /usr/lib/erlang/lib). To participate in the code play for this blog series, you'll need to install RabbitMQ.

Once you've got it installed, you'll need to start it up. If you've used something like Ubuntu's apt-get to install RabbitMQ, starting it up is as simple as this:
sudo rabbitmq-server

If you've got a custom setup like mine, you might need to do something like this (changing the defaults as needed):

 RABBITMQ_LOG_BASE=/var/log/rabbitmq \
 $BIN &

A txAMQP Example

Now that we've got a messaging server running, but before we try to implement our kilt store scenarios, let's take a quick sneak peek at txAMQP with a simple example having the following components:
  • a RabbitMQ server
  • a txAMQP producer
  • a txAMQP consumer
From reading the spec, we have a general sense of what needs to happen in our producer. It needs to:
  • connect to the RabbitMQ server
  • open a channel
  • send a message down the channel
Similarly, our reading lets us anticipate the needs of the consumer:
  • connect to the RabbitMQ server
  • open a channel
  • create an exchange and message queue on the RabbitMQ server, binding the two
  • check for incoming messages and consume them
I have refactored some examples that the author of txAMQP created and I've put them up here. Once you download the three Python files (and the spec file, one directory level up), you can run them in two separate terminals. In terminal 1, start up the consumer:
python2.5 consumer amqp0-8.xml
In terminal 2, fire off a message:
python2.5 producer amqp0-8.xml \
"producer-to-consumer test message 1"
After running the producer with that message, you should see the same text rendered in the consumer terminal window. You can also fire the message off first, then start up the consumer. The message is sitting in a queue on your RabbitMQ instance and will be available to your consumer as soon as it connects.
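The "fire first, consume later" behavior can be pictured with a small plain-Python 3 sketch. This is only an analogy for what the broker does on our behalf, not txAMQP or RabbitMQ code; BufferingQueue and its methods are hypothetical names invented for the illustration.

```python
from collections import deque


class BufferingQueue:
    """Hypothetical message queue that holds messages until a consumer attaches."""

    def __init__(self):
        self._pending = deque()
        self._consumer = None

    def publish(self, body):
        if self._consumer is None:
            # Nobody is listening yet: keep the message for later.
            self._pending.append(body)
        else:
            self._consumer(body)

    def consume(self, callback):
        # Attach the consumer and hand over the backlog, oldest first.
        self._consumer = callback
        while self._pending:
            callback(self._pending.popleft())


received = []
queue = BufferingQueue()

queue.publish("producer-to-consumer test message 1")  # no consumer yet
queue.consume(received.append)                        # backlog is delivered now
queue.publish("producer-to-consumer test message 2")  # delivered immediately
```

After this runs, received holds both messages in the order they were published, even though the first one was sent before any consumer existed.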

Now that you see evidence of this working, you're going to be curious about the code :-) Go ahead and take a look. There are lots of comments in the code that give hints as to what's going on and the responsibilities that are being addressed.

If you are familiar with Twisted, you may have noted that the code looks a little strange. If you're not, you may have noticed that the code looks normal, with the exception of extensive yield usage and the inlineCallbacks decorator. Let me give a quick overview:

Ordinarily, Twisted-based libraries and applications use the asynchronous Twisted deferred idiom. However, there's a little-used bit of syntactic sugar in Twisted (for Python 2.5 and greater) that lets you write async code that looks like regular, synchronous code. This was briefly explored in a post on another blog last year. The Twisted API docstring for inlineCallbacks has a concise example.

Briefly, the difference is as follows. In standard Twisted code, we assign a deferred-producing function's or method's return value to a variable and then call that deferred's methods (e.g., addCallback):
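def someFunc():
   # handleResult1 and handleResult2 are placeholder callbacks
   d1 = someAsyncCall()
   d1.addCallback(handleResult1)
   d2 = anotherAsyncCall()
   d2.addCallback(handleResult2)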

With inlineCallbacks, you decorate your function (or method) and yield for every deferred-producing call:
from twisted.internet.defer import inlineCallbacks

@inlineCallbacks
def someFunc():
   result1 = yield someAsyncCall()
   # work with result; no need for a callback
   result2 = yield anotherAsyncCall()
   # work with second result; no need for a callback

Visually, this flows as regular Python code. However, know that each yield hands control back to the reactor instead of blocking (provided there is no blocking code present, e.g., file I/O), and execution resumes as soon as the deferred has a result (which is assigned to the left-hand side). Since this latter idiom is used in txAMQP, I use it in the examples as well.
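If the pause-and-resume behavior feels like magic, a stripped-down imitation may help. The sketch below is plain Python 3 with no Twisted import; ToyDeferred and toy_inline_callbacks are invented names, and this is emphatically not Twisted's implementation (no errbacks, no callback chaining, no reactor). It only shows the core trick: a driver sends each deferred's result back into the generator at the point where it yielded.

```python
class ToyDeferred:
    """Hypothetical, heavily simplified stand-in for a Twisted Deferred."""

    def __init__(self):
        self.called = False
        self.result = None
        self._callbacks = []

    def addCallback(self, fn):
        # Unlike the real thing, callbacks here are not chained:
        # every callback just receives the original result.
        if self.called:
            fn(self.result)
        else:
            self._callbacks.append(fn)

    def callback(self, result):
        self.called = True
        self.result = result
        for fn in self._callbacks:
            fn(result)


def toy_inline_callbacks(gen_func):
    """Hypothetical decorator imitating the idea behind inlineCallbacks."""

    def wrapper(*args, **kwargs):
        gen = gen_func(*args, **kwargs)
        done = ToyDeferred()

        def step(value):
            try:
                # Resume the generator; 'value' becomes the result of
                # the yield expression it was paused on.
                pending = gen.send(value)
            except StopIteration as stop:
                done.callback(stop.value)
                return
            # Paused again: resume once the yielded deferred fires.
            pending.addCallback(step)

        step(None)  # run up to the first yield
        return done

    return wrapper


d1, d2 = ToyDeferred(), ToyDeferred()
log = []


@toy_inline_callbacks
def some_func():
    result1 = yield d1
    log.append(result1)
    result2 = yield d2
    log.append(result2)
    return result1 + result2


done = some_func()       # returns at once; some_func is paused at the first yield
paused_log = list(log)   # still empty, since nothing has fired yet
d1.callback(1)           # resumes some_func up to the second yield
d2.callback(2)           # resumes it again, and it runs to completion
```

Note that calling some_func() returns right away with a deferred of its own, which is what keeps the rest of the program free to do other work. In real Twisted code on Python 2, a generator cannot return a value with a plain return statement, so defer.returnValue() is used for that purpose instead.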

Next, we finally reach our implementation!



GregP said...

Aren't you missing a close parenthesis on your d2.addCallback? Looks like a Python/Twisted/txAMQP rookie mistake to me.

Duncan McGreggor said...


Ah, Greg... has it really been almost 15 years since we were roommates in college? I can't believe that much time has passed.

Nice catch :-)

alexis said...

Great article. Thanks for writing it :-)


Duncan McGreggor said...

Thanks, alexis -- glad you liked it :-)

T T said...

Thanks so much for this article. A question - how do I get the server to send the messages to all consumers? I started 2 consumers and they seem to be interchanging between who receives the message.

writeson said...

Always appreciate your writing about Twisted, has always helped clarify things for me so I can actually use them with a little more confidence.

Duncan McGreggor said...

@writeson/Doug -- thanks for your comment!

There's actually more Twisted stuff on its way...