How to move data with Transporter - from disk to database

Compose's open-sourced Transporter is a powerful tool. In this article, we'll show you how to use it to upload data to a MongoDB database and unlock the power of Transformers.

In the first part of this series, we downloaded data from a MongoDB database using Transporter. For our next step, we're going to upload some data to a MongoDB database, see how it doesn't fit, and use Transporter's tools to make it fit. See the first part for a how-to on getting Transporter running on your local system.

Setting Up

For this, we're going to use an example dataset from the JSON Studio tutorial datasets: download and unzip the startup company dataset. It's a dump from a MongoDB database of now out-of-date information, but it is full of large JSON records. We'll name this file companies.json.

We've also set up a MongoDB deployment beforehand; using Compose, it's as simple as clicking "Create Deployment". We then made a database called company with a user named driver. Create your own database, in the cloud or locally, if you want to follow along.

Starting the Transporter

As before, we'll use transporter init to create our configuration files. This time the file is the source and the database is the sink:

$ transporter init file mongodb
Writing pipeline.js...  
$

Now we can edit the pipeline.js file. The source needs to point to the companies.json file so we'll change the file URI in the source. The MongoDB database is, in this case, secured with SSL so we also need to turn on the ssl parameter in the sink settings. That done, the file should look like this:

var source = file({  
  "uri": "file://companies.json"
})

var sink = mongodb({  
  "uri": "${MONGODB_URI}",
  "ssl": true
})

t.Source("source", source, "/.*/").Save("sink", sink, "/.*/")  

We'll need to export our MONGODB_URI to the environment, and then we can test our setup's connectivity. Remember to remove ?ssl=true from the connection string too:

$ export MONGODB_URI="mongodb://driver:pass@sl-eu-lon-2-portal.2.dblayer.com:16920,sl-eu-lon-2-portal.3.dblayer.com:16920/company"
$ transporter test pipeline.js
TransporterApplication:  
 - Source:         source                                   file            /.*/                      file://companies.json
  - Sink:          sink                                     mongodb         /.*/                      mongodb://driver:pass@sl-eu-lon-2-portal.2.dblayer.com:16920,sl-eu-lon-2-portal.3.dblayer.com:16920/company

When we run the transporter, there's an error.

ERRO[0000] ERROR: write message error ($oid is not valid for storage.)  path="source/sink"  

TIP: There may be a lot of informational messages emitted too, so if you want to concentrate on just seeing errors, use transporter run -log.level "error" pipeline.js.

If you aren't using the companies.json file we selected at the start, there's a good chance you won't see this error... but even if you don't, read on.

Transforming the error

The complaint here is that there's something odd related to $oid in what's being written to MongoDB. Let's have a look at the JSON file with the jq tool.

$ jq .  < companies.json | more
{
  "_id": {
    "$oid": "52cdef7c4bab8bd675297d8a"
  },
  "name": "Wetpaint",
  "permalink": "abc2",
  "crunchbase_url": "http://www.crunchbase.com/company/wetpaint",
  "homepage_url": "http://wetpaint-inc.com",
...

There's the $oid, nestled within the _id field of the record. This data came from MongoDB, and MongoDB has ObjectID as a data type. When unmarshalled, it turns into this format, which doesn't like being written back. But what do we do with this field? Well, we could turn it into a string... but how?
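To be concrete, what we want to end up writing is a document with a plain string _id, something like this (a sketch of the target shape, not actual output from any tool):

{
  "_id": "52cdef7c4bab8bd675297d8a",
  "name": "Wetpaint",
  "permalink": "abc2",
...
}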

This is where the Transformer comes in. Transformers can be native functions or JavaScript programs which are dedicated to processing Messages. Messages are how records are packaged and passed between the source and the sink in the pipeline. They can also be passed through Transformers if we add a .Transform() method to the pipeline. There's a set of native transformers: omit, skip, pick, rename and pretty. There are also two JavaScript engines in Transporter, the older otto and the newer goja. We're going to use the older otto engine for reasons we'll explain later. What's more important here is the process of adding any transformer.

t.Source("source", source, "/.*/")  
        .Transform(otto({"filename":"pretty.js"}))
        .Save("sink", sink, "/.*/")

We've spread the pipeline over three lines for readability. The Transform() function plugs a Transformer into the pipeline. Like the Source() and Save() methods, it can take up to three parameters: a name, a Transformer function, and a namespace. We've set ours to use a default name and to match everything by only giving it a Transformer function, otto. The otto function takes a "filename" parameter; here it's "pretty.js".
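If we wanted to be explicit about all three parameters, the call might look something like this - a sketch, with "prettifier" as a name we've made up for illustration; the single-argument form above behaves the same way:

t.Source("source", source, "/.*/")
        .Transform("prettifier", otto({"filename": "pretty.js"}), "/.*/")
        .Save("sink", sink, "/.*/")

Here's what goes in pretty.js: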

module.exports = function(msg) {  
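    // print the whole message (data, ns, op and ts) as indented JSON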
    console.log(JSON.stringify(msg, null, 2));
    return msg;
};

This simple Transformer is incredibly useful. It takes the msg object and, using the JavaScript built-ins console.log and JSON.stringify, prints out a formatted JSON object for each message. Save it as pretty.js and, when we run the pipeline, we get output like this:

{
  "data": {
    "_id": {
      "$oid": "52cdef7c4bab8bd675297d90"
    },
    "acquisition": {
    }
...
  },
  "ns": "companies.json",
  "op": "insert",
  "ts": 1489488630
}
...

This is the entire message structure, though yes, we did skip most of the data. The JSON document we saw earlier with jq is now wrapped as the data field's value. There are also three other fields in the message. The ns field holds the namespace value; here it's been set to the name of the file the adaptor read from. The op field shows what kind of operation the data is associated with; it can contain insert, update, delete and some other specialized labels, and here it's a simple insert. Finally, ts is a timestamp used for internal monitoring of the message within the Transporter.

With this knowledge in hand, we can set about fixing our $oid problem. Here's a file called oidremove.js, a Transformer that takes the value $oid contains and makes it the _id value, as a string:

module.exports = function(msg) {  
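    // unwrap the {"$oid": "..."} object so _id becomes a plain string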
    msg.data._id = msg.data._id.$oid;
    return msg;
};

Let's install that into our pipeline like so:

t.Source("source", source, "/.*/")  
        .Transform(otto({"filename":"oidremove.js"}))
        .Save("sink", sink, "/.*/")

Run the Transporter and sit back as it steadily pushes records up to the MongoDB database. It trudges along, inserting one document at a time, and took around 600 seconds to finish here (your mileage will vary depending on network latency). The MongoDB adaptor has a bulk option to help in these circumstances. We can set it in the pipeline.js file:

var sink = mongodb({  
  "uri": "${MONGODB_URI}",
  "ssl": true,
  "bulk": true
})

Now, the Transporter will tear through the data, here taking just 76 seconds to upload the JSON file. If in doubt, turn the bulk option on.

Namespaces

If you look in your MongoDB database you'll find that the Transporter has created a collection called companies.json. That is the namespace value allocated to the messages by the file adaptor at the source. It gets passed down the pipeline and, when it reaches the Save() function, the adaptor uses the namespace value to save each record into that collection. The namespace setting in a Save() function is a regular expression used as a filter on the incoming messages' namespace values. If a message's namespace value doesn't match the Save() namespace, it isn't allowed through to be saved.
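If you want to confirm the documents arrived, a quick count once you're connected to the company database with the mongo shell will do it. Because of the dot in the collection name, we need getCollection() rather than the usual db.collection shorthand:

> db.getCollection("companies.json").count()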

Let's see that in practice by changing the Save() namespace setting to "known":

t.Source("source", source, "/.*/")  
        .Transform(otto({"filename":"oidremove.js"}))
        .Save("sink", sink, "known")

If we run the Transporter now, nothing will be written... The messages' ns field is "companies.json" and that doesn't match "known", so everything is filtered out. Now, we could rename the source file to known and change the source variable definition to read from that file, but in real life we can't just rename things. What we can do is change the ns field manually in a Transformer. Let's change oidremove.js so it sets the message ns value to "known":

module.exports = function(msg) {  
    msg.data._id = msg.data._id.$oid;
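    // override the namespace set by the file adaptor so it matches the sink's filter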
    msg.ns = "known";
    return msg;
};

And we're off, with a new collection created called "known". That the Transformer has this control means you can write JavaScript code that selects which collection an incoming record is stored in or, using the filtering, which Sink it flows down. As well as the built-in JavaScript functions, the otto JavaScript engine has the Underscore library available to help you mutate your data, whether by renaming, deleting, picking or projecting fields, within the JavaScript code of your Transformers. One last trick with the otto JavaScript engine: if your Transformer script returns false rather than a message, that message is discarded from the pipeline.
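To make that concrete, here's a sketch of a routing Transformer. The logic is made up for illustration; it leans on fields that happen to appear in the companies data, and you'd adapt the tests to your own records:

module.exports = function(msg) {
    // returning false discards the message from the pipeline entirely
    if (!msg.data.name) {
        return false;
    }
    // pick a collection based on the record's contents
    if (msg.data.acquisition) {
        msg.ns = "acquired";
    } else {
        msg.ns = "independent";
    }
    return msg;
};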

Next

We haven't dived into the native Transformers or touched on the goja JavaScript engine, which is also available; we'll look at those in a future part. In the next part of this series, we'll look at moving data from database to database, where the Transporter helps, and where you'll find the different data models of modern databases rub up against each other.


If you have any feedback about this or any other Compose article, drop the Compose Articles team a line at articles@compose.com. We're happy to hear from you.

Dj Walker-Morgan
Dj Walker-Morgan is Compose's resident Content Curator, and has been both a developer and writer since Apples came in II flavors and Commodores had Pets.
