Go, RethinkDB & Changefeeds - Part 2

In part one we built a simple Go application which created some real-time activity in a RethinkDB database. Now we can move on to start tracking what's going on inside the database using changefeeds.

Changefeeds and RethinkDB

The traditional method of finding out whether something has changed in a database is to regularly poll the database with a query. It works well enough, if you don't mind that the results are out of date almost as soon as you've got them and that the database is expending energy and time on that query regularly. Polling also doesn't scale: every client that wants to track changes has to poll, and each one adds to the load.

That's where RethinkDB and its changefeeds come in. In version 1.16, the RethinkDB developers released the version of changefeeds they'd been working towards. Rather than polling with a query, a client can lodge a query with the database and ask to be pushed any changes to the results of that query; to have the changes fed to them.

The best part about how RethinkDB handles changefeeds is that they look identical to a normal query, apart from a Changes() method (as it is named in GoRethink) added to the query. So if you have a query that returns everything in a table...

r.Table("scores").Run(session)

To make that return all the changes that happen on the table we add Changes():

r.Table("scores").Changes().Run(session)

We'll start with that...

Watching everything

The best way to understand changefeeds is to start using them. Get the scoreseed.go application from part one running against a RethinkDB instance. Then we'll want to create a simple skeleton app to track changes. You'll find these examples in their own GitHub repository for you to download; this example is named followall.go. The start of the app will be familiar from part one:

package main

import (  
  "fmt"
  "log"

  r "gopkg.in/dancannon/gorethink.v0"
)

func main() {  
  fmt.Println("Following on RethinkDB")

  session, err := r.Connect(r.ConnectOpts{
    Address:  "127.0.0.1:28015",
    Database: "players",
  })

  if err != nil {
    log.Fatal(err)
  }

It just opens a connection to the RethinkDB database. Now we want to make our query for the change feed:

  res, err := r.Table("scores").Changes().Run(session)

This returns us a cursor equivalent of an everlasting gobstopper. If we keep reading from it, we'll get all the latest changes:

  var value interface{}

  if err != nil {
    log.Fatalln(err)
  }

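  // Next blocks until a change arrives and returns false only if the feed ends
  for res.Next(&value) {
    fmt.Println(value)
  }
  if err := res.Err(); err != nil {
    log.Fatalln(err)
  }
}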

We're not going to decode the data into structures, partly because we want to get a better view of what is being returned. In this case, if we run it, we get output like this...

map[old_val:map[id:272 PlayerName:Player 272 Score:42] new_val:map[PlayerName:Player 272 Score:44 id:272]]  
map[new_val:map[PlayerName:Player 453 Score:82 id:453] old_val:map[Score:79 id:453 PlayerName:Player 453]]  
map[new_val:map[PlayerName:Player 566 Score:64 id:566] old_val:map[PlayerName:Player 566 Score:65 id:566]]  

Each result from the change feed holds two values: the old_val key points at the previous version of the document and the new_val key points at the latest version.

If the old_val is nil, then the change is that the record has been inserted:

map[new_val:map[Score:31 id:100 PlayerName:Player 100] old_val:<nil>]  

and if the new_val is nil, then the record has been deleted:

map[new_val:<nil> old_val:map[PlayerName:Player 100 Score:31 id:100]]  
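The rules above are easy to turn into code. Here's a minimal sketch, assuming each change has been read into a map[string]interface{} as in followall.go; the classify function is our own helper for illustration, not part of GoRethink, and it only covers the plain table feed described so far.

```go
package main

import "fmt"

// classify reports what kind of change a message from a plain table
// change feed describes, based on which of old_val and new_val are nil.
// It is a hypothetical helper, not something the driver provides.
func classify(change map[string]interface{}) string {
	oldVal, newVal := change["old_val"], change["new_val"]
	switch {
	case oldVal == nil && newVal != nil:
		return "insert"
	case oldVal != nil && newVal == nil:
		return "delete"
	case oldVal != nil && newVal != nil:
		return "update"
	default:
		return "unknown"
	}
}

func main() {
	inserted := map[string]interface{}{
		"old_val": nil,
		"new_val": map[string]interface{}{"id": "100", "PlayerName": "Player 100", "Score": 31},
	}
	fmt.Println(classify(inserted))
}
```

In the real loop you would assert value to a map[string]interface{} and pass it to classify before deciding what to do with the record.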

Filtering the chain

Getting raw changes like this has been a feature of RethinkDB for a while now. What happened in the most recent version was the expansion of this capability and what you can put before the .Changes() method. Let's start with filters. If we wanted to get anyone with a score over 50 we could run a query like this (see followgt50.go):

res, err := r.Table("scores").Filter(r.Row.Field("Score").Gt(50)).Run(session)

Simply adding .Changes() to the end would give us a change feed for any changes to records where the score is over 50. Inserts and deletes with scores over 50 would show up in the feed as above. Also, when a record starts or stops matching that filter, you get a change with a nil old_val or new_val as appropriate.
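To make the "starts or stops matching" behaviour concrete, here's a small model of it in plain Go. It doesn't talk to RethinkDB at all; the Player struct and the filteredChange function are made up for this sketch, and it simply mirrors the behaviour described above rather than how the server implements it.

```go
package main

import "fmt"

// Player is a cut-down stand-in for a document in the scores table.
type Player struct {
	ID    string
	Score int
}

// filteredChange models what a change feed on a filtered query reports for
// one write to the table. A side that does not match the filter is reported
// as nil; ok is false when neither side matches and the feed stays silent.
func filteredChange(oldDoc, newDoc *Player, match func(Player) bool) (oldVal, newVal *Player, ok bool) {
	if oldDoc != nil && match(*oldDoc) {
		oldVal = oldDoc
	}
	if newDoc != nil && match(*newDoc) {
		newVal = newDoc
	}
	return oldVal, newVal, oldVal != nil || newVal != nil
}

func main() {
	over50 := func(p Player) bool { return p.Score > 50 }

	// Player 272 moves from 48 to 52 and so starts matching the filter.
	before := &Player{ID: "272", Score: 48}
	after := &Player{ID: "272", Score: 52}
	oldVal, newVal, ok := filteredChange(before, after, over50)
	fmt.Println(oldVal == nil, newVal != nil, ok)
}
```

A score moving from 48 to 52 looks like an insert to the filtered feed, while a move from 40 to 45 produces nothing at all.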

As well as Filter(), the Map() and Pluck() methods can also be included in the chain before .Changes() so that you can remap the documents going down the chain. If we were just interested in the player names for anyone with a score over 50, we could do this:

  res, err := r.Table("scores").Filter(r.Row.Field("Score").Gt(50)).Pluck("PlayerName").Changes().Run(session)

First Values

The previous changefeeds just send us a stream from as soon as we open them. There's another set of operations which will start up by returning the current value of the items you are tracking with the change feed and then switch to sending you the old_val and new_val data from then on. First of these is the point change feed operator, ".Get()". The method, outside a change feed, retrieves a single document by its id. When used with ".Changes()" it starts tracking changes on that single record. If we change our change feed line to:

  res, err := r.Table("scores").Get("100").Changes().Run(session)

Remember our id field in this example is a string. Now if we run our code again, we'll see something like this (see followone.go):

Following One on RethinkDB  
map[new_val:map[PlayerName:Player 100 Score:30 id:100]]  
map[old_val:map[PlayerName:Player 100 Score:30 id:100] new_val:map[id:100 PlayerName:Player 100 Score:32]]  

Three other operations that return initial values can be chained: .Min(), .Max() and .OrderBy().Limit(). There is a constraint, though, on what you can pass as parameters to these methods when in a change feed. Normally, you can pass a field name, a function or an index to them, but in a change feed all you can pass to them is an index. Recall, in part 1 when we created our player simulator, we added an index to the Score column called Score. Now, this index comes into play.

Let's start with the Min method, and a slight difference between the official API and the GoRethink driver: where the API has min() taking a string, function or index parameter, the Go version has Min and MinIndex, the latter being used to pass an index to the function. So, to use our Score index, we need MinIndex. Knowing that, we can now change our code (see followminmax.go) to track the lowest scoring player with:

  res, err := r.Table("scores").MinIndex("Score").Changes().Run(session)

We leave it as an exercise for the reader to follow the maximum score. The first thing this code will return is the current lowest scoring player, and it'll update us when that changes with the previous minimum record and the current minimum record - worth knowing if you want to know how the ranking changed.

.OrderBy().Limit() is powerful too, but the two come as a package in changefeeds; you can't have one without the other. We have to give OrderBy a field which is indexed and a sorting order, then limit the number of results we want. So if we want the top 5 scorers, in Go we'd use this:

    res, err := r.Table("scores").OrderBy(r.OrderByOpts{Index: r.Desc("Score")}).Limit(5).Changes().Run(session)

Like ConnectOpts (discussed in part 1), r.OrderByOpts is a struct used to manage the parameters being passed to the OrderBy() function. In this case, we use it to pass the name of the indexed field and the order, descending, we want it sorted by. If we drop this line into our code (see followtopscores.go) we get output like this:

Following top scores on RethinkDB  
map[new_val:map[Score:115 id:952 PlayerName:Player 952]]  
map[new_val:map[PlayerName:Player 558 Score:117 id:558]]  
map[new_val:map[PlayerName:Player 692 Score:119 id:692]]  
map[new_val:map[id:800 PlayerName:Player 800 Score:119]]  
map[new_val:map[PlayerName:Player 362 Score:128 id:362]]  
map[new_val:map[Score:114 id:404 PlayerName:Player 404] old_val:map[PlayerName:Player 952 Score:115 id:952]]  
map[new_val:map[PlayerName:Player 692 Score:117 id:692] old_val:map[PlayerName:Player 692 Score:119 id:692]]  
map[new_val:map[PlayerName:Player 852 Score:114 id:852] old_val:map[PlayerName:Player 404 Score:114 id:404]]  

The first 5 values we get from the change feed cursor are the current top 5. There's no indication that there are only 5; that's implicit from the value we gave to Limit(). The simplest way to check whether these are the initial returned results is to test for the absence of old_val; if there's an old_val then you are processing changes, not initial results. The changes represent records moving in and out of, or changing within, the top 5.

So, the first change we see is Player 952 leaving the set having previously had a score of 115, and being replaced by Player 404 with a score of 114. You'd have to retrieve Player 952's record to find out their score had dropped to 113 as the reason, but if you are only interested in maintaining a top 5, all you need to do is replace the slot occupied by 952 with 404.

The next update may seem odd but it's for Player 692 and is a change which sees their score drop, but not enough to move them in the ranking. If Player 362, current top scorer, were to suddenly drop out of the top 5 with a low score, you'd also see a ripple of changes as the other players moved up the rank. Remember that you really will get a feed of changes when you start using .OrderBy().Limit(), and that the old_val in this case is going to be very useful to you.
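Here's a rough sketch of how you might keep a local top 5 in step with that feed. It assumes the messages have already been decoded into the Player and Change structs below, which are our own invention for this example, and it replays the sample output shown earlier rather than connecting to a database.

```go
package main

import (
	"fmt"
	"sort"
)

// Player mirrors a document in the scores table (hypothetical struct).
type Player struct {
	ID    string
	Name  string
	Score int
}

// Change is a hypothetical, already-decoded change feed message.
// A nil OldVal marks one of the initial results.
type Change struct {
	OldVal *Player
	NewVal *Player
}

// applyChange updates a local copy of the leaderboard with one message.
func applyChange(top []Player, c Change) []Player {
	switch {
	case c.OldVal == nil && c.NewVal != nil:
		// Initial result: just add it.
		top = append(top, *c.NewVal)
	case c.OldVal != nil && c.NewVal != nil:
		// Put the new document in the slot held by the old one.
		replaced := false
		for i := range top {
			if top[i].ID == c.OldVal.ID {
				top[i] = *c.NewVal
				replaced = true
				break
			}
		}
		if !replaced {
			top = append(top, *c.NewVal)
		}
	case c.OldVal != nil && c.NewVal == nil:
		// A document left the set without a replacement: drop it.
		for i := range top {
			if top[i].ID == c.OldVal.ID {
				top = append(top[:i], top[i+1:]...)
				break
			}
		}
	}
	// Keep the local copy ordered by score, highest first.
	sort.SliceStable(top, func(i, j int) bool { return top[i].Score > top[j].Score })
	return top
}

// replay feeds the messages from the sample output through applyChange.
func replay() []Player {
	p := func(id string, score int) *Player {
		return &Player{ID: id, Name: "Player " + id, Score: score}
	}
	changes := []Change{
		{NewVal: p("952", 115)},
		{NewVal: p("558", 117)},
		{NewVal: p("692", 119)},
		{NewVal: p("800", 119)},
		{NewVal: p("362", 128)},
		{OldVal: p("952", 115), NewVal: p("404", 114)},
		{OldVal: p("692", 119), NewVal: p("692", 117)},
		{OldVal: p("404", 114), NewVal: p("852", 114)},
	}
	var top []Player
	for _, c := range changes {
		top = applyChange(top, c)
	}
	return top
}

func main() {
	for rank, player := range replay() {
		fmt.Println(rank+1, player.Name, player.Score)
	}
}
```

Matching on the id in old_val is what lets us find the right slot without asking the database anything, which is why that field is so handy here.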

Between the modes

There is one last change feed function we haven't covered, .Between(). Its purpose is to allow you to follow changes within a range of ids. Although you may expect it to return a first set of values, it doesn't; that's expected to come in a later release of RethinkDB. Instead, you'll immediately get the stream of changes for the records whose keys fall within the range. By default, it uses the primary key, so we can simply say (see followbetween.go):

  res, err := r.Table("scores").Between("100", "200").Changes().Run(session)

And we'll see changes. We will get caught out here though; we made the id field a string and asking for between with strings means we're going to be doing lexical comparisons... lexically, "2" and "19" are between "100" and "200" so we'll get curious results. If your id field is full of lexically meaningful strings though you'll be able to slice into them with this.
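If the lexical ordering seems surprising, it's quick to check in Go, whose string comparison orders these digit-only ids the same way. This is only an illustration: the between function below is our own, and it assumes the default bounds for between (lower bound included, upper bound excluded).

```go
package main

import "fmt"

// between mimics a range check with an inclusive lower bound and an
// exclusive upper bound, comparing the keys as strings.
func between(key, lower, upper string) bool {
	return key >= lower && key < upper
}

func main() {
	for _, id := range []string{"2", "19", "150", "200", "3"} {
		fmt.Printf("%q between \"100\" and \"200\": %v\n", id, between(id, "100", "200"))
	}
}
```

Numerically, 2 and 19 are nowhere near the range, but as strings they sort after "100" and before "200", so their changes turn up in the feed.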

A more interesting use with our data simulation would be to get changes for scores from 100 to 150 inclusive. For that we can use this (see followbetweenscores.go) to generate our change feed:

  res, err := r.Table("scores").Between(100, 150,
    r.BetweenOpts{
      Index:      "Score",
      LeftBound:  "closed",
      RightBound: "closed",
    }).Changes().Run(session)

We give the left and right arguments as integers, and then using BetweenOpts, tell it to use the indexed Score field. The LeftBound and RightBound parameters specify whether the end bounds are inclusive (closed) or exclusive (open). Running this lets us track a range of scores and watch as players enter or leave the score zone, in real time, without us polling the database.
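The difference between closed and open only shows up at the edges of the range. As a quick illustration, here's a plain Go sketch of the check; inRange is a made-up helper that models the bounds, not a call into the driver.

```go
package main

import "fmt"

// inRange reports whether score lies between low and high, where each
// bound is either "closed" (inclusive) or "open" (exclusive).
func inRange(score, low, high int, leftBound, rightBound string) bool {
	aboveLow := score > low || (leftBound == "closed" && score == low)
	belowHigh := score < high || (rightBound == "closed" && score == high)
	return aboveLow && belowHigh
}

func main() {
	for _, score := range []int{99, 100, 125, 150, 151} {
		fmt.Println(score,
			inRange(score, 100, 150, "closed", "closed"),
			inRange(score, 100, 150, "open", "open"))
	}
}
```

With both bounds closed, players scoring exactly 100 or 150 are in the zone; with both open, they are just outside it.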

That's the kind of power you need for the modern real-time web, and RethinkDB has made it available not as an add-on but in the core of the database, opening up numerous possibilities for new kinds of applications. It's one of the reasons we have RethinkDB as one of our available databases at Compose, which also means you can check it out at the click of a button in our dashboard.