Tuning it up to 11 - Transporter and the Cloudant Adaptor


In the final installment of this epic 3-episode mini-series we revisit our simple Cloudant adaptor and make it a bit smarter. As we concluded in the previous installment, there are a few obvious challenges to overcome, and the aim here is to pick those off.

Our New Destination

We identified a few gaping holes in our previous effort. To recap, the simple adaptor we wrote cannot follow an evolving source, and it writes documents to the sink one at a time. Transporter has two concepts we didn't touch which address these:

  1. Tailing source
  2. Bulking sink

"Tailing" is Transporter-speak for following an evolving data source over time. "Bulking" just means that we buffer messages and send them as batches to the underlying database, which for most types of databases is an efficient way of loading larger data volumes.

We also had a more subtle issue to do with how Cloudant generates its revision ids. If we're pushing data to a Cloudant sink from a Cloudant source, we need the sink to respect the revision ids as defined by the source, which isn't the "normal" behavior. This quickly becomes a rabbit hole, and doing it "properly" goes way beyond the scope of this article. To constrain ourselves, we'll state now that we'll only transport "winning revisions", which, to you and me, means we'll be ignoring conflicts. We do so with the justification that Transporter isn't intended to be a full replicator. If you need replication between Cloudant databases, you have it already, built in.

Sidenote: A good primer on the Cloudant document tree structure, the concept of "winning revisions" and more can be found in the dx13 blog post The tree behind Cloudant's documents and how to use it.

The code here picks up from the state we left it in at the end of the previous installment. We'll build an additional sink in bulk.go and an additional source in tailer.go, and make small tweaks to the client and adaptor to configure and select between the different options.

Tailing a Cloudant database

The name probably comes from the tail command, available on Unix-like systems, where you can say

tail -f logfile.log  

and watch live changes to the file over time. Databases that have a log-like or append-only structure are usually easy to tail, whereas the concept can sometimes be harder to fit onto more structured types of datastore. Fortunately for us, Cloudant is easy to tail because the changes feed can operate in continuous mode: what Transporter calls tailing, Cloudant calls continuous changes, which arguably is more descriptive.

Looking at the Cloudant HTTP API, a single parameter addition gives us a continuous changes feed:

curl 'https://account.cloudant.com/database/_changes?include_docs=true' # one-shot  
curl 'https://account.cloudant.com/database/_changes?include_docs=true&feed=continuous' # tailing  

If we request the changes feed in continuous mode, we get line-delimited JSON back (good for streaming) and the connection remains open even when no further change events are forthcoming. A "heartbeat" in the form of an empty line is sent every so often to keep the connection alive.
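Schematically, a continuous feed with include_docs=true emits one JSON object per line, something like the following (the ids, revs and sequence values are truncated, made-up examples); the blank line in the middle is a heartbeat:

{"seq":"2-g1AAAA...","id":"foo","changes":[{"rev":"1-967a00df..."}],"doc":{"_id":"foo","_rev":"1-967a00df...","name":"Marcus Cicero"}}

{"seq":"3-g1AAAA...","id":"bar","changes":[{"rev":"1-587feef0..."}],"doc":{"_id":"bar","_rev":"1-587feef0...","name":"Atticus Fink"}}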

In terms of the go-cloudant library, instead of calling the Changes() function directly, we'll use its Follower, a construct designed to make tailing a Cloudant database as convenient as possible. We'll make our tailing source a separate client.Reader implementation so that we can offer the user a choice whether to tail or one-shot the database.

Here's the meat of tailer.go:

// Tailer implements client.Reader
type Tailer struct {
    seqInterval int
}

func newTailer(seqInterval int) *Tailer {
    return &Tailer{seqInterval}
}

// Read tails a continuous changes feed
func (t *Tailer) Read(_ map[string]client.MessageSet, filterFn client.NsFilterFunc) client.MessageChanFunc {
    return func(s client.Session, done chan struct{}) (chan client.MessageSet, error) {
        out := make(chan client.MessageSet)
        session := s.(*Session)
        follower := cdt.NewFollower(session.database, t.seqInterval)

        go func() {
            defer close(out)
            defer follower.Close()

            changes, err := follower.Follow()
            if err != nil {
                return
            }
            for {
                select {
                case <-done:
                    return
                case event := <-changes:
                    var msg message.Msg
                    switch event.EventType {
                    case cdt.ChangesHeartbeat:
                        // Keep-alive only: nothing to pass on
                        continue
                    case cdt.ChangesError:
                        // Skip events that couldn't be parsed
                        continue
                    case cdt.ChangesTerminated:
                        // Remote hung up. Restart from last known point
                        changes, err = follower.Follow()
                        if err != nil {
                            return
                        }
                        continue
                    case cdt.ChangesInsert:
                        msg = message.From(ops.Insert, session.dbName, event.Doc)
                    case cdt.ChangesDelete:
                        msg = message.From(ops.Delete, session.dbName, event.Doc)
                    default:
                        msg = message.From(ops.Update, session.dbName, event.Doc)
                    }
                    out <- client.MessageSet{
                        Msg:  msg,
                        Mode: commitlog.Sync,
                    }
                }
            }
        }()

        return out, nil
    }
}

OK, that looks familiar enough from the previous version. We get a lot for free from the go-cloudant library; it's uncanny how its authors seem to have predicted that someone would want to use it in Transporter. The follower classifies each event as an insert, update or delete for us, which we had to do ourselves in the previous version. It also keeps a little bit of hidden state recording the last "sequence id" processed, which acts as a bookmark: if for some reason the remote end should terminate, we can pick up from where we left off (although note that this state isn't persisted, only kept in memory).

A note on seqInterval and optimizations

The parameter seqInterval is a sneaky but helpful optimization. Cloudant tags events in its changes feed with a sequence id (as we just touched upon). In a cluster, which needs to fuse changes from multiple shards, generating these sequence ids is a costly operation. With seqInterval > 0 we can tell the source database to generate a sequence id only every seqInterval events, rather than for every one.

This is a surprisingly beneficial optimization, but one with diminishing returns: a value of 1000 isn't 10 times faster than one of 100, and the optimal value depends on a lot of things, such as the shard and replica settings for your database. There is a downside, too. With a gappy sequence you also introduce a granularity greater than 1 in any restart, since you can only restart from the most recently seen sequence id.
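On the HTTP API this corresponds to the seq_interval parameter on the _changes endpoint, a feature of clustered Cloudant and CouchDB 2.x (which is why the trick won't help against the single-node CouchDB 1.6 we use later). Roughly:

curl 'https://account.cloudant.com/database/_changes?include_docs=true&feed=continuous&seq_interval=100' # sequence id at most every 100 events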

So let's wire this new source in. In the adaptor entry point cloudant.go we choose the tailing version if the Tail field is set:

func (c *cloudant) Reader() (client.Reader, error) {  
    if c.Tail {
        return newTailer(c.SeqInterval), nil
    }
    return newReader(), nil
}

and we'll need the mechanics to deal with the new config parameters, too. The cloudant struct now looks like this:

// Cloudant is an adaptor that reads and writes records to Cloudant databases
type cloudant struct {  
    adaptor.BaseConfig
    Database     string        `json:"database"`
    Username     string        `json:"username"`
    Password     string        `json:"password"`
    BatchSize    int           `json:"batchsize"`
    BatchTimeout time.Duration `json:"batchtimeout"`
    SeqInterval  int           `json:"seqinterval"`
    NewEdits     bool          `json:"newedits,omitempty"`
    Tail         bool          `json:"tail"`
    cl           *Client
}

Of the new fields, SeqInterval and Tail serve the source aspects of the adaptor. Nothing of consequence changes in client.go: we add only the setter functions for the new parameters.
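For illustration only, such a setter might look like the sketch below, assuming the functional-options style the client already uses; the names clientOption and withSeqInterval, and the Client field, are placeholders rather than the adaptor's actual identifiers.

// clientOption is a sketch of an option-function type for configuring the
// Client; the real client.go may use a different name and signature.
type clientOption func(*Client) error

// withSeqInterval records the requested sequence interval on the Client.
// The field name is an assumption made for this sketch.
func withSeqInterval(interval int) clientOption {
    return func(c *Client) error {
        c.seqInterval = interval
        return nil
    }
}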

The bulking sink

The bulking sink is modeled on the bulking sink for MongoDB. It accepts messages into a buffer, and when the buffer length hits a threshold, or if a certain time interval has passed since the last upload, the buffer contents are packaged up and sent as a single API call to the database.

We chose to follow the structure of the MongoDB bulker closely, meaning that it should look familiar to the Transporter community.

The code is found in bulk.go. Let's break it down, starting with our client.Writer interface obligation:

func (b *Bulker) Write(msg message.Msg) func(client.Session) (message.Msg, error) {  
    return func(s client.Session) (message.Msg, error) {
        b.Lock()
        b.confirmChan = msg.Confirms()
        db := s.(*Session).database
        doc, err := isBulkable(msg)

        if err == nil {
            b.buffer = append(b.buffer, doc)
            if len(b.buffer) >= b.batchSize {
                err = b.flush(db)
                if err == nil && b.confirmChan != nil {
                    b.confirmChan <- struct{}{}
                }
            }
        }
        b.Unlock()
        return msg, err
    }
}

Even without seeing the definition of the Bulker struct, at a high level, that should be fairly clear:

  1. Add the incoming message to the buffer
  2. If the buffer is full, flush it
  3. Send on the confirm channel, if given

We wrap a mutex around this to ensure that we don't add a new message to the buffer between it being flushed and reset. Let's look at the Bulker struct:

// Bulker implements client.Writer
type Bulker struct {  
    buffer []interface{}
    *sync.RWMutex
    confirmChan  chan struct{}
    batchSize    int
    batchTimeout time.Duration
    newEdits     bool
}

Bulker extends sync.RWMutex so that we can call b.Lock() and b.Unlock(). Here's the constructor:

func newBulker(done chan struct{}, wg *sync.WaitGroup, db *cdt.Database, size int, dur time.Duration, newEdits bool) *Bulker {  
    b := &Bulker{
        buffer:       []interface{}{},
        RWMutex:      &sync.RWMutex{},
        batchSize:    size,
        batchTimeout: dur,
        newEdits:     newEdits,
    }

    wg.Add(1)
    if dur <= 0 {
        // Default the flush interval; run() multiplies this by time.Second
        b.batchTimeout = 2
    }
    go b.run(done, wg, db)

    return b
}

When we create a new Bulker via newBulker() we start a little maintenance goroutine which handles the timer and the done channel that's used for normal termination:

func (b *Bulker) run(done chan struct{}, wg *sync.WaitGroup, database *cdt.Database) {  
    defer wg.Done()
    for {
        select {
        case <-time.After(b.batchTimeout * time.Second):
            log.Debugln("draining upload buffer on time interval")
            if err := b.drain(database); err != nil {
                log.Errorf("time interval drain error, %s", err)
                return
            }

        case <-done:
            log.Debugln("draining upload buffer: received on done channel")
            if err := b.drain(database); err != nil {
                log.Errorf("done channel drain error, %s", err)
            }
            return
        }
    }
}

The drain() method just locks, flushes and confirms:

func (b *Bulker) drain(database *cdt.Database) error {  
    b.Lock()
    err := b.flush(database)
    if err == nil {
        if b.confirmChan != nil {
            b.confirmChan <- struct{}{}
        }
    }
    b.Unlock()
    return err
}

and all of the interaction with the Cloudant database is contained in the flush() method:

func (b *Bulker) flush(database *cdt.Database) error {  
    if len(b.buffer) == 0 {
        return nil
    }
    result, err := cdt.UploadBulkDocs(&cdt.BulkDocsRequest{Docs: b.buffer, NewEdits: b.newEdits}, database)
    defer func() {
        if result != nil {
            result.Close()
        }
        b.buffer = []interface{}{} // Recycle the buffer, as this batch is done
    }()

    if err != nil {
        return err
    }

    // Check that we get a 2XX response, but Transporter need not care about
    // the actual response body
    response := result.Response()
    if response.StatusCode != 201 && response.StatusCode != 202 {
        return fmt.Errorf("unexpected HTTP return code, %d", response.StatusCode)
    }

    return nil
}

Finally, the isBulkable() helper function checks validity and converts a Transporter message to something that's suitable for a Cloudant bulk upload:

func isBulkable(msg message.Msg) (data.Data, error) {  
    doc := msg.Data()

    ID, hasID := doc.Has("_id")
    rev, hasRev := doc.Has("_rev")

    op := msg.OP()
    if op == ops.Delete || op == ops.Update {
        if !hasID || !hasRev {
            return doc, fmt.Errorf("Document needs both _id and _rev")
        }
    }

    if op == ops.Delete {
        newDoc := data.Data{}
        newDoc.Set("_id", ID)
        newDoc.Set("_rev", rev)
        newDoc.Set("_deleted", true)
        return newDoc, nil
    }

    return doc, nil
}

Cloudant can perform any insert, update and delete operation in bulk, using a JSON payload of the type

{
    "data": [
        {
            "name": "Marcus Cicero"
        },
        {
            "_id": "11a837b8cd1bbdbcecd3b9ad3d411c53",
            "_rev": "1-bff9b171b6a50abbc07d7d77afe423d9",
            "name": "Atticus Fink"
        },
        {
            "_id":"11a837b8cd1bbdbcecd3b9ad3d4144d1",
            "_rev":"1-587feef0fbd17a5b964f613765cf9b9c",
            "_deleted": true
        }
    ]
}

showing an insert (no _id), an update and a delete respectively. The isBulkable() function checks that updates and deletes have both _id and _rev and for deletes it creates a new doc containing only _id, _rev and the magic _deleted field. Given the buffer of such docs, we can turn those into a BulkDocsRequest{} struct from the go-cloudant library in the flush() method we saw earlier.

Down the rabbit hole

The newEdits parameter warrants further explanation. The default for the Cloudant _bulk_docs endpoint is to operate in "new edits" mode, meaning that the database is assumed to be responsible for generating and maintaining revision ids. For example, if you try to inject a document looking like this:

{
     "_id": "11a837b8cd1bbdbcecd3b9ad3d411c53",
     "_rev": "3-bff9b171b6a50abbc07d7d77afe423d9",
     "name": "Bob Hope"
}

then in "new edits" mode, this is taken to mean an update of the document with the given _id and _rev, which if it exists would generate a new revision with a _rev starting with 4-. If it doesn't exist, or if the given _rev isn't a winning leaf node, we get our update rejected as an update conflict.

However, if we're replicating, this isn't what we want. That document is already a leaf node on the source side and does not represent a document to be updated on the destination side of the replication. We need a way to tell the destination side to accept revs as they are, and not refer to existing revisions. For this we need a "just do as I say, dammit" mode. A --force mode, basically.

This is what the newEdits parameter does. Setting it to false amounts to doing git push --force: the source forces its revisions onto the destination, even where this generates update conflicts. Recall that rabbit hole we mentioned right at the beginning? This is it. We want to avoid the temptation to implement the full CouchDB replication protocol, as getting that right is challenging and requires back-and-forth chatter between source and destination that isn't suitable for the abstractions Transporter presents.
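At the HTTP level this is the new_edits flag accepted by the _bulk_docs endpoint. As a rough illustration, reusing the document above, a request along these lines pushes the 3- revision through exactly as given:

curl -X POST 'https://account.cloudant.com/database/_bulk_docs' \
     -H 'Content-Type: application/json' \
     -d '{"new_edits": false, "docs": [{"_id": "11a837b8cd1bbdbcecd3b9ad3d411c53", "_rev": "3-bff9b171b6a50abbc07d7d77afe423d9", "name": "Bob Hope"}]}'

With the default new-edits behavior the same upload would either create a 4- revision or be rejected as a conflict; with new_edits set to false the given revision is stored as-is, becoming a conflicting branch if necessary.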

We didn't dwell on it earlier, but our source already paves the way here: it only provides winning revisions and ignores conflicts. If we assume that our sink only receives data via Transporter, we're in a better (but not perfect) place simply by setting newEdits to false when connecting a Cloudant source to a Cloudant sink.

Sidenote: Check out Glynn Bird's couchreplicator for easy ways to run Cloudant replications from the command line.

Beware: if what you actually want is replication, use Cloudant's built-in replicator. It's awesome.

Plumbing in the Sink

We plumb in the sink along the same lines as we did for the new source. In cloudant.go:

func (c *cloudant) Writer(done chan struct{}, wg *sync.WaitGroup) (client.Writer, error) {  
    if c.BatchSize > 0 {
        return newBulker(done, wg, c.cl.database, c.BatchSize, c.BatchTimeout, c.NewEdits), nil
    }
    return newWriter(), nil
}

We treat a positive BatchSize as the signal that we want the bulking, rather than the simple version of the sink.

Testing

Testing the new bulk sink soon gets verbose. We won't examine it all, but here's a cherry-picked extract from bulk_test.go. The idea is to insert a set of test data, then read all of it back on the back channel to get at all the generated _id and _rev pairs. We then loop through those, updating half and deleting the other half, and finally verify that we have the expected number of remaining documents and that all of them show the expected increase in their revision prefix. Phew!

Step one is to pump a set of test documents into the empty database:

    for i := 0; i < testDocCount; i++ {
        wr.Write(message.From(ops.Insert, "bulk", map[string]interface{}{
            "foo": i,
            "i":   i,
        }))(s)
    }

    // Wait a bit to let the buffer drain, but keep sink running.
    time.Sleep(1100 * time.Millisecond)

In order to do updates and deletes, we need to know existing _id and _rev pairs. Using the back channel, we request all the docs the database now contains, and loop through those, constructing alternate update and delete requests:

    docs := AllDocs(db)
    expectedDocCount := 0
    for index, item := range docs {
        if index%2 == 0 {      // Update on evens
            expectedDocCount++
            wr.Write(message.From(ops.Update, "bulk", map[string]interface{}{
                "_id":  item.ID,
                "_rev": item.Rev,
                "foo":  index,
                "i":    index + 2,
            }))(s)
        } else {               // Delete on odds
            wr.Write(message.From(ops.Delete, "bulk", map[string]interface{}{
                "_id":  item.ID,
                "_rev": item.Rev,
            }))(s)
        }
    }

    // Signal that we're done, and wait for everything to finish
    close(done)
    wg.Wait()

Finally, we grab the full contents again using the back channel and verify total document count, and that all revs now start with "2-", signifying that they have been updated once.

    docs = AllDocs(db)
    for _, row := range docs {
        if !strings.HasPrefix(row.Rev, "2-") {
            t.Errorf("found unexpected rev, %s", row.Rev)
        }
    }

    if len(docs) != expectedDocCount {
        t.Errorf("found unexpected doc count, %d", len(docs))
    }

There are more tests in the bulk_test.go file to examine, and also tests for the tailing source in tailer_test.go.

Running the new Adaptor

Right, let's put our work to the test. We can show the workings of the tailing source by using a pipeline like this:

var source = cloudant({  
    "uri": "cloudant://127.0.0.1:5984",
    "username": "admin",
    "password": "xyzzy",
    "database": "testdb",
    "seqinterval": 10,
    "tail": true,
})

var sink = file({  
  "uri": "file://testfile.json"
})

t.Source("source", source, "/.*/").Save("sink", sink, "/.*/")  

That tails a local CouchDB and outputs to a file. What we expect to happen is that the initial transport is more or less instant, and that Transporter then stays running, monitoring the CouchDB changes feed. If we also do a tail -f testfile.json we can monitor the sink. If everything works, any new data we inject into the CouchDB database should hit the changes feed, be picked up by the tailing source and sent on to the sink, and we should be able to watch that happen.

Here's a screen capture of this test:

For a meatier test, let's run a Cloudant-Cloudant pipeline with a large amount of data. As our example data set we'll use the Enron email archive, available from CMU, which yields just north of 517k documents. We used the following tool to load the Enron data from disk into our local CouchDB: https://github.com/xpqz/enron

We turn on all bells & whistles for this test, with the exception that the seqInterval trick isn't available on our local single-node CouchDB 1.6.

var source = cloudant({  
    "uri": "cloudant://127.0.0.1:5984", 
    "username": "admin",
    "password": "xyzzy",
    "database": "enron",
    "tail": true,
})

var sink = cloudant({  
    "uri": "cloudant://127.0.0.1:5984",
    "username": "admin",
    "password": "xyzzy",
    "database": "enronsink",
    "batchsize": 1000,
    "batchtimeout": 2,
    "newedits": false,
})

t.Source("source", source, "/.*/").Save("sink", sink, "/.*/")  

In order to really exercise all the parts, we're going to start with an empty source database and, once Transporter is running, start the ingest tool, so that we're actually tailing a moving source.

The plan is:

  1. Using the pipeline above, start Transporter. Initially, both source and sink are empty.
  2. In a separate window, tail the changes feed continuously on the sink, using

    curl 'http://admin:xyzzy@127.0.0.1:5984/enronsink/_changes?feed=continuous'

  3. Start the enron ingest tool

Watching videos of other people running ETL processes is likely a pretty narrow interest niche, so here's a "greatest hits" edit to show that it works:

The whole thing took around five minutes to complete.

Eagle-eyed readers will have spotted that the ingest tool claimed to have processed 517401 emails, whereas Transporter only saw 517368. Where did the missing 33 emails go?

This is down to a 1M maximum buffer size in the ingest tool: 33 emails in the archive are bigger than that, and they get dropped. Technically, the 1M restriction isn't present in a local CouchDB, so we could have raised the limit if we wanted to.

The ingest tool uses every concurrency trick available to it to upload the data quickly, yet it finishes only fractionally ahead of the Transporter pipeline.

Arriving At The Destination

So what did we achieve? We extended our simple Cloudant adaptor with a tailing source and a bulking sink. We also implemented support for running the sink in "replicator mode" (newEdits=false), meaning that a straight Cloudant-Cloudant pipeline will retain its revisions on the sink side appropriately unless the sink is also modified concurrently by other processes.

This version of the Cloudant adaptor has addressed nearly all of the issues we identified at the end of the previous installment. The remaining one is that we didn't get around to implementing support for Transporter's commit log functionality, but that might warrant a separate write-up.

Even if you have no particular interest in the Cloudant adaptor itself, hopefully this article series has given you enough insight to write an adaptor for your own favorite database. Let us know how you get on!

The full source for the Cloudant adaptor is available on GitHub.

