Building your own data transport with Compose Transporter

Compose Transporter isn’t just for moving and transforming data between the database engines that Compose supports. It’s open source, with an open architecture that lets you create your own adaptors to bring stodge-free ETL to your favorite database.

"ETL just not stodgy" is the description on Compose Transporter's GitHub repository and it sums up the tool quite effectively. Transporter comes with support for moving data between all the database engines that Compose supports. It's not just for Compose databases though; being fully open source, it's a useful tool for solving data mobility problems outside the confines of Compose's ecosystem, too.

It was carefully designed with this in mind. Compose is in the database business, and they need Transporter to be easy to extend whenever the next database technology comes along. Transporter has a set of pluggable extensions, called adaptors, which means that "all" you need to do to move data between a new database and all the others already supported is to write an adaptor.

The process of writing an adaptor is largely undocumented. This is understandable: Transporter started life as an internal Compose tool, and the spread of available adaptors is already pretty impressive and likely sufficient for most use cases.

The Go source for all the adaptors is there to inspect in the Transporter GitHub repository, though. Whilst the adaptors differ in which databases they interact with, they also have a lot in common, so reading them is a good way to learn.

This is a three-part article. In the first part, we'll explore how an existing adaptor is put together. In the second, we'll walk through the construction of a new, non-trivial adaptor which moves data from a Cloudant database. Finally, in the third part, we'll add the optional, more advanced Transporter features to the example adaptor, such as the ability to keep your data synchronized. By the end, you should have enough pointers to be able to go ahead and construct and share adaptors of your own.

Transporter Fundamentals

Transporter moves data from A to B. You interact with Transporter via the aptly named transporter command, which runs a set of JavaScript files that the user constructs. "Why JavaScript?" you might ask. Well, part of the thinking behind Transporter is to provide the ability to hook up a set of data rewrites between A and B, and with the message format conveniently representable as JSON, it was an obvious choice.

To create a Transporter pipeline, run the command

transporter init <source-adaptor> <sink-adaptor>  

where source-adaptor is the name of an adaptor capable of reading records from your A datastore, and sink-adaptor is the name of an adaptor capable of writing records to your B datastore. If, for example, we want to read from and write to a local file, we can use the file adaptor:

$ transporter init file file
Writing pipeline.js...  

If we examine the generated pipeline.js, we should see something like this:

var source = file({  
  "uri": "stdout://"
})

var sink = file({  
  "uri": "stdout://"
})

t.Source("source", source, "/.*/").Save("sink", sink, "/.*/")  

The default state isn't all that helpful, as the source isn't going to be very successful reading from stdout. Let's change that to read from a specific file, but keep the sink writing to stdout:

var source = file({  
  "uri": "file://testfile.json"
})

var sink = file({  
  "uri": "stdout://"
})

t.Source("source", source, "/.*/").Save("sink", sink, "/.*/")  

and create a file testfile.json looking like this:

{"id": "8942374", "first_name": "Adam", "surname": "Brown"}
{"id": "9823473", "first_name": "Bella", "surname": "Davidson"}
{"id": "4982374", "first_name": "Callie", "surname": "Bartholomew"}
{"id": "8934740", "first_name": "David", "surname": "Solomon"}
{"id": "2873668", "first_name": "Erica", "surname": "Smith"}
{"id": "1524456", "first_name": "Farid", "surname": "Badawi"}
{"id": "9842878", "first_name": "Gabby", "surname": "Grant"}
{"id": "8726344", "first_name": "Hans", "surname": "Ivorssen"}
{"id": "6125431", "first_name": "Ioannes", "surname": "Hatzimichalis"}
{"id": "1423588", "first_name": "Jacob", "surname": "Brown"}

No, that isn't technically valid JSON; it's a file that contains one complete JSON object per line (sometimes referred to as line-delimited JSON). This is what the file adaptor expects.

We can now execute our pipeline:

$ transporter run pipeline.js 
INFO[0000] boot map[source:file sink:file]               ts=1517224466454460000  
INFO[0000] adaptor Listening...                          name=sink path="source/sink" type=file  
... logging output truncated ...
{"first_name":"Adam","id":"8942374","surname":"Brown"}
{"first_name":"Bella","id":"9823473","surname":"Davidson"}
{"first_name":"Callie","id":"4982374","surname":"Bartholomew"}
{"first_name":"David","id":"8934740","surname":"Solomon"}
{"first_name":"Erica","id":"2873668","surname":"Smith"}
{"first_name":"Farid","id":"1524456","surname":"Badawi"}
{"first_name":"Gabby","id":"9842878","surname":"Grant"}
{"first_name":"Hans","id":"8726344","surname":"Ivorssen"}
{"first_name":"Ioannes","id":"6125431","surname":"Hatzimichalis"}
{"first_name":"Jacob","id":"1423588","surname":"Brown"}
INFO[0001] adaptor Stopped                               name=source path=source type=file  
... more logging output truncated ...

This amounts to a fairly complicated way of doing cat testfile.json. We can now add in-flight data transformations. This is a very powerful feature, but slightly out of scope for this article, so we'll only cover it briefly. For a fuller write-up, see the Compose docs on transformers.

Let's write a JavaScript transformation to add a field full_name that combines the fields first_name and surname. Here's the file fullname.js:

function transform(msg) {  
    var doc = msg['data'];
    if (doc['first_name'] && doc['surname']) {
        doc['full_name'] = doc['first_name'] + " " + doc['surname']
    }
    msg['data'] = doc;
    return msg;
}

and a tweak to the last line in pipeline.js so that it now looks like:

t.Source("source", source, "/.*/").Transform(js({"filename":"fullname.js"})).Save("sink", sink, "/.*/")  

Running it again, we get:

$ transporter run pipeline.js 
INFO[0000] boot map[source:file sink:file]               ts=1517225959787561000  
... logging output truncated ...
{"first_name":"Adam","full_name":"Adam Brown","id":"8942374","surname":"Brown"}
{"first_name":"Bella","full_name":"Bella Davidson","id":"9823473","surname":"Davidson"}
{"first_name":"Callie","full_name":"Callie Bartholomew","id":"4982374","surname":"Bartholomew"}
{"first_name":"David","full_name":"David Solomon","id":"8934740","surname":"Solomon"}
{"first_name":"Erica","full_name":"Erica Smith","id":"2873668","surname":"Smith"}
{"first_name":"Farid","full_name":"Farid Badawi","id":"1524456","surname":"Badawi"}
{"first_name":"Gabby","full_name":"Gabby Grant","id":"9842878","surname":"Grant"}
{"first_name":"Hans","full_name":"Hans Ivorssen","id":"8726344","surname":"Ivorssen"}
{"first_name":"Ioannes","full_name":"Ioannes Hatzimichalis","id":"6125431","surname":"Hatzimichalis"}
{"first_name":"Jacob","full_name":"Jacob Brown","id":"1423588","surname":"Brown"}
... more logging output truncated ...
INFO[0001] exit map[source:file sink:file]               ts=1517225960793274000  

This is a powerful idea. Note how the transformer function makes explicit assertions about the data. There is no inferred schema and no assumptions about either source or sink. The transformer sees a "record" in flight between source and sink, represented as a JavaScript object that it is free to manipulate.

The Pipeline

The principles underpinning Transporter are elegantly simple. A source adaptor is responsible for reading the next available data record from your database and transforming this into a Transporter Message. A sink adaptor does the reverse: it accepts a Transporter Message and transforms this into a data record that your database understands, and writes this to the target database. The Transporter system itself is a pipeline that uses the source as a producer, and the sink as the consumer of messages.
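
To make that concrete, here is roughly what "turning a record into a Transporter Message" looks like from a source adaptor's point of view. This is only a sketch using the message and ops packages the way the file adaptor's reader does later in this article; the document and the namespace are made up for illustration.

// A record read from the source, represented as a generic map.
doc := map[string]interface{}{
    "id": "8942374", "first_name": "Adam", "surname": "Brown",
}

// Wrap it as an insert Message, tagged with a namespace. For the file
// adaptor, the namespace is simply the file name.
msg := message.From(ops.Insert, "testfile.json", doc)

// A sink adaptor is handed msg and turns msg.Data() back into whatever
// its own database understands before writing it out.
_ = msg.Data()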

There is some configuration handling to do with how you define sources and sinks, and then there's the optional transform stage where you can plug in a sequence of message rewrites between source and sink. We don’t need to worry about how transformers work. They are just something in the pipeline that takes Messages in and emits Messages. From an adaptor’s point of view, they should Just Work ™.

An adaptor will typically want to function as both a source and a sink, but this isn't required. For example, your use case may be to move data from some system of record to an analytics solution, an inherently one-way process, and writing just the source may be sufficient for your needs. Or consider something like Elasticsearch: that's inherently sink-only.

For the rest of this article, we'll examine both the source and the sink aspect of the file adaptor we used above.

Anatomy of an Adaptor

Adaptors live under the transporter/adaptor directory in the Transporter repository. An adaptor needs to implement the Adaptor interface, defined in adaptor.go:

// Adaptor defines the interface which provides functions to create client interfaces
type Adaptor interface {  
    Client() (client.Client, error)
    Reader() (client.Reader, error)
    Writer(chan struct{}, *sync.WaitGroup) (client.Writer, error)
}

The Adaptor interface in turn builds on lower-level interfaces. For the source aspect, we need to implement the Reader interface. It's defined in client.go and the relevant bits look like this:

// MessageChanFunc represents the func signature needed to send messages to downstream adaptors.
type MessageChanFunc func(Session, chan struct{}) (chan MessageSet, error)

// MessageSet encapsulates the data being sent down the pipeline and its associated attributes.
type MessageSet struct {  
    Msg       message.Msg
    Timestamp int64
    Mode      commitlog.Mode
}

// NsFilterFunc represents the func signature needed to filter while Read()ing.
type NsFilterFunc func(string) bool

// Reader represents the ability to send messages down the pipe and is only needed for
// adaptors acting as a Source node.
type Reader interface {  
    Read(map[string]MessageSet, NsFilterFunc) MessageChanFunc
}

In other words, we need a Read() function that gives us back a function which, when called, feeds us Messages on a channel. Let's look at how we'd use it with the simple File adaptor:

// Cheerfully ignoring all errors for clarity
a, _ := adaptor.GetAdaptor(  
    "file",
    map[string]interface{}{"uri": fmt.Sprintf("file://%s", "some path+name stuff here")},
)

c, _ := a.Client()   // The Client represents the underlying database  
s, _ := c.Connect()  // The Session is an active, authenticated connection  
r, _ := a.Reader()   // The Reader is the jumping-off point for the source functionality

// Create the read function. The first argument is a resume buffer; it's not used
// by the file adaptor. The second argument is a filter that should return
// true for every record we want to include. In our case, it's every record,
// so just return true.
readFunc := r.Read(map[string]client.MessageSet{}, func(string) bool { return true })

// The done channel is used to signal completion
done := make(chan struct{})  
defer close(done)

// Pass the session to the read function, and the done channel, and get back
// the messages in a channel
msgChan, _ := readFunc(s, done)

var count int  
for range msgChan { // Drain the channel  
    count++
}

That sure looks like a lot of setup steps. But remember that you will not end up using it like this, apart from in the unit tests for your adaptor (the snippet above is adapted from reader_test.go). We saw earlier how you use adaptors via the pipeline.js JavaScript file.

Looking again at the File adaptor, we see it's split into the following files (excluding tests):

README.md  
client.go  
file.go  
reader.go  
session.go  
writer.go  

Most adaptors follow a similar pattern. The main implementation is kept in the two files reader.go and writer.go, but let's start at the top: client.go. The Client needs to do three main things:

  1. Represent the underlying "database", and accept any relevant set of parameters for connecting to it.
  2. Implement Connect() that should return a Session, being an authenticated, open connection to the database.
  3. Implement Close() that should terminate cleanly the connection to the database.

In terms of a local file on disk, there is not that much to do.

// Client represents a client to the underlying File source.
type Client struct {  
    uri string
    file *os.File
}

// Connect initializes the file for IO
func (c *Client) Connect() (client.Session, error) {  
    if c.file == nil {
        if err := c.initFile(); err != nil {
            return nil, err
        }
    }
    return &Session{c.file}, nil
}

// Close closes the underlying file
func (c *Client) Close() {  
    if c.file != nil && c.file != os.Stdout {
        c.file.Close()
    }
}

// Open the file. If it's not present, create it. Store handle
// in receiver.
func (c *Client) initFile() error {  
    if strings.HasPrefix(c.uri, "stdout://") {
        c.file = os.Stdout
        return nil
    }
    name := strings.Replace(c.uri, "file://", "", 1)
    f, err := os.OpenFile(name, os.O_RDWR, 0666)
    if os.IsNotExist(err) {
        f, err = os.Create(name)
        if err != nil {
            return err
        }
    }
    if err != nil {
        return err
    }
    c.file = f
    return nil
}

This is pretty straightforward. The Client is a struct that holds a file URI and a file handle. The Connect() function calls the adaptor-specific initFile() to open the file. If asked to write to "stdout://", initFile() sets the file to Go's os.Stdout. If not, it takes the filename from the URI and opens it for reading and writing, creating the file if it doesn't exist. Both the source and the sink need their file opened in order to function, and the resulting open file handle is returned by Connect(), wrapped in a Session structure.

As far as Transporter is concerned, the adaptor entry point is file.go, which defines a struct that holds all configuration parameters as received by the user-defined JavaScript files that define a Transporter pipeline. It extends the adaptor.BaseConfig struct and hooks the adaptor into the mechanics of Transporter. The File struct has methods that implement the setup aspects we saw earlier:

// Give us a File adaptor
a, _ := adaptor.GetAdaptor(  
    "file",
    map[string]interface{}{"uri": fmt.Sprintf("file://%s", "some path+name stuff here")},
)

c, _ := a.Client()   // As per above  
r, _ := a.Reader()   // We'll get to this below...  
// Similarly for Writer()
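
To give a flavour, here's a simplified sketch of what those methods boil down to in file.go. It isn't the verbatim source (the real file adds registration and option-handling plumbing), and it assumes that adaptor.BaseConfig exposes the uri setting as a URI field:

// File holds the configuration passed in from pipeline.js.
type File struct {
    adaptor.BaseConfig
}

// Client hands the configured URI to the Client we dissected above.
func (f *File) Client() (client.Client, error) {
    return &Client{uri: f.URI}, nil
}

// Reader returns the source half of the adaptor.
func (f *File) Reader() (client.Reader, error) {
    return &Reader{}, nil
}

// Writer returns the sink half of the adaptor.
func (f *File) Writer(chan struct{}, *sync.WaitGroup) (client.Writer, error) {
    return &Writer{}, nil
}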

There's nothing too onerous in there. Similarly, Session is basically an empty shell in this case: there is no authentication required in order to read a local file, so the Session just wraps the file handle held by the client.
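
In fact, given how Connect() constructs it and how the reader and writer use it, session.go amounts to little more than this sketch:

// Session wraps the open file handle established by Connect(). No
// authentication state is needed for a local file.
type Session struct {
    file *os.File
}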

Source

The source functionality is contained in reader.go. Recall that the job of the source is to implement the Reader interface, through the Read(...) method. It looks like this:

func (r *Reader) Read(_ map[string]client.MessageSet, filterFn client.NsFilterFunc) client.MessageChanFunc {  
    return func(s client.Session, done chan struct{}) (chan client.MessageSet, error) {
        out := make(chan client.MessageSet)
        session := s.(*Session)
        ns := session.file.Name()

        // Start the reader goroutine.
        go func() {
            defer close(out)
            results := r.decodeFile(session, done) // Reads file line by line, decodes, and sends back on channel
            for {
                select {
                case <-done:
                    return
                case result, ok := <-results:
                    if !ok {
                        log.With("file", ns).Infoln("Read completed")
                        return
                    }
                    if filterFn(ns) { // Our filter func is a no-op - just returns true!
                        out <- client.MessageSet{ // Send back a message set with a single message
                            Msg: message.From(ops.Insert, ns, result),
                        }
                    }
                }
            }
        }()

        return out, nil
    }
}

It looks more complex than it is. Its job is to create a function which, when called, returns a channel on which the lines of the file are delivered, suitably decoded into Messages. As we saw earlier, the file is assumed to contain one JSON object per line. The mechanics of decodeFile() aren't important here; it simply reads the file line by line, JSON-decodes each line, and sends the results back on a channel. This adaptor doesn't care about the resumption queue, so the first parameter to Read() is ignored.
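
For the curious, a line-by-line decoder in the same spirit might look like the sketch below. This is not the code from reader.go; the name decodeLines and its exact shape are invented here. It shows the essential idea, though: JSON-decode one object per line from the session's file and push each decoded map onto a channel until EOF is reached or done is closed. It assumes encoding/json and io are imported alongside Transporter's log package.

func (r *Reader) decodeLines(session *Session, done chan struct{}) chan map[string]interface{} {
    out := make(chan map[string]interface{})
    go func() {
        defer close(out)
        dec := json.NewDecoder(session.file)
        for {
            var doc map[string]interface{}
            if err := dec.Decode(&doc); err != nil {
                // io.EOF simply means we have consumed the whole file.
                if err != io.EOF {
                    log.With("file", session.file.Name()).Infoln(err)
                }
                return
            }
            select {
            case <-done:
                return
            case out <- doc:
            }
        }
    }()
    return out
}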

Sink

The sink implementation is found in writer.go, and it is in our case pretty simple. The Go JSON encoder can send data directly to a file handle:

func (w *Writer) Write(msg message.Msg) func(client.Session) (message.Msg, error) {  
    return func(s client.Session) (message.Msg, error) {
        if err := dumpMessage(msg, s.(*Session).file); err != nil {
            return nil, err
        }
        if msg.Confirms() != nil {
            msg.Confirms() <- struct{}{}
        }
        return msg, nil
    }
}

func dumpMessage(msg message.Msg, f *os.File) error {  
    return json.NewEncoder(f).Encode(msg.Data())
}
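
Mirroring the Reader example earlier, here's a sketch of driving the sink directly, much as the adaptor's own tests might. In normal use the pipeline does this for you; the document and namespace below are invented for illustration, and a and s are the adaptor and session we set up in the Reader example:

var wg sync.WaitGroup
done := make(chan struct{})
defer close(done)

w, _ := a.Writer(done, &wg)

msg := message.From(ops.Insert, "testfile.json", map[string]interface{}{
    "id": "0000001", "first_name": "Kim", "surname": "Example",
})

// Write() returns a function that performs the write against a Session.
_, _ = w.Write(msg)(s)

Note the shape here: Write(msg) hands back a closure that is executed against a Session, which keeps the pipeline in control of when, and over which connection, the write actually happens.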

Summary

The Compose Transporter is a powerful tool for moving data between databases that potentially have very different data models. It is designed from the ground up to make hooking it up to new databases easy, via its adaptor system. Although the process for creating new adaptors isn't formally documented, this article has illustrated how an adaptor is put together through the dissection of the simple File adaptor.

In the next installment, we'll walk through the construction of a new, more complex adaptor by hooking up IBM Cloudant to Transporter.

