How to move data with Compose Transporter - From database to disk


Transporter is a great way to move and manipulate data between databases. In this new article, we look at how you can get on board the Transporter quickly.

Note: Since this article was published, some major changes have happened to Transporter. This article has been updated to reflect that.

With the latest version of Transporter, we've been making our open-sourced tool for moving and manipulating data between databases even easier. In this short series of articles, we are going to show you how to get your data moving with Transporter.

Getting Transporter

To begin with, you'll want your own copy of Transporter. You can find binary and source releases in Download the appropriate version for your system (the macOS version is -darwin) or, if you prefer to build your own, you can clone the Transporter Github repository. You'll probably want to rename the downloaded file transporter and give it execute permission (chmod u+x transporter) so it can run.

Transporter quickly

Transporter is based around the idea of building a pipeline. At one end is the source. This brings in data from databases or files and converts it into messages which the pipeline can process. Then the messages flow down the pipeline passing through filters. In Transporter terms, these filters are called transformers as they are more powerful than a simple filter and can modify the messages. The messages keep flowing downstream until they eventually reach the end of the pipeline and the sinks. Sinks take messages in and send them out to other databases.

Let's start with extracting the contents of a database to a file. This is a great way to get a handle on how Transporter and its pipeline is configured.

The new version of Transporter has one addition which makes things much easier. The transporter init command is now the quickest way to get started creating a pipeline between two data sources. Give transporter init the names of two adaptors and it will create the configuration files needed to have one as the source of the data and one as the destination, the sink for the data. But where do you find those names?

Information about adaptors in actually built into Transporter. Run transporter about to list the available adaptors.

$ transporter about
rabbitmq - an adaptor that handles publish/subscribe messaging with RabbitMQ  
rethinkdb - a rethinkdb adaptor that functions as both a source and a sink  
elasticsearch - an elasticsearch sink adaptor  
file - an adaptor that reads / writes files  
mongodb - a mongodb adaptor that functions as both a source and a sink  
postgres - a postgres adaptor that functions as both a source and a sink  

To create our initial configuration, we can select one for a source adaptor and one for a sink adaptor.

Creating a configuration

Say we wish to move data from MongoDB to a file. For this we can select mongodb as the source adaptor, and file as the sink adaptor. The init command always writes out new configuration files, but it will prompt before overwriting existing files. To be sure though, run it in a clean or new directory before running it.

$ mkdir transporter-example-1
$ cd transporter-example-1
$ transporter init mongodb file
Writing pipeline.js...  

There is now one file in your current directory, pipeline.js. This file defines how the Transporter works. Here it looks something like this:

var source = mongodb({  
  "uri": "${MONGODB_URI}"
  // "timeout": "30s",
  // "tail": false,
  // "ssl": false,
  // "cacerts": ["/path/to/cert.pem"],
  // "wc": 1,
  // "fsync": false,
  // "bulk": false,
  // "collection_filters": "{}"

var sink = file({  
  "uri": "stdout://"

t.Source("source", source, "/.*/").Save("sink", sink, "/.*/")  

Everything in Transporter is defined in JavaScript. The configuration of source and sink is done by calling a function whose name is the name of the adaptor required. So var source=mongodb({ means we want a variable called source to represent a mongodb adaptor. The parameters are then passed to it as a JavaScript object. The init command generates all the parameter options but comments out everything but the essential one. In this case, and with most adaptors, it's the "uri" parameter. The other options available timeout, tail, ssl, cacerts, wc, fsync, bulk and collection_filters. The reference page for the adaptor will go into more detail on these; we'll just touch on the ones we need to change. The file node created for the sink variable just takes a uri; it's set to stdout so we won't worry about it now.

Setting up the nodes

First up is the uri setting. The uri is the canonical way of describing a connection to a database or similar. It can contain the protocol, host names, ports and more all in one string. Of course this isn't something that people want embedded in files. Thats why this example uses the ability of Transporter's JavaScript files to import environment variables.

In this case, the configuration is pulling in the MONGODB_URI environment variable... so we'd better go set that. We have a current MongoDB on Compose set up and if we ask for the connection string in the UI for the enron database we get this "mongodb://,", so let's set that in the environment.

$ export MONGODB_URI="mongodb://,"

That's actually the only settable value in the this part of the pipeline.js file, so why don't we run transporter test at this point. transporter test will, given a JavaScript .js pipeline file, load up everything and test the connections. If we don't specify a file, Transporter will default to using pipeline.js so all we have to do is transporter test:

$ transporter test
Invalid URI (mongodb://,, unsupported connection URL option: ssl=true  

This is a MongoDB specific error; the adaptor can take everything from the connection string but the MongoDB options at the end. We have to remove that ?ssl=true from the environment variable.

$ export MONGODB_URI="mongodb://,"

And then set an appropriate other option. That was the option that turns on SSL, so we can enable it by editing the transporter.yaml file, uncommenting the ssl setting, setting it to true, and making sure we have commas in the right places. It should look like this when done:

var source = mongodb({  
  "uri": "${MONGODB_URI}",
  "ssl": true

var sink = file({  
  "uri": "stdout://"

t.Source("source", source, "/.*/").Save("sink", sink, "/.*/")  

We've removed the commented out options for clarity. Now if we run the test:

transporter testTransporterApplication:  
 - Source:         source                                   mongodb         /.*/                      mongodb://,
  - Sink:          sink                                     file            /.*/                      stdout://

This tells us our nodes are connecting to the outside world.

Pipelines and namespaces

We can now look at what is in the rest of the pipeline.js file. It currently looks like this:

t.Source("source", source, "/.*/").Save("sink", sink, "/.*/")  

The variable t represents the Transporter itself. What we do next is chain a pipeline of adaptors from it. The first is Source() and it takes up to three parameters. Here, we are presenting it with all three; a name for the adaptor to use when it logs messages and errors, a previously created variable that represents the adaptor and a namespace.

For the MongoDB adapter, the namespace is a regular expression which should match all the collections we want to read from. The regular expression /.*/ matches anything so all collections in the database will be read. If you wanted to read from a specific collection, say "stuff" in the test database, the namespace would be "/^stuff$/". The ^ denotes the start of a string and the $ the end of a string so you match only "stuff" and not "wonderstuff" or "hrpufnstufforever". The init command sets the namespace to match anything by default; watch out for that as you could pick up stray collections.

When the Transporter is ready, it'll wake up this adaptor and it'll start reading from the database through the configured adaptor. For each document read, it'll create a message and send it on down the pipeline.

The .Save() takes message coming down the pipeline and, given its options, sets out to write it somewhere. Like the "Source()" function, it takes up to three parameters, a name, an adapter variable and a namespace. This time the namespace regular expression is used to decide if a message should be processed by the sink but the regular expression has to match for a message to be eligible to be written. It's a regular expression and it's set to match anything it sees.

We want to get everything from our enron1 database so all we need to change here is the first namespace, like so:

t.Source("source", source, "/^enron$/").Save("sink", sink, "/.*/")  

Write that back to disk and we are now ready to run this pipeline.

$ transporter run
IINFO[0000] adaptor Starting...                           name=source path=source type=mongodb  
INFO[0000] boot map[source:mongodb sink:file]            ts=1490887223414636518  
INFO[0000] adaptor Listening...                          name=sink path="source/sink" type=file  
INFO[0000] starting Read func                            db=enron  
INFO[0000] collection count                              db=enron num_collections=2  
INFO[0000] skipping iteration...                         collection=cheeses db=enron  
INFO[0000] adding for iteration...                       collection=enron db=enron  
INFO[0000] done iterating collections                    db=enron  
INFO[0000] iterating...                                  collection=enron  
INFO[0000] Establishing new connection to (timeout=1h0m0s)...  
INFO[0000] Connection to established.  
INFO[0001] SYNC Adding to cluster as a master.  
INFO[0001] SYNC Synchronization was complete (got data from primary).  
INFO[0001] SYNC Synchronization completed: 2 master(s) and 0 slave(s) alive.  
{"_id":"565862ea414f0983b632df23","body":"the scrimmage is still up in the air...\n\n\nwebb said that they didnt want to scrimmage...\n\nthe aggies  are scrimmaging each other......

After that point, you'll want to stop that pretty quickly as it will work through your entire database echoing it all out to the console as JSON documents. Hit Control-C. For the sharp eyed, look at that log and you can see there were two collections, one called enron, the other cheeses. Because we told it only to match with enron in the Source mongodb adaptor, it happily ignored the "cheese" collection we also helpt in the database.

Running transporter quietly to a file

What you do see in the snippet above is the tracing. The Transporter is defaulting to being overly chatty because there's a lot of metrics and information that can be useful when setting up a Transporter. When you do go to production with Transporter, you can use the -log.level option to select which messages you want to log.

The other issue here is that everything is going to stdout, which is the default for a transporter init generated setup. We just need to change the sink entry like so:

var sink = file({  
  "uri": "file://dump.json"

And now our database will be written to the dump.json file in the current directory. Let's run that now and we'll mute the information messages too:

$ transporter run -log.level error
$ ls -lh
total 3005680  
-rw-r--r--  1 dj  staff   1.4G Mar 30 16:31 dump.json
-rw-r--r--  1 dj  staff   186B Mar 30 16:28 pipeline.js

At this point, we have Transporter extracting data from a MongoDB database and saving it as JSON data. In the next part, we'll look at configuring the Transporter to send data to a database.

If you have any feedback about this or any other Compose article, drop the Compose Articles team a line at We're happy to hear from you.

Dj Walker-Morgan
Dj Walker-Morgan was Compose's resident Content Curator, and has been both a developer and writer since Apples came in II flavors and Commodores had Pets. Love this article? Head over to Dj Walker-Morgan’s author page to keep reading.

Conquer the Data Layer

Spend your time developing apps, not managing databases.