The Well Connected Rabbit

Compose's RabbitMQ can help your business and in this Write Stuff article, Ken Whipday shows how it has helped Tripcatcher. He'll show how to set up a basic AMQP message queue with NodeJS and then move on to Rabbot's powerful abstraction for RabbitMQ clusters, exchanges and queues.

We have a Node.js application hosted on Heroku. The application is starting to outgrow Heroku's 30-second limit for requests running on the web server, so we need to move the slower-running transactions off the web server and into background processes.

RabbitMQ is one of the most popular open source choices when using messaging to offload work from the web server to a background process. Compose had just launched its RabbitMQ offering and the proposition appealed to us at Tripcatcher because of its simplicity and scalability. There are lots of tutorials around for RabbitMQ on NodeJS too, so the only challenge was how to configure it to work with Compose.

In this article, we'll show you how to get two Node libraries (amqplib and Rabbot) connected to Compose's RabbitMQ.

Compose Setup

The first thing we need to do is create our Compose RabbitMQ instance and create a play environment.

  1. For a small application we'll use the vhost as an environment, so we'll build just one vhost to play with. A real deployment would typically have one for development, one for staging and one for production. Larger applications might use one vhost per chunk of functionality, such as logging. On Compose's RabbitMQ page, select the Browser tab, click the Add vhost button and create a vhost called 'plaything'.
  2. Select the plaything vhost and add a username 'demo-user' and a suitable password.
  3. Now, select the username and grant it access to the plaything vhost.
  4. Back on the Overview view, make a note of the connection string ready to use it in the next step. It is in the format:
    amqps://[username]:[password]@[hostname]:[port]/[vhost]
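If your password or vhost contains characters such as @ or /, they generally need to be percent-encoded before they go into a URI of this shape. Here is a minimal sketch of assembling the string from its parts. The buildAmqpUri helper and all the sample values are made up for illustration and are not part of Compose or any library.

```javascript
// Sketch: assemble an AMQP connection string from its parts.
// buildAmqpUri and the sample values below are hypothetical.
function buildAmqpUri(parts) {
    // Percent-encode the pieces that may contain reserved characters.
    var user  = encodeURIComponent(parts.user);
    var pass  = encodeURIComponent(parts.pass);
    var vhost = encodeURIComponent(parts.vhost);
    return 'amqps://' + user + ':' + pass + '@' +
        parts.host + ':' + parts.port + '/' + vhost;
}

var uri = buildAmqpUri({
    user:  'demo-user',
    pass:  'p@ss/word',           // deliberately awkward password
    host:  'rabbit.example.com',  // placeholder hostname
    port:  15121,                 // placeholder port
    vhost: 'plaything'
});

console.log(uri);
// amqps://demo-user:p%40ss%2Fword@rabbit.example.com:15121/plaything
```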

amqplib

Now that we've set up our RabbitMQ deployment, all we have to do is install amqplib, which is the most popular NodeJS library for RabbitMQ. It is comprehensive, it works, and there are lots of tutorials for it. Install it by running the following command in your terminal:

npm install --save amqplib

Get connected

Now that our deployment is set up and the amqplib library has been installed, let's write some code and create a connection. In the uri variable below, substitute the username and password you created earlier, along with the hostname and port from your Compose deployment's connection string.

    var amqp      = require('amqplib/callback_api');
    var url       = require('url');

    var uri       = 'amqps://[username]:[password]@[hostname]:[port]/plaything';
    var parsedUrl = url.parse(uri);
    var opts      = { servername: parsedUrl.hostname };

    amqp.connect(uri, opts, function(err, conn) {
        if (err) {
            console.error("[AMQP]", err.message);
            return;
        }
        console.log("[AMQP] connected");
        // conn is the connection object
    });

If you now visit the Compose Admin UI page (the link is on your deployment's Overview page), you should see there is a connection to your RabbitMQ instance.

However, amqplib is a low-level library, so you have to code your own cluster support. You can get around this by using Rabbot, which provides cluster support via a simple config setting.
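To give a feel for what "coding your own cluster support" might involve, here is a rough sketch that tries a list of connection strings in order and settles on the first one that connects. The connectWithFailover helper is hypothetical and not part of amqplib. The connect function is passed in, so the sketch runs here with a stand-in instead of a real broker, and the hostnames are placeholders.

```javascript
// Sketch only: connectWithFailover is a hypothetical helper, not an amqplib API.
// connectFn(uri, callback) should call back with (err, conn), in the same
// style as the callback API used in the connection example.
function connectWithFailover(uris, connectFn, done) {
    var errors = [];

    function attempt(index) {
        if (index >= uris.length) {
            var failure = new Error('all ' + uris.length + ' hosts failed');
            failure.causes = errors;
            return done(failure);
        }
        connectFn(uris[index], function (err, conn) {
            if (err) {
                errors.push(err);
                return attempt(index + 1); // move on to the next host
            }
            done(null, conn, uris[index]);
        });
    }

    attempt(0);
}

// Stand-in connect function so the sketch runs without a broker:
// it pretends the first host is down and the second is up.
function fakeConnect(uri, callback) {
    if (uri.indexOf('host-1') !== -1) {
        callback(new Error('connection refused'));
    } else {
        callback(null, { connectedTo: uri });
    }
}

var outcome = null;
connectWithFailover(
    ['amqps://host-1.example.com:15121/plaything',
     'amqps://host-2.example.com:15121/plaything'],
    fakeConnect,
    function (err, conn, uri) {
        outcome = { err: err, conn: conn, uri: uri };
        console.log(err ? err.message : 'connected via ' + uri);
    }
);
```

With amqplib you would pass something like function (uri, cb) { amqp.connect(uri, opts, cb); } in place of fakeConnect, taking care that opts.servername matches the host being tried. A real implementation would also need to reconnect when an established connection drops, which this sketch leaves out.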

Rabbot and Rabbus

Rabbot is an opinionated abstraction over amqplib. Rabbus is a further abstraction that sits on top of Rabbot and uses Rabbot's code to make its connection. Hence, coding the Rabbot connection will work for both Rabbot and Rabbus.

One great advantage of using a Rabbot connection is it supports clusters so we can connect in a resilient manner to Compose's RabbitMQ.

In the next section, we'll show you how to create a connection with Rabbot that you can then use with Rabbot or Rabbus.

Rabbot connection

Rather than using a connection string as we did previously, we're now going to use a connection object. The parsedUrl variable is removed and we're hard-coding the hostname and port number in the connection object (this is so we can add cluster support in the next step).

Below is the updated code. Update the config object with your password, hostname and port. Then run the code and you should now see the plaything vhost on the Compose console.

    var rabbit = require('rabbot');

    var config = {
        connection: {
            protocol:  'amqps://',
            name:      'plaything',
            user:      'demo-user',
            pass:      'demo-password',
            server:     hostname1,
            port:       port1,
            vhost:     'plaything'
        }
    };

    rabbit
        .configure(config)
        .then( function() {
            console.log('rabbit is hopping');
            // ready to start rabbit receivers and publishers
        })
        .then(null, function(err) {
            console.log(err);
        });

Cluster support

On the Compose RabbitMQ console, there are two connection strings. To use the second one as well, supply both hostnames as an array. The two port numbers will probably be the same, but supply them as an array, too.

    var config = {
        connection: {
            protocol:  'amqps://',
            name:      'plaything',
            user:      'demo-user',
            pass:      'demo-password',
            server:    [hostname1, hostname2],
            port:      [port1, port2],
            vhost:     'plaything'
        }
    };
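Rather than copying hostnames and ports by hand, you could derive the connection object from the two connection strings. The following is only a sketch. The toClusterConnection helper is hypothetical, the URIs are made up, and it assumes both strings share the same credentials and vhost. It relies on Node's built-in URL class.

```javascript
// Sketch: turn connection strings into a connection object with the same
// shape as the config in this article. toClusterConnection is a
// hypothetical helper; the URIs below are placeholders.
function toClusterConnection(name, uris) {
    var parsed = uris.map(function (uri) { return new URL(uri); });
    var first = parsed[0]; // assumption: credentials and vhost match on all URIs

    return {
        protocol: first.protocol + '//',   // 'amqps:' becomes 'amqps://'
        name:     name,
        user:     decodeURIComponent(first.username),
        pass:     decodeURIComponent(first.password),
        server:   parsed.map(function (u) { return u.hostname; }),
        port:     parsed.map(function (u) { return Number(u.port); }),
        vhost:    decodeURIComponent(first.pathname.slice(1))
    };
}

var connection = toClusterConnection('plaything', [
    'amqps://demo-user:s3cret@host-1.example.com:15121/plaything',
    'amqps://demo-user:s3cret@host-2.example.com:15121/plaything'
]);

console.log(connection.server); // [ 'host-1.example.com', 'host-2.example.com' ]
console.log(connection.port);   // [ 15121, 15121 ]
```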

I haven't yet found out how to test the resilience provided by the two hostnames. With MongoDB on Compose, you can drop one of the connections and check that your driver switches to the other connection. Hopefully, this feature will be available for RabbitMQ soon. But at least you are as resilient as you can be.

Configuring exchanges and queues

With Rabbot, you publish your message to an exchange and the exchange uses the Routing Key to allocate the message to a queue. The background process (e.g. worker.js) can then fetch the message from the queue.

Let's update the config variable to include an Exchange and Queue. The 'bindings' object links the Queue to the Exchange.

    var config = {
        connection: {
            protocol:  'amqps://',
            name:      'plaything',
            user:      'demo-user',
            pass:      'demo-password',
            server:    [hostname1, hostname2],
            port:      [15121, 15121],
            vhost:     'plaything'
        },
        exchanges: [
            {   name:           "worker.exchange",
                type:           "direct",
                autoDelete:     false,
                durable:        true,
                persistent:     true},

            {   name:           "deadLetter.exchange",
                type:           "fanout"}
        ],
        queues: [
            {   name:           "worker.queue",
                autoDelete:     false,
                durable:        true,
                noBatch:        true,
                limit:          1,
                subscribe:      true,
                deadLetter:     'deadLetter.exchange'},

            {   name:           'deadLetter.queue'}
        ],
        bindings: [
            {   exchange:       "worker.exchange",
                target:         "worker.queue",
                keys:           ["email"]},

            {   exchange:       "deadLetter.exchange",
                target:         "deadLetter.queue",
                keys:           ["email"]}
        ]
    };

This creates two exchanges ('worker.exchange' and 'deadLetter.exchange') and two queues ('worker.queue' and 'deadLetter.queue'). The worker queue is going to hold all the messages for the background workers, and we can spin up additional worker processes as the load increases. Normally, we'd configure Rabbit to maximize throughput. But in this case, we want the messages to flow through Rabbit really slowly, so we can have a peep at the Management Console and watch it work.

The queue.limit is set to 1, so the messages are handed out one at a time. This is very slow but makes it easier to watch what is happening.
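To make the routing concrete, here is a toy model of how the bindings in the config decide which queue a message lands in. It is an illustration only and not how RabbitMQ or Rabbot is implemented. The route function is hypothetical, and it covers just the two exchange types used here: a direct exchange needs an exact routing key match, while a fanout exchange ignores the key.

```javascript
// Toy model of exchange routing, using the names from the config above.
// This is an illustration, not RabbitMQ's actual implementation.
var exchangeTypes = {
    'worker.exchange':     'direct',
    'deadLetter.exchange': 'fanout'
};

var bindings = [
    { exchange: 'worker.exchange',     target: 'worker.queue',     keys: ['email'] },
    { exchange: 'deadLetter.exchange', target: 'deadLetter.queue', keys: ['email'] }
];

// Returns the names of the queues a message would be delivered to.
function route(exchangeName, routingKey) {
    var type = exchangeTypes[exchangeName];
    return bindings
        .filter(function (b) { return b.exchange === exchangeName; })
        .filter(function (b) {
            // fanout ignores the key; direct needs an exact match
            return type === 'fanout' || b.keys.indexOf(routingKey) !== -1;
        })
        .map(function (b) { return b.target; });
}

console.log(route('worker.exchange', 'email'));        // [ 'worker.queue' ]
console.log(route('worker.exchange', 'sms'));          // []
console.log(route('deadLetter.exchange', 'anything')); // [ 'deadLetter.queue' ]
```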

Sending Messages

Let's write a short harness that sends 5 messages, one every 10 seconds, which is slow enough to watch the messages appear on the Management Console. The first 4 messages will have dataOK = true and the last message will have dataOK = false. Later, the worker will use this to determine whether to accept and process the message or to reject it.

The payload is the message content. We've arbitrarily split the payload into data (in payload.body) and meta-data (e.g. payload.routingKey).

Below is the code to send messages. Open the Management Console and look at the worker.queue. Then run the program (e.g. node sender.js) and watch the queued messages increase on the chart. Exciting, isn't it!

    var rabbit = require("rabbot");

    var config = {..}; // see config object defined earlier

    rabbit
        .configure(config)
        .then(function(){
            console.log('connected to Rabbit');
            var x = 0;
            var payload = {};
            payload.routingKey = 'email';

            var intervalID = setInterval(function () {
                payload.body = {msg:'Greetings, this is message ' + x, dataOK: true};
                if (x === 4) {
                    // the fifth and last message carries bad data so the worker will reject it
                    payload.body.dataOK = false;
                    clearInterval(intervalID);
                }
                sendMessage(payload);
                x++;
            }, 10000);

        })
        .then(null, function(err){
            console.log("RabbitMQ Connection Error!");
            console.log(err.stack);
            process.exit(1);
        });

    function sendMessage(payload) {
        rabbit.publish("worker.exchange", payload, 'plaything')
            .then( function() {
                console.log('payload sent: ' + payload.body.msg);
            })
            .catch( function(err) {
                console.log(err);
            });
    }   
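If you'd like to check the test data without waiting on timers or a broker, the message-building part of the harness can be pulled out into a pure function. This is just a sketch. The buildPayloads name is made up, and it follows the rule described above: every message is good except the last.

```javascript
// Sketch: build the harness's payloads up front so they can be inspected.
// buildPayloads is a hypothetical helper, not part of Rabbot.
function buildPayloads(count) {
    var payloads = [];
    for (var x = 0; x < count; x++) {
        payloads.push({
            routingKey: 'email',
            body: {
                msg:    'Greetings, this is message ' + x,
                dataOK: x !== count - 1   // only the last message is bad
            }
        });
    }
    return payloads;
}

var payloads = buildPayloads(5);
payloads.forEach(function (p) {
    console.log(p.body.msg + ' (dataOK: ' + p.body.dataOK + ')');
});
```

Each payload could then be passed to sendMessage from inside the interval callback.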

Receiving messages

Now, let's write a worker.js file to receive and process the messages. Use the same config object we used in the previous example.

    var rabbit = require("rabbot");
    var config = {..}; // see earlier config definition
    var handleMessage = function(payload) {

        if (payload.body.dataOK) {
            // if the data in the payload is good, let's acknowledge the message so it is removed from the queue
            console.log('payload received: ' + payload.body.msg);
            payload.ack();
        } else {
            // the data is not good, let's reject the message so it moves to the dead-letter queue
            console.log('rejecting message: ' + payload.body.msg );
            payload.reject();
        }

    };

    var startListening = function() {

        rabbit.handle({}, handleMessage);

        // must define handler before starting the subscription, otherwise messages will be lost
        rabbit.startSubscription(config.queues[0].name, config.connection.name);
    };


    rabbit
        .configure(config)
        .then(function(){
            console.log('connected to Rabbit');
            startListening();
        })
        .then(null, function(err){
            console.log("RabbitMQ Connection Error!");
            console.log(err.stack);
            process.exit(1);
        })
        .catch( function(err) {
            console.log(err.stack);
            process.exit(1);
        });

Now let's test it. Run worker.js and the messages will be taken from the worker.queue. Watch the graph on the Management Console to see the messages being fetched.

One of the messages was rejected with payload.reject(). Because worker.queue names deadLetter.exchange as its dead-letter exchange, the rejected message ends up in deadLetter.queue. Open the Management Console, view deadLetter.queue and use the Get Message(s) button to fetch the failed message.
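The worker's accept-or-reject decision can also be exercised without a broker by handing the handler a stand-in message whose ack and reject methods simply record what was called. This is a sketch under that assumption. The fakeMessage helper is made up for illustration, and the handler repeats the logic from worker.js so the example is self-contained.

```javascript
// Same decision logic as handleMessage in worker.js.
function handleMessage(payload) {
    if (payload.body.dataOK) {
        payload.ack();
    } else {
        payload.reject();
    }
}

// Hypothetical stand-in for a delivered message: it records
// whether the handler acknowledged or rejected it.
function fakeMessage(body) {
    var message = { body: body, outcome: null };
    message.ack    = function () { message.outcome = 'ack'; };
    message.reject = function () { message.outcome = 'reject'; };
    return message;
}

var good = fakeMessage({ msg: 'Greetings, this is message 0', dataOK: true });
var bad  = fakeMessage({ msg: 'Greetings, this is message 4', dataOK: false });

handleMessage(good);
handleMessage(bad);

console.log(good.outcome); // ack
console.log(bad.outcome);  // reject
```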

Conclusion

Let's recap on what you've done so far:

  1. Set up an instance of Compose's RabbitMQ;
  2. Connected to RabbitMQ using the amqplib low-level library;
  3. Connected to RabbitMQ using Rabbot, an opinionated abstraction over amqplib;
  4. Configured Rabbot to provide both a worker queue and a dead-letter queue for failed messages;
  5. Built a simple NodeJS program for sending messages to the 'worker' queue;
  6. Built a simple worker program for reading messages from the queue and processing them.

The RabbitMQ configuration used was quite simple and only processed one message at a time, but this allowed us to watch the code in action on the Compose RabbitMQ Management Console.

At Tripcatcher, we initially installed RabbitMQ to move the workload of sending emails away from our web server and into a background process. We're now also adding management reports and integrations with third parties (where the response time may be slow). RabbitMQ is providing a really slick way of moving the processing load off the web server, while still having the web server authenticate requests and initiate work.

I recommend looking at some RabbitMQ tutorials and checking the documentation on the config object to get a better understanding of how RabbitMQ might benefit your business.

Ken Whipday is the Tech co-founder at [Tripcatcher](http://www.tripcatcherapp.com), helping remove the hassle from mileage expenses. He lives in Cheltenham, England and in his spare time likes running and embarrassing his two daughters with really bad dad-dancing.

This article is licensed with CC-BY-NC-SA 4.0 by Compose.
