Go and Compose - Redis, RethinkDB, and RabbitMQ

Published

How to connect to every Compose database using Go: This is the second stop on the Compose Grand Tour which shows you the drivers, code and everything you need to get going with Go and Compose Redis, RethinkDB, AND RabbitMQ.

Compose's Grand Tour project sets out to provide an example of connecting to all of the databases of Compose. This includes using TLS/SSL, self-signed certificates, and any other appropriate good practice to get a more secure connection. In this second part, we'll look at Redis, RethinkDB and RabbitMQ.

In the previous part, we looked at Go with MongoDB, Elasticsearch, and PostgreSQL and at "The Example", a simple web application that this, and all other Grand Tour examples are based on. In the third part, we'll be covering etcd, Scylla and MySQL wrapping up Go and moving onto the next language.

Redis

The Code: You'll find the code for the Redis example in the example-redis directory of the Grand Tour - Go repository.

The Driver: The example uses Gary Burd's Redigo driver. This is a low-level driver which handles the plumbing layer of talking to Redis but doesn't turn Redis commands into idiomatic Go. We bring that in with this import:

import (  
    "github.com/garyburd/redigo/redis"
)

The Connection: Compose's Redis supports TLS/SSL connections and the Redigo driver handles them in its stride. This means the entire connection code is:

var conn redis.Conn

func main() {  
    var err error
    conn, err = redis.DialURL(os.Getenv("COMPOSE_REDIS_URL"))
    defer conn.Close()

We do recommend that if you want the connection to fail-fast rather than waiting for the first operation that you PING the server after connecting like so:

  _, err = redis.String(conn.Do("PING"))

    if err != nil {
        log.Fatal(err)
    }

The Read: Because Redis is a key/value store, we decided to keep all the words and definitions in one key: words. We use a Redis Hash structure under that key. Getting the data from that field simply means sending the HGETALL command with the key words as a parameter:

        wordresult, err := redis.StringMap(conn.Do("HGETALL", "words"))

The redis.StringMap function is a helper. The conn.Do function returns an interface to the results and the helper casts those results into a String map for easier processing. At the moment, the returned data is still a hash and we'll want to decant those results into item structures before converting them to JSON and sending them on to the requesting browser. We do that like so:

    i := 0
    words := make([]item, len(wordresult))
    for word, def := range wordresult {
        words[i] = item{Word: word, Definition: def}
        i = i + 1
    }

The Write: This is also a simple operation. We use HSET on the key and set a hash key and value – the word and definition – in the hash:

    _, err := conn.Do("HSET", "words", r.Form.Get("word"), r.Form.Get("definition"))

There's no return value we're interested in, just whether or not there was an error. The values are pulled straight from the request's parsed form. As a hash, you do need to remember that if you use the same word for two definitions, one will overwrite the other.

RethinkDB

The Code: For RethinkDB, the code can be found in example-rethinkdb directory of the Grand Tour - Go repository.

The Driver: For Go, there is one definitive driver, GoRethink which is a comprehensive and idiomatic rendering of the ReQL API, which is essentially designed for dynamic languages such as JavaScript, into the statically typed Go. For clarity, we import it like so:

import (  
    rethink "gopkg.in/gorethink/gorethink.v3"
)

Regular RethinkDB users may change rethink to r to match other examples and uses around RethinkDB.

The Connection: On to connecting to the database. Compose uses self-signed certificates to validate connections so you'll want to copy the certificate from the Compose console, save it in a file and point the PATH_TO_RETHINKDB_CERT environment variable to it. In the code, it's the first thing we take care of, creating a certificate pool:

  roots := x509.NewCertPool()
    cert, err := ioutil.ReadFile(os.Getenv("PATH_TO_RETHINKDB_CERT"))
    roots.AppendCertsFromPEM(cert) 

Now we have to parse the connection string. You will have obtained that from the RethinkDB Proxy Connection strings field in the Compose console. Assign it to the COMPOSE_RETHINK_URL environment variable so this example can pick it up when it runs.

  rethinkurl, err := url.Parse(os.Getenv("COMPOSE_RETHINKDB_URL"))

    password, setpass := rethinkurl.User.Password()

    if !setpass {
        log.Fatal("Password needs to be set in $COMPOSE_RETHINKDB_URL")
    }

(Just a reminder again, most of the error checks have been removed from these examples, check the online source code to see it with error checking in place).

We have to parse the URL so we can create the ConnectOpts needed to Connect() to the Rethink database. Here, we do an extra check to make sure the password is set in the URL and exit if it isn't.

We're ready to connect now:

session, err = rethink.Connect(rethink.ConnectOpts{  
        Address:  rethinkurl.Host,
        Username: rethinkurl.User.Username(),
        Password: password,
        TLSConfig: &tls.Config{
            RootCAs: roots,
        },
    })

There seems to be a lot going on here, but it's mostly setting variables for the Connect() call. The Address in ConnectOpts gets set to the host (hostname and port) from the URL and the Username comes straight from the parse URL's User section. We extracted the password earlier. The last setting is TLSConfig. For that, we create a tls.Config struct with RootCAs set to the certificate pool roots from the start. And that's it; enough for the driver to create a connection.

There is some housekeeping to do first before moving on - Creating the database and the table for our examples to work in.

  rethink.DBCreate("examples").Exec(session)
  rethink.DB("examples").TableCreate("words", rethink.TableCreateOpts{Replicas: 3}).Exec(session)

Here, we see the typical structure of a GoRethink request with a chain of commands ending with an Exec (or a Run for a query) and a session to perform that request on. Here, the first request is to create a DB called "examples". The second call sets the context to the "examples" DB, then asks to create a table called "words". By default, RethinkDB only creates the table on one node of the cluster so we add the option rethink.TableCreateOpts{Replicas: 3} to replicate it across all three nodes. Read more about that issue in Loss Prevention with RethinkDB.

We are now ready to read and write to RethinkDB.

The Read: Like other databases' drivers, GoRethink can make use of Go's struct tags for mapping data in and out of structs. That's why, in this case, our item struct type is defined like so:

type item struct {  
    ID         string `gorethink:"id,omitempty" json:"_id,omitempty"`
    Word       string `gorethink:"word" json:"word"`
    Definition string `gorethink:"definition" json:"definition"`
}

This maps the database id field to the ID field, word to Word and so on. That makes our reading remarkably simple.

        results, err := rethink.DB("examples").Table("words").Run(session)

        var items []*item
        err = results.All(&items)

First up, we perform the query. The DB() function sets the database context, the Table() function selects our table and, as we want everything, we just Run() the query against our session. If we wanted, we could add more functions to the chain to specify a more selective query.

The Run() returns results which we need to decant into item types. We create an empty array of items and then call on .All() on the results giving the items array as a parameter. And that's the data extracted. We can now go marshal it and return it to the requesting browser.

The Write: The write is equally easy to perform. We start with creating a new item from the Form request as we've seen in other examples.

        newitem := item{Word: r.Form.Get("word"), Definition: r.Form.Get("definition")}

        err := rethink.DB("examples").Table("words").Insert(newitem).Exec(session)

And then we chain together a DB selector function, a Table selector function, an Insert function with the new item as a parameter and an Exec function to run that. And we're done inserting the data. The Go RethinkDB example is complete.

RabbitMQ

The Code: The RabbitMQ example diverges from "The Example" code, mainly because although message queues have many things in common with databases, retaining and processing data, they retain it and process it in a different way. So, in the RabbitMQ version of the example, you push a message to a RabbitMQ exchange and get a message from a queue where those messages have been delivered. You'll find the code for this example in example-rabbitmq in the golang Grand Tour examples.

The Driver: RabbitMQ is an AMQP 0.9.1 compatible messaging platform, so any AMQP 0.9.1 compatible library or application should be able to talk to it. For this example, we use RabbitMQ's preferred streadway/amqp package. We'll import that first.

import (  
    "github.com/streadway/amqp"
)

The Connection: Before we start, we're going to have a RabbitMQ exchange called "grandtour" and we're going to use a routing key "words".

var routingKey = "words"  
var exchangeName = "grandtour"  

Now, there's a number of things to keep track of with RabbitMQ. We have to create an amqp.Connection which puts us in touch with the server. Then we make an amqp.Channel from that connection; it's like a session on database drivers. We're also creating a queue. So let's define some variables to retain those while the application runs.

var connection *amqp.Connection  
var channel *amqp.Channel  
var queue amqp.Queue  

Skipping forward to the main() function, we can start populating those variables.

    var err error

    connection, err = amqp.Dial(os.Getenv("COMPOSE_RABBITMQ_URL"))
    defer connection.Close()

    channel, err = connection.Channel()

The amqp.Dial() function provides us with the connection. There's a number of variants of it, but the basic Dial function handles amqps: TLS/SSL connections and Compose RabbitMQ uses Lets Encrypt certificates so verification is automated. Once we have the connection, we defer its clean up and then simply get a Channel from the connection. We're ready to work, and the first thing we need to do is set things up.

    err = channel.ExchangeDeclare(exchangeName, "direct", true, false, false, false, nil)

First, we declare an exchange where we'll be sending our new messages. The third parameter sets this as a durable exchange so it'll be around even if we're not. If it's already around there's no error; durable exchanges are idempotent. Now we need to create ourselves a queue:

  queue, err = channel.QueueDeclare("", false, false, false, false, nil)

This creates us a transient queue which will go away when we stop using it. It has no name when we create it but it will get allocated a name automatically. That name will be in queue.Name which we get to use in the next line.

    err = channel.QueueBind(queue.Name, routingKey, exchangeName, false, nil)

So we want messages from our exchange, when they have our routing key, to be sent to our queue where we will pick them up. We do that using QueueBind on the channel and now we're ready to go.

The Write: We'll look at the write operation first, pushing something into the queue. That something is the combination of the text entered onto the form with the time.

        msg := r.Form.Get("message") + " " + time.Now().Format("15:04:05.00")

Now we can Publish this onto the channel, with an exchange name and routing key to publish into. The message itself is wrapped in an amqp.Publishing struct which handles the many possible other settings. For our purposes, we just need to set a type of text/plain and copy our message over.

        err := channel.Publish(exchangeName,
            routingKey,
            false,
            false,
            amqp.Publishing{
                ContentType: "text/plain",
                Body:        []byte(msg),
            })

And our message is sent. It will land in the exchange, the exchange will see the routing key matches a binding to a queue and send it on to the queue where it, and any subsequent messages will gather up.

The Read: Now to read from that queue. We're going to use the Get() function which takes one item off the queue and acknowledges it.

        msg, ok, err := channel.Get(queue.Name, true)

This is not the ideal way to consume a queue in Go. For that, we recommend Consume which delivers queued messages to a Go channel. That's a lot more code than our simple example needs. Get will get our data from the queue we created; that's why the first parameter is queue.Name - the second parameter just sets automatic acknowledgment. What we get back are the message, an ok flag, and an error. The ok flag will be false if there was nothing waiting or an error so we can use that to decide what to return...

    if ok {
        w.Write(msg.Body)
    } else {
        w.Write([]byte("No message waiting"))
    }

So now we're creating and getting RabbitMQ messages.

AMQP messaging and RabbitMQ together are a potent combination for queuing and distributing work and the Go library unlocks that power.

Another Brief Stop

That's where Redis, RethinkDB, and RabbitMQ Go stretch of our journey ends. For the last stops in the Go stretch of the Grand Tour we'll visit etcd, MySQL and Scylla.


Read more articles about Compose databases - use our Curated Collections Guide for articles on each database type. If you have any feedback about this or any other Compose article, drop the Compose Articles team a line at articles@compose.com. We're happy to hear from you.

Dj Walker-Morgan
Dj Walker-Morgan is Compose's resident Content Curator, and has been both a developer and writer since Apples came in II flavors and Commodores had Pets. Love this article? Head over to Dj Walker-Morgan’s author page to keep reading.

Conquer the Data Layer

Spend your time developing apps, not managing databases.