Making Secure Connections With Compose RabbitMQ

Now you have your Compose RabbitMQ deployment set up, you'll want to get your RabbitMQ applications connected to it. In this article, we're going to look at how to make secure connections with TLS, with and without server verification, to a Compose RabbitMQ deployment. We're also going to do it with Java, Ruby, Python and Go. You'll still need to refer to each language's library documentation and tutorials, but they don't address quickly how to make a secure connection.

The first thing to know is that you can only use AMQP and TLS secured connections into Compose RabbitMQ. We'll assume you have set up a RabbitMQ user and password already; if not refer to the getting started guide. If you have read any RabbitMQ documentation you will know that a URI to create and unsecured connection to RabbitMQ begins with the schema "amqp://...". To start a TLS secured connection, all you need to do is switch to using "amqps://...". We'll start our explorations with Java, which is the only official RabbitMQ client we're covering here.

Connecting Java Securely

The first thing you'll need to do is to install the official RabbitMQ Java Client - there's a number of options which should be familiar to any Java developer. Pick the one that suits your development environment. Once you are ready to build, then we can move on. You may also want to refer to the RabbitMQ tutorials.

If you go to your Compose Console, select your RabbitMQ deployment and go to the overview, you'll find two connection strings, one for each access portal, available for you to use with the "amqps://" already being used in them. Just copy those into your applications and then edit the username and password as appropriate.

Let's step through the process of making a basic connection using the Java API:

public class RabbitMQConnector {

    public static void main(String[] args) {
        try {
            ConnectionFactory factory=new ConnectionFactory();
            factory.setUri("amqps://[user]:[password]@aws-eu-west-1-portal.1.dblayer.com:11020/tangy-rabbitmq-80");
            Connection conn=factory.newConnection();

This is just an example, so we're doing everything in the main method and that begins with getting ourselves a ConnectionFactory for RabbitMQ connections. We sent the URI for the factory so that it makes connections for us that connect to our RabbitMQ; note the amqps:// in the URI. And we then ask the factory for one of those new connections. Now, we could stop there as we've shown how to get the connection, but we'll carry on, publishing a simple message to an exchange. First, we make a channel for this publication:

            Channel channel=conn.createChannel();

Next we want to set up a message payload, in this case a string, a routing key for its onward journey and the name of an exchange to send it on to:

            String message="This is not a message, this is a tribute to a message";
            String routingKey="tributes";
            String exchangeName="postal";

With our new values, we can go about declaring the exchange (a direct exchange which can make use of the routing key), which will create it if it doesn't exist. We can then publish to the named exchange, with a routing key and the message payload encoded as bytes:

            channel.exchangeDeclare(exchangeName,"direct",true);
            channel.basicPublish(exchangeName, routingKey, null, message.getBytes());

All that's left to do is close the channel, close the connection and put in a catch for all the exceptions that may have been thrown:

            channel.close();
            conn.close();  
        } catch (URISyntaxException | NoSuchAlgorithmException | KeyManagementException | IOException | TimeoutException ex) {
            Logger.getLogger(RabbitMQConnector.class.getName()).log(Level.SEVERE, null, ex);
        }     
    }
}

A Moment To Verify

If you run that example – you'll find the full code for this and subsequent examples here – it'll silently connect, deliver the message and disconnect. You'll probably want to verify that it did something so log into the RabbitMQ Admin UI – you'll find the URL in your Compose Console – and select the Exchanges tab. There should be a "postal" exchange there which your code created. When the code runs, there should also be some activity shown in the charts.

To actually check your message landed, you can't peer into the exchange but you can...

You may wonder what happened to any messages sent before a queue was bound to the exchange. The answer is that it was discarded automatically as it could not be routed. RabbitMQ has a mechanism to catch unroutable messages called Alternate Exchanges for special cases, but it's generally best to ensure everything is routed in your messaging architecture.

You may also wonder why the message is still in the queue even when we got it. Getting a message is a destructive act, but if you look on the Get Messages panel, you'll see it's set by default to requeue the messages we pull off. This should, of course, remind you not to go getting messages from queues on your production system.

Back to the connection

You may well be happy with the encryption provided by just using "amqps://" but you may also want to ensure you are talking to the Compose RabbitMQ servers, and specifically to yours, when you make that connection. With that in mind we offer a Public SSL Certificate for each Compose RabbitMQ deployment. You can download this from the Compose Console's Overview. Once you have that as a text file locally, you can use that to start securing your connection. Again, we're going to cover the Java client so there's a little more work to do than other languages because Java's built around the idea of key and trust stores... So over at your command line, you should have your certificate to hand. We're going to create a keystore for that certificate with this command:

keytool -import -alias compose1 -file ./composecert -keystore ./rabbitstore -storetype pkcs12  

This will import the composecert (or whatever you named it) certificate file into the keystore with a name of compose1. It will write out a rabbitstore file encoded in pkcs12 format... Or at least it will after you give it a password for rabbitstore (twice) and answer "yes" when it asks you.

Now we can return to our previous code and we get to add in a wedge of code at the start. Right after the beginning of the main() code we need to do the following...

            char[] keyPassPhrase = "ilikeamiga".toCharArray();

Replace ilikeamiga with the password you set on the keystore when you created it. Now we need to create a KeyStore:

            KeyStore tks = KeyStore.getInstance("JKS");
            tks.load(new FileInputStream("./rabbitstore"), keyPassPhrase);            
            TrustManagerFactory tmf = TrustManagerFactory.getInstance("SunX509");
            tmf.init(tks);

The important parts are that we're reading ./rabbitstore from the current directory into our keystore and using our passphrase to unlock that. Then we're creating a TrustManagerFactor and putting the keystore inside it. With this, we can create an implementation of SSL:

            SSLContext c = SSLContext.getInstance("TLS");
            c.init(null, tmf.getTrustManagers(), null);

The next two lines are from our earlier code, setting up the ConnectionFactory. The change here is in the third line where we tell our ConnectionFactory to use the SSLContext with our certificate:

            ConnectionFactory factory=new ConnectionFactory();
            factory.setUri("amqps://[user]:[password]@aws-eu-west-1-portal.1.dblayer.com:11020/tangy-rabbitmq-80");
            factory.useSslProtocol(c);

And we're done. If the certificate held doesn't cryptographically match up with the server's private version then an exception will be thrown which, of course, can be caught and handled appropriately.

Well, that covers the official Java client, but what of other languages and drivers.

Moving on to Ruby

There's a number of drivers for the Ruby language. Bunny is one of the best known of them and you can read the tutorials and documentation on that site or the RabbitMQ site to see how to drive RabbitMQ. To connect to Compose RabbitMQ and to do the same as the verifying SSL example above, requires the following code:

require 'bunny'

conn = Bunny.new('amqps://dj:admin@aws-eu-west-1-portal.1.dblayer.com:11020/tangy-rabbitmq-80',  
                 verify_peer: true,
                 tls_ca_certificates: ['./composecert'])
conn.start

ch = conn.create_channel

message = 'This is not a message, this is a tribute to a message'  
routingKey = 'tributes'  
exchangeName = 'postal'

x = ch.direct(exchangeName, durable: true)

x.publish(message, routing_key: routingKey)

ch.close  
conn.close  

The important part to note is the Bunny.new call which takes the URI and two other parameters verify_peer and tls_ca_certificates. The first, verify_peer, is set to true to ensure we verify the systems we are connecting to. The second, tls_ca_certificates, is an array of filenames of certificates we wish to use during verification. Here we've just given it the one certificate "composecert". This will now connect and publish a message. It will emit a warning something like this too:

W, [2015-11-03T10:45:51.476133 #24628]  WARN -- #<Bunny::Session:0x7fa6319881c0 dj@aws-eu-west-1-portal.1.dblayer.com:11020, vhost=tangy-rabbitmq-80, addresses=[aws-eu-west-1-portal.1.dblayer.com:11020]>: Using TLS but no client certificate is provided! If RabbitMQ is configured to verify peer  
certificate, connection upgrade will fail!  

The server is not configured to verify clients (and we don't currently offer client certificate verification) so the connection upgrade will succeed and the presence of the server's certificate has allowed the client to confirm which system it is talking to.

Python in the burrow

Again, here's a Python program which does the same as our SSL example. This uses the pika library, as recommended by the RabbitMQ developers. We'll take this one step by step:

import pika  
import sys  
import ssl

try:  
    from urllib import urlencode
except ImportError:  
    from urllib.parse import urlencode

First we pull in the libraries we need. The try/except part is to let the code import urlencode in Python 2.x and 3.x – the method changed library between the two versions. We're also only importing the ssl library for a single constant.

Now, the pika library takes a parameters instance, and there is a URLParameters constructor which will give us all that. But there's a catch: all parameters have to be in the URL including the kinds of SSL options we've been setting outside the URL in other languages. That means we have to take the SSL options, URL encode them and append them, after a ? to the URL we've been using elsewhere. That process looks like this:

ssl_opts = urlencode(  
    {'ssl_options': {'ca_certs': './composecert', 'cert_reqs': ssl.CERT_REQUIRED}})
full_url='amqps://[user]:[password]@aws-eu-west-1-portal.1.dblayer.com:11020/tangy-rabbitmq-80?'+ssl_opts  
parameters = pika.URLParameters(full_url)  

The ssl_opts includes setting the ca_certs to our server's public certificate and setting cert_reqs so it actually does the peer validation. If you don't want to do the server validation, you can skip all of it apart from the first two imports and replace it with:

parameters = pika.URLParameters('amqps://[user]:[password]@aws-eu-west-1-portal.1.dblayer.com:11020/tangy-rabbitmq-80')  

The rest of the Python code is exactly the same for either case:

connection = pika.BlockingConnection(parameters)  
channel = connection.channel()

message='This is not a message, this is a tribute to a message'  
my_routing_key='tributes'  
exchange_name='postal'

channel.exchange_declare(exchange=exchange_name,  
                         type='direct',
                         durable=True)


channel.basic_publish(exchange=exchange_name,  
                      routing_key=my_routing_key,
                      body=message)

channel.close()  
connection.close()  

Finally, Go

For Go, we've found we can't easily make an unverified connection, at least using the github.com/streadway/amqp package. The Go libraries see theres a certificate that needs to be matched and can't find any authority for it - hence the error: Failed to connect to RabbitMQ: x509: certificate signed by unknown authority. So, we'll make a server verified connection in this example. First the obligatory pre-amble:

package main

import (  
    "crypto/tls"
    "crypto/x509"
    "fmt"
    "io/ioutil"
    "log"

    "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {  
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
        panic(fmt.Sprintf("%s: %s", msg, err))
    }
}

The function is there to shorten up the Go error handling in examples. We can now move on to the main method....

   func main() {
    cfg := new(tls.Config)
    cfg.RootCAs = x509.NewCertPool()

There's a tls.Config structure that can be initialized with various settings. Here we create one and then fill in the RootCAs field with a x509 certificate pool where we can hold our certificates. It's into there that we'll add our server certificate:

    if ca, err := ioutil.ReadFile("composecert"); err == nil {
        cfg.RootCAs.AppendCertsFromPEM(ca)
    }

We can now use the configuration to connect to the server.

conn, err := amqp.DialTLS("amqps://[user]:[password]@aws-eu-west-1-portal.1.dblayer.com:11020/tangy-rabbitmq-80", cfg)  
  failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

And we will be connected. For completeness, here's the rest of the Go code to create our exchange and send a message. It's a little long because the parameters are spread across multiple lines but it's essentially the same as the previous examples.

    message := "This is not a message, this is a tribute to a message"
    routingKey := "tributes"
    exchangeName := "postal"

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    err = ch.ExchangeDeclare(
        exchangeName, // name
        "direct",     // type
        true,         // durable
        false,        // auto-deleted
        false,        // internal
        false,        // no-wait
        nil,          // arguments
    )
    failOnError(err, "Failed to declare an exchange")

    err = ch.Publish(
        exchangeName, // exchange
        routingKey,   // routing key
        false,        // mandatory
        false,        // immediate
        amqp.Publishing{
            ContentType: "text/plain",
            Body:        []byte(message),
        })
    failOnError(err, "Failed to publish a message")
}

And there we have the four examples, Go, Java, Ruby and Python which should help you get going with RabbitMQ on Compose. You should also be equipped now to bend any other language's driver to your will, as we covered the capabilities and requirements of the TLS connections, and a quick way to test your code is working without writing any more code. Now you are ready to go, take control of the RabbitMQ system, create your own architecture of exchanges and queues and deliver your mission critical messages.