Go, RethinkDB & Changefeeds - Part 1

RethinkGo

With the recently expanded capabilities of the RethinkDB to handle real-time changes, we thought it would be worth doing a deeper dive into using them. As I've been working with RethinkDB using Go, I was going to write the examples in Go, then realised a lot of people don't know you can use Go with RethinkDB.

Go isn't one of RethinkDB's officially supported languages - JavaScript, Python and Ruby currently are – but the GoRethink driver is a well-developed driver that is worth considering if you are looking to develop an application with RethinkDB.

It uses the Rethink API as a template and adapts it with Go idioms. This is quite a challenge because Go isn't a dynamic language like the officially supported languages, coming with type checking and analysis.

We'll start in this part looking at a simple application to generate changes in a RethinkDB database. We'll also introduce the stylings of the GoRethink driver. In the next part of this article, we'll use this example to look at how we can track the changes generated using RethinkDB's changefeeds.

Preparing for changes

For this task, we're going to have a simple record with an id, a player name and a score - We'll call this the ScoreEntry. We'll create a thousand of these ScoreEntry records for a thousand players and randomly increment or decrement the score of a randomly selected player.

package main

import (  
  ...
  r "gopkg.in/dancannon/gorethink.v0"
)

type ScoreEntry struct {  
  ID         string `gorethink:"id,omitempty"`
  PlayerName string
  Score      int
}

We import the gorethink package against the variable r. This is to keep in step with the RethinkDB API which uses 'r' as the root for database operations. When we come to connect to our database...

  session, err := r.Connect(r.ConnectOpts{
    Address:  "127.0.0.2:28015",
    Database: "players",
  })

  if err != nil {
    log.Fatal("Could not connect")
  }

This Go code serves as a good point to note the one big difference between the Go driver and the other drivers. Go has a requirement for libraries and exported data structures that their names begin with an upper case character. So the connect() API call from the JavaScript API becomes Connect() in Go.

That also applies to structures like the ScoreEntry above. The contained variables all start with an upper case character and it is those names that are used to encode the structure into JSON for writing into the database. RethinkDB requires an id field for records though so the ID field needs to be translated to id and that's done by annotating the declaration with gorethink:"id,omitempty". The omitempty part is a directive to the JSON encoder to skip the field if its not set.

Another thing worth noting is that while the official API uses key/value maps to pass options to call like this: r.connect({ host: "127.0.0.2", port:28015, db:"players"}).

The GoRethink driver tries to work with a more structured and static way of passing options. Note the r.ConnectOpts as the parameter to the call. That's actually a structure for the parameters documented here. The names and values passed between the curly braces following it are the Go way of initialising a structure so it looks like a loose map of settings but is actually statically checkable and well defined.

To make sure we have a fresh table to work with, we'll drop and recreate our "scores" table and pre-emptively create a secondary index on the Score field:

  err = r.Db("players").TableDrop("scores").Exec(session)
  err = r.Db("players").TableCreate("scores").Exec(session)

  if err != nil {
    log.Fatal("Could not create table")
  }

  err = r.Db("players").Table("scores").IndexCreate("Score").Exec(session)

  if err != nil {
    log.Fatal("Could not create index")
  }

Here's another difference of note between the official drivers and the GoRethink driver. The official drivers have a single call, run() to execute commands and, running on dynamic languages, this can return different numbers of and kinds of results as needed. Go is a more strict about this so the GoRethink driver has Run() for returning database results from queries, RunWrite() for returning the responses from write operations and, as we see above, Exec() is for calls that aren't returning any database results, just an error code if appropriate.

Creating new players

Now we can create our players, all 1000 of them.

  for i := 0; i < 1000; i++ {
    player := new(ScoreEntry)
    player.ID = strconv.Itoa(i)
    player.PlayerName = fmt.Sprintf("Player %d", i)
    player.Score = rand.Intn(100)

We loop around a thousand times, each time creating a player with the ScoreEntry struct. Then we fill in the ID field which is a string so we have to convert the integer i. We then create an unimaginative player name and generate a random starting score ranging from 0 to 99. Now we move onto the database writing...

  _, err := r.Table("scores").Insert(player).RunWrite(session)

  if err != nil {
    log.Fatal(err)
  }
}

Here, we are doing a database insert into the 'scores' table. We're inserting the player variable, the contents of which will be automatically encoded as JSON. We use the RunWrite() method to ask for any results to be saved as WriteResponses, but we are also going to discard those result. The _ in _,err:= says to throw away that particular return value. We then check no error was returned, exiting if one was, and continue till we have 1000 players. We can now start generating changes.

Making the changes

We now go into a loop forever. We are going to retrieve a ScoreEntry from the database first, so we'll need a variable to store that score in.

  for {
    var scoreentry ScoreEntry

Then we'll need a random id to select a record to change and a random increment or decrement.

    pl := rand.Intn(1000)
    sc := rand.Intn(6) - 2

And now we can go and retrieve the record from the database. We have to convert our integer id to a string and then we directly retrieve the id from the table using Get(). Here we finally get to use Run() to execute the query. The results are returned in the form of a cursor.

    res, err := r.Table("scores").Get(strconv.Itoa(pl)).Run(session)

    if err != nil {
      log.Fatal(err)
    }

As we know there's only one record behind the cursor, we can retrieve that using the One() method for cursors. If we were being careful, we could check there was a value present using the IsNil() cursor method. Where there's more than one record to be retrieved from the cursor, the Next() method lets you step through them. You can read about those and other methods in the documentation for Cursor.

    err = res.One(&scoreentry)

With our entry we can now adjust the score...

    scoreentry.Score = scoreentry.Score + sc

And write it back to RethinkDB using the Update() method.

    _, err = r.Table("scores").Update(scoreentry).RunWrite(session)

We'll then sleep for 100 milliseconds, to pace our changes and then loop back round.

    time.Sleep(100 * time.Millisecond)
  }
}

This isn't the most complex slice of code in the world, but it should help you move between the dynamic language stylings of the official RethinkDB drivers to the structured, typed stylings of Go and the GoRethink driver. I'm quite impressed by how well the Rethink API has been translated to the Go idiom without losing the fluidity. In the next part, we'll move on to using Rethink's changefeeds to watch these changes.

The complete code for this example can be found on the Compose Examples github repository.

Continue the adventure in part 2 of Go, RethinkDB, & Changefeeds.