How to move data with Transporter - from disk to database


Compose's open-sourced Transporter is a powerful tool. In this article, we'll show you how to use it to upload data to a MongoDB database and unlock the power of Transformers.

In the first part of this series, we downloaded data from a MongoDB database using Transporter. For our next step, we're going to upload some data to a MongoDB database, see how it doesn't fit, and use Transporter's tools to make it fit. See the first part for a how-to on getting Transporter running on your local system.

Setting Up

For this, we're going to use an example dataset from the JSON Studio tutorials datasets: download and unzip the startup company dataset. It's a dump from a MongoDB database of some now out-of-date information, but it's full of large JSON records. We'll name this file companies.json.

We've also set up a MongoDB deployment beforehand - using Compose, it's as simple as clicking "Create Deployment" - and then made a database called company with a user named driver. Create your own database in the cloud or locally if you want to follow along.
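
If you're running MongoDB locally rather than on Compose, you can create an equivalent database and user from the mongo shell. Here's a minimal sketch; the password and the readWrite role are just example values for following along:

// Run in the mongo shell against your local MongoDB.
// "driverpassword" and the readWrite role are example values only.
db.getSiblingDB("company").createUser({
    user: "driver",
    pwd: "driverpassword",
    roles: [ { role: "readWrite", db: "company" } ]
});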

Starting the Transporter

As before, we'll use transporter init to create our configuration files. This time the file is the source and the database is the sink:

$ transporter init file mongodb
Writing transporter.yaml...  
Writing pipeline.js...  
$

Now we can edit the transporter.yaml file. The source needs to point to the companies.json file so we'll change the file URI in the source. The MongoDB database is secured with SSL, so we also need to turn on the ssl parameter in the sink. That done, the file should look like this:

nodes:  
  source:
    type: file
    uri: file://companies.json
  sink:
    type: mongodb
    uri: ${MONGODB_URI}
    ssl: true

We'll need to export our MONGODB_URI to the environment and then we can test our setup's connectivity:

$ export MONGODB_URI="mongodb://driver:driverpassword@sl-eu-lon-2-portal.2.dblayer.com:16920,sl-eu-lon-2-portal.3.dblayer.com:16920/company"
$ transporter test pipeline.js
TransporterApplication:  
 - Source:         source                                   file            test./.*/                      file://companies.json
  - Sink:          sink                                     mongodb         test./.*/                      mongodb://driver:driverpassword@sl-eu-lon-2-portal.2.dblayer.com:16920,sl-eu-lon-2-portal.3.dblayer.com:16920/company

That seems to pass. But if we try to transporter run things, we'd fall into the first of the common traps people hit. There'll be a lot of output, but look for the errors and you'll see:

$ transporter run pipeline.js
...
ERRO[0000] ERROR: write message error (not authorized on test to execute command { insert: "companies.json", documents: [ {  
...

That's because of the pipeline.js file; it's still set to its defaults:

Source({ name: "source", namespace: "test./.*/" }).save({ name: "sink", namespace: "test./.*/" })  

When the MongoDB adaptor goes to save a record to the database, it uses the database name test from the namespace in the sink settings. We don't have a test database set up, hence the error. Let's change that so it points to our company database...

Source({ name: "source", namespace: "test./.*/" }).save({ name: "sink", namespace: "company./.*/" })  

And we run the transporter again and there's another error. This one is much more interesting though:

ERRO[0000] ERROR: write message error ($oid is not valid for storage.)  path="source/sink"  

TIP: There may be a lot of informational messages emitted too, so if you want to concentrate on just the errors, use transporter run -log.level "error" pipeline.js.

If you aren't using the companies.json file we selected at the start, there's a good chance you won't see this error... but even if you don't, read on.

Transforming the error

The complaint here is that there's something odd in what's being written to MongoDB related to $oid. Let's have a look at the JSON file with the jq tool.

$ jq .  < companies.json | more
{
  "_id": {
    "$oid": "52cdef7c4bab8bd675297d8a"
  },
  "name": "Wetpaint",
  "permalink": "abc2",
  "crunchbase_url": "http://www.crunchbase.com/company/wetpaint",
  "homepage_url": "http://wetpaint-inc.com",
...

There's the $oid, nestled within the _id field of the record. This data came from MongoDB, and MongoDB has ObjectID as a data type. When unmarshalled, it turns into this format, which doesn't like being written back. But what do we do with this field? Well, we could turn it into a string... but how?

This is where the Transformer comes in. Transformers are small JavaScript programs which are dedicated to processing Messages. Messages are how records are packaged and passed between the source and the sink in the pipeline. They can also be passed through Transformers if we add them to the pipeline. Let's add a simple Transformer to our pipeline now. There are two parts to this: a JavaScript file which contains the code and an extra stage in the pipeline. Here's the code for a dump Transformer.

module.exports = function(msg) {  
    console.log(JSON.stringify(msg, null, 2));
    return msg;
};

This simple Transformer is incredibly useful. It takes the msg object and, using the JavaScript built-in functions console.log and JSON.stringify, prints out a formatted JSON object for each message. We'll save this as dump.js and then edit the pipeline.

Source({ name: "source", namespace: "test./.*/" })  
    .transform({ filename: "dump.js", namespace: "test./.*/" })
    .save({ name: "sink", namespace: "company./.*/" })

We've spread the pipeline over three lines for readability. The addition is the transform() function in the middle. The options for this are a filename to load the Transformer from and a namespace setting which can filter which messages are processed. We've set it to match everything. When we run with this pipeline we get output like this:

{
  "data": {
    "_id": {
      "$oid": "52cdef7c4bab8bd675297d90"
    },
    "acquisition": {
    }
...
  },
  "ns": "companies.json",
  "op": "insert",
  "ts": 1489488630
}
...

This is the entire message contents, and yes, we did skip most of the data. Now we can see the JSON document we saw earlier with jq, wrapped as the data field's value. There are also three other fields in the message. The ns field is for the namespace value, and here it's been set to the name of the file the adaptor read from. The op field can contain insert, update, delete and some other specialized labels. It shows what kind of operation the data was associated with. Here, it is a simple insert. Finally, ts is a timestamp for internal monitoring of the message in the Transporter.
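
To see how a Transformer can make use of that envelope, here's a purely illustrative sketch that copies the ns and op values into each record before it reaches the sink; the source_ns and source_op field names are just ones we've made up for the example:

module.exports = function(msg) {
    // Stamp each record with where it came from and how it arrived.
    msg.data.source_ns = msg.ns;   // e.g. "companies.json"
    msg.data.source_op = msg.op;   // e.g. "insert"
    return msg;
};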

With this knowledge in hand, we can set about fixing our $oid problem. Here's a file called oidremove.js, a Transformer that takes the value $oid contains and makes it the _id value, as a string:

module.exports = function(msg) {  
    msg.data._id = msg.data._id.$oid;
    return msg;
};

Let's install that into our pipeline like so:

Source({ name: "source", namespace: "test./.*/" })  
    .transform({ filename: "oidremove.js", namespace: "test./.*/" })
    .save({ name: "sink", namespace: "company./.*/" })

Run the Transporter and sit back as it steadily pushes records up to the MongoDB database. It'll trudge along, writing one document at a time and taking around 600 seconds to finish here (your mileage will vary depending on network latency). The MongoDB adaptor has a bulk option to help in these circumstances. We can set the option in the transporter.yaml file:

nodes:  
  source:
    type: file
    uri: file://companies.json
  sink:
    type: mongodb
    uri: ${MONGODB_URI}
    ssl: true
    bulk: true

Now, the Transporter will tear through the data, here taking just 76 seconds to upload the JSON file. If in doubt, turn the bulk option on.

Namespaces

If you look in your MongoDB database, you'll find that the Transporter has created a collection called companies.json. That is the namespace value allocated to it by the file adaptor at the source. The secret to the current implementation of namespaces in Transporter is that the first half of the namespace given in a pipeline is actually a hint to the underlying adaptor or transformer as to what database, table or similar value to use when extracting or writing data.

That's why the save() function (and the MongoDB adaptor below it) is given the namespace "company./.*/"; we want it to write to the company database, so that's what we specify before the dot in the namespace. An incoming message's namespace string is then matched against everything after the dot in the adaptor's namespace string - which can be a regular expression - and if it matches, the message's namespace string is used as the collection name.
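
To make that concrete, here's a conceptual sketch - not Transporter's actual implementation - of how a sink namespace of "company./.*/" would be applied to an incoming message:

// Conceptual sketch only: splitting a sink namespace into a database name
// and a collection-matching regular expression.
var sinkNamespace = "company./.*/";
var database = sinkNamespace.split(".")[0];                        // "company"
var pattern = sinkNamespace.slice(database.length + 1);            // "/.*/"
var collectionRegex = new RegExp(pattern.replace(/^\/|\/$/g, "")); // matches anything

var messageNs = "companies.json";
if (collectionRegex.test(messageNs)) {
    // The message passes the filter: it's written to the "company" database,
    // in a collection named after the message's ns, "companies.json".
}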

For example, let's change the save() namespace setting to company.known. If we run the Transporter now, nothing will be written... the messages' ns field is "companies.json", and that doesn't match "known", so everything is filtered out. What we can do is change the ns field in a Transformer; let's change oidremove.js so it sets the message ns value to "known":

module.exports = function(msg) {  
    msg.data._id = msg.data._id.$oid;
    msg.ns = "known";
    return msg;
};

And we're off, with a new collection created called "known". Because the Transformer has this control, you can write JavaScript code that selects which collection an incoming record is stored in or, using the filtering, which sink it flows down. As well as the built-in JavaScript functions, the Underscore library is available within the JavaScript code of your Transformers to help you mutate your data, whether by renaming, deleting, picking or projecting fields. One last trick: if your Transformer returns false rather than a message, that message is discarded from the pipeline.
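
As a sketch of what that enables - assuming Underscore is exposed inside Transformers as the usual _ global - here's a hypothetical Transformer that drops records without a homepage_url and projects the rest down to a few fields:

module.exports = function(msg) {
    // Discard records with no homepage_url by returning false.
    if (!msg.data.homepage_url) {
        return false;
    }
    // Keep only a handful of fields using Underscore's pick; this assumes
    // _id has already been flattened upstream (e.g. by oidremove.js).
    msg.data = _.pick(msg.data, "_id", "name", "permalink", "homepage_url");
    return msg;
};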

Next

In the final part of this series, we'll look at moving data from database to database, where the Transporter helps and where you'll find the different data models of modern databases rub up against each other.

