Turning RethinkDB changes into RabbitMQ messages

Do you want to know how to turn RethinkDB changes into RabbitMQ messages? In this article, we'll do just that. Following our recent PostgreSQL/Go/RabbitMQ explainer, we were asked if there were any other databases that made picking up changes easy. With PostgreSQL, we took trigger-detected table changes and used PostgreSQL's own LISTEN/NOTIFY system to send them out to a Go application, which forwarded them on to RabbitMQ.

This is where RethinkDB comes in: it's built for streaming database changes out to clients, which makes it ideal for showing a different approach. To demonstrate, we're going to port the postrabbit code over to make rethinkrabbit.

We've previously dived into the mechanics of RethinkDB in Go, RethinkDB and Changefeeds, but here we'll pull all that together into a practical example.

If you like spoilers, you'll find the code for this article in its own compose examples repository.

The Setup

In the original postrabbit, much of the critical code was held in the setup: creating the table, creating a trigger that detected the changes, and creating a function to turn those changes into NOTIFY messages – it was all done in the server. For RethinkDB, only the table creation is needed, and not a lot of that. Let's walk through the entire setup code, as it also covers connecting to RethinkDB:

package main

import (  
    "crypto/tls"
    "log"

    r "github.com/dancannon/gorethink"
)

We're going to use a TLS/SSL-encrypted link without server certificate verification. We mention this because, if you want to do this with verification, you'll want to consult our introduction to RethinkDB and SSL. In the PostgreSQL code, we only needed to pass a single URL to the connection code. For RethinkDB, it's a little more complex: there's the address (the host name and port number separated by a colon), the name of the database we want to access, and the auth key that lets us connect to the database. We added these to our YAML config parsing so we can use them here – look in rethinkrabbit.go for the details (there's also a sketch of that parsing after the setup function below):

func setup(config Config) {  
  conn, err := r.Connect(r.ConnectOpts{
    Address:  config.RethinkDBAddress,
    Database: config.RethinkDBDatabase,
    AuthKey:  config.RethinkDBAuthkey,
    TLSConfig: &tls.Config{
      InsecureSkipVerify: true,
    },
  })
  if err != nil {
    log.Fatal(err)
  }

The different bit here is the TLSConfig, where we set the InsecureSkipVerify option so we skip certificate validation – we still get an encrypted connection, though. After the Connect call comes the obligatory Go error check. The rest of the code simply drops and recreates the table:

  // Drop any existing "urls" table and create it afresh. The errors
  // from Run are deliberately ignored here; TableDrop will fail
  // harmlessly on a first run when the table doesn't yet exist.
  r.DB(config.RethinkDBDatabase).TableDrop("urls").Run(conn)
  r.DB(config.RethinkDBDatabase).TableCreate("urls").Run(conn)
  conn.Close()
}
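
As mentioned above, the connection details come from a YAML config file. As a point of reference, here's a minimal sketch of what that parsing might look like – the field and tag names are assumptions for illustration, and the real definitions live in rethinkrabbit.go:

import (
    "io/ioutil"

    "gopkg.in/yaml.v2"
)

// Config holds the connection settings read from the YAML
// credentials file. The field and tag names here are assumed;
// consult rethinkrabbit.go for the real definitions.
type Config struct {
  RethinkDBAddress  string `yaml:"rethinkdb_address"`
  RethinkDBDatabase string `yaml:"rethinkdb_database"`
  RethinkDBAuthkey  string `yaml:"rethinkdb_authkey"`
}

func loadConfig(path string) (Config, error) {
  var config Config
  data, err := ioutil.ReadFile(path)
  if err != nil {
    return config, err
  }
  err = yaml.Unmarshal(data, &config)
  return config, err
}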

Running with the changes

We can now skip over to the rethinkrabbitrun.go file, where all the real business takes place. The connection to the RethinkDB database is the same as above, and the RabbitMQ publish code is essentially the same (we'll pull out the difference later on). Then we get to this:

  res, err := r.Table("urls").Changes().Run(session)
  if err != nil {
    log.Fatalln(err)
  }

This gives you all the changes happening to the "urls" table as a blocking result set from the database. It typifies the difference between PostgreSQL and RethinkDB. With PostgreSQL, it's a combination of SQL-standard triggers and the server-side, non-SQL-standard LISTEN/NOTIFY. With RethinkDB, it's the developers' ability to imagine a clean interface for clients, rather than servers, to process changes. If you look at our previous coverage of changefeeds, you'll find that this is the crudest form of feed; it's possible to create changefeeds from many RethinkDB ReQL queries.
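
For example, a changefeed can follow a filtered query rather than a whole table. This is just a sketch, not part of rethinkrabbit, and the domain field is an assumption for illustration:

  // Hypothetical: stream only changes to documents whose domain
  // field is "compose.io", rather than every change to the table.
  res, err := r.Table("urls").
    Filter(r.Row.Field("domain").Eq("compose.io")).
    Changes().
    Run(session)

Let's get back to the actual code and move on to consume those changes. For the GoRethink driver, we need an interface to decant the change data: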

  var value interface{}

Then we can go into a loop where we get each change, using the cursor's Next() method to read the next value into the value variable. Then comes a bit of a Go-ism: a type assertion treating the value as a map with strings as keys to some abstract data. Here's the code:

  for res.Next(&value) {
    mapval := value.(map[string]interface{})

Right, now we can evaluate the change data. Briefly, RethinkDB sends two values, old_val and new_val. If both are set, it's an update; if only old_val is set, it's a delete; and if only new_val is set, it's an insert. It's the last of these we're interested in, so we look in our map for new_val being set and old_val being absent:

    if mapval["new_val"] != nil && mapval["old_val"] == nil {

Having found our insert, we can now wrap it up as JSON. This is purely for Go users; other language drivers will often be handing around JSON structures anyway, so this step wouldn't apply, but we're in statically typed Go, so it has to be done:

      jsonbytes, err := json.Marshal(mapval["new_val"])
      if err != nil {
        log.Fatal(err)
      }

The json.Marshal() method returns a byte slice when successful, so all we need to do is pop it into the channel to be picked up by the RabbitMQ publishing loop:

      rabbitchannel <- jsonbytes
    }
  }
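
As an aside, the same map checks extend naturally to the other change types. A quick sketch, not part of rethinkrabbit, that classifies each change:

    switch {
    // Both values set: a document was updated.
    case mapval["new_val"] != nil && mapval["old_val"] != nil:
      log.Println("update")
    // Only old_val set: a document was deleted.
    case mapval["new_val"] == nil && mapval["old_val"] != nil:
      log.Println("delete")
    // Only new_val set: a document was inserted.
    case mapval["new_val"] != nil && mapval["old_val"] == nil:
      log.Println("insert")
    }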

We do have to make one change earlier in the code, where we create rabbitchannel: we switch from a channel of strings to a channel of byte slices with rabbitchannel := make(chan []byte, 100).
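
To put that channel in context, here's a hedged sketch of the kind of publishing loop that drains it. The AMQP URL, the use of the default exchange, and the routing key are all assumptions for illustration; the real publishing code is in rethinkrabbitrun.go:

import (
    "log"

    "github.com/streadway/amqp"
)

// publishLoop drains rabbitchannel, publishing each JSON payload
// to RabbitMQ. The routing key "urls" is assumed for illustration.
func publishLoop(amqpURL string, rabbitchannel <-chan []byte) {
  conn, err := amqp.Dial(amqpURL)
  if err != nil {
    log.Fatal(err)
  }
  defer conn.Close()

  channel, err := conn.Channel()
  if err != nil {
    log.Fatal(err)
  }
  defer channel.Close()

  for jsonbytes := range rabbitchannel {
    err = channel.Publish("", "urls", false, false, amqp.Publishing{
      ContentType: "application/json",
      Body:        jsonbytes,
    })
    if err != nil {
      log.Fatal(err)
    }
  }
}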

Once you've built this (go get -a, then go build) and created your rrcreds.yaml file (see rrcreds.example.yaml), you'll be able to run it and repeat the testing exercises we carried out in the PostgreSQL article. We ported the add command too, of course.
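
For reference, inserting a document with the GoRethink driver is about as simple as it gets. Here's a sketch of the kind of insert an add command might do – the document's fields are assumptions for illustration:

  // Hypothetical insert; the real add command is in the example
  // repository.
  _, err := r.Table("urls").Insert(map[string]interface{}{
    "url": "https://www.compose.io",
  }).RunWrite(session)
  if err != nil {
    log.Fatal(err)
  }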

Choices

Across this article and the original, we've seen two different approaches to distributing change data efficiently. Which one suits you depends on your use case.

Personally, I'd lean towards PostgreSQL's mechanisms for server-to-server or application updates, but if I needed lots of client applications all presenting different queries and watching for changes in those queries, I'd be looking at RethinkDB.

They're both exceptional databases with a lot to offer, and having both on demand is an even better option. At Compose, we have these and many other databases ready for when we, or you, need them.