Transporter Maps: MongoDB to Elasticsearch

The open source Transporter from Compose is a powerful tool, and there's a lot to take in when you're getting started. Since many people want to use particular Transporter configurations, this is a good time to offer some simple recipes for them. The first configuration we'll cover is MongoDB to Elasticsearch, as it's the most popular use of Transporter:

MongoDB to Elasticsearch

We'll start with a quick run-through of the essential components. You'll need a copy of Transporter, of course. You can build it yourself from the GitHub repository compose/transporter or download one of the Transporter binaries we've made available, on an as-is basis, for 64-bit Linux and Mac OS X. Once you have that, you'll need a configuration file and an application file.

The configuration file

The configuration file is a YAML file, typically named config.yaml, which defines how the Transporter connects to databases. Each connection has a name, a type, a URI and a namespace. There can be more settings than these – run transporter about to list the available adapters and transporter about mongo to find out about the MongoDB-specific settings. Let's look at the start of our example config:

nodes:  
  sourcemongo:
    type: mongo
    uri: mongodb://user:pass@host:port/dbname
    namespace: dbname.collections

We start by defining a group of nodes: the first node is named sourcemongo and its type is mongo, for a MongoDB adapter. The next two properties are the ones you'll need to configure for your particular installation.

The first is the URI for the MongoDB database. You'll find that information on your Compose MongoDB database console overview, so log in, go to your MongoDB deployment and select the particular database you are working with. Then select Administration and copy the URI from the Connection Strings panel. You'll also need the password for a user that's able to access the database. Put those together and you'll have the URI.

The second thing you need to set is the namespace. In MongoDB terms, the namespace is the name of the database followed by a period, then the name of the collection. The collection name in Transporter can be a regular expression, so you can match multiple collections and pull documents from all of them, but it can also be just the name of one collection.
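
For illustration, here's what those two settings might look like with hypothetical values – a database named exampledb and a collection regular expression of log.* to match every collection whose name starts with log (the host, port and credentials are placeholders, not real values):

  sourcemongo:
    type: mongo
    uri: mongodb://transporter:secret@host.example.com:10042/exampledb
    namespace: exampledb.log.*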

We also need to define a destination for the data, specifically Elasticsearch. In our template configuration file we have this:

  destes:
    type: elasticsearch
    uri: https://user:pass@host:port/
    namespace: index.type

This includes the name of the node, the type (elasticsearch), a URI – which you can obtain from your Compose console for your Elasticsearch deployment (along with a user name and password) – and a namespace. This time the namespace defines the index name and the type that records will be created with.
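
Putting the two nodes together, the complete template config.yaml looks like this – note that the destes definition sits under the same nodes: key as sourcemongo:

nodes:
  sourcemongo:
    type: mongo
    uri: mongodb://user:pass@host:port/dbname
    namespace: dbname.collections
  destes:
    type: elasticsearch
    uri: https://user:pass@host:port/
    namespace: index.type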

That's that for the config.yaml file.

The application file

The application file is a JavaScript program that uses the information in the config file to create a pipeline for processing the messages. Here's our bare minimum application.js file.

pipeline = Source({name:"sourcemongo").save({name:"destes")  

We are, quite literally, creating a pipeline. The Source comes from the 'sourcemongo' node in the configuration file, so all the settings there are loaded to initialize the source. That source will read documents from MongoDB and send them on down the pipeline as messages to the next part of the pipeline – in this case the save component. The save component is also read from the configuration file, this time using the node data for destes. It will take in messages and write them out.

Starting Transporter

Assuming the two files above have been created, you can first test that the configuration files make sense by running:

transporter test application.js  

If that passes and you get output describing how your Transporter is configured, you are ready to move on.

To run the one-off transport, do:

transporter run application.js  

And the Transporter should, with this setup, silently go ahead and copy everything from the MongoDB collection or collections into Elasticsearch.

Before you do a bulk import into Elasticsearch, consider scaling up your Elasticsearch deployment. Auto-scaling only steps in for sustained resource demand and cannot react to sudden, unexpected bursts of activity. Elasticsearch can generate that kind of activity during imports, and it handles running into resource limits by crashing nodes. The cluster will recover and the node will come back up, but the import will take much longer. So, before importing, scale your cluster to at least 2x its current provision and then do the import. When it is done and activity has settled down, you will be offered the opportunity to scale down your cluster.

Depending on how much data you have in MongoDB, the copy may take some time; once it's done, the Transporter will finish and exit.

Tailing MongoDB

If you'd prefer your Elasticsearch database to be kept in sync with the MongoDB data after the initial copy, then you will likely want to use the tail option. You can invoke this by adding tail: true either to the sourcemongo node definition in the configuration file, or to the list of properties passed when creating the source in the application file. The former is more appropriate for most cases as it is then statically defined, as shown below. To tail, the MongoDB user also needs access to the oplog. On Compose, that privilege is granted when you create a new user by selecting "oplogaccess" as one of the additional attributes of the user.
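
Here's the sourcemongo node from earlier with the option added – the other values are the same placeholders as before:

  sourcemongo:
    type: mongo
    uri: mongodb://user:pass@host:port/dbname
    namespace: dbname.collections
    tail: true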

Transforming the content

Transformers are JavaScript programs which can manipulate each message passing through the pipeline. This is the canonical minimal transformer code:

module.exports = function(msg) {
  // return the message unchanged so it continues down the pipeline
  return msg
}

The script is passed a msg, which contains some metadata about what the message represents along with the actual document in msg.data. The script has to return a msg containing the modified version of the message to be passed on down the pipeline. If we save that transformer script as transform.js, we can use it in a Transporter by modifying the application script, adding a transform between the source and the sink. This leaves it looking like this:

 pipeline = Source({name:"sourcemongo"})
  .transform({filename:"transform.js",namespace:"."})
  .save({name:"destes"})

The transform function takes a number of properties: the filename points to our script, while the namespace is a selector. All messages have a namespace associated with them, and we previously set the namespaces in the configuration file for the sourcemongo and destes nodes. As transformers don't appear in the configuration file, you need to specify a namespace for them when they are created in the application script.

A namespace is made up of a scope name (such as the database name or index name) and a specific name (such as the collection or type). The latter can be configured as a regular expression so that a node – save or transformer – will only work on matching messages. There's a lot of power in this mechanism, but for our needs, we want to process all messages. So, the minimum namespace we can define is ".", which will match everything.
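
For example – a hypothetical configuration, assuming the source database dbname has a collection named users – you could narrow the selector so the transformer only works on messages from that one collection:

pipeline = Source({name:"sourcemongo"})
  .transform({filename:"transform.js",namespace:"dbname.users"})
  .save({name:"destes"})

For this article, though, we want every message processed, so we'll stick with the "." namespace.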

If you run this Transporter now, nothing different will happen. We haven't told the transformer to do anything. Let's make it log every message passed to it:

module.exports = function(msg) {  
  console.log(JSON.stringify(msg))
  return msg
}

Run that and you should get a lot of output of unformatted JSON messages. If you change the log line to console.log(JSON.stringify(msg,null,' ')) you'll get pretty-printed versions of the messages, which is useful when you are debugging a Transporter configuration or just looking to understand how Transporter works.

Also, when you are developing a Transporter configuration, you may want to track its progress. One option would be to use a site plugin such as Kopf, available from your Compose Elasticsearch console, which shows the count of documents in its Node view. Alternatively, you could add a progress count using a Transporter transformer. For example, this transformer will emit a console update every 100 messages:

var i = 0;  // running count of messages seen by this transformer

module.exports = function(msg) {
  console.log(JSON.stringify(msg, null, ' '))  // pretty-print each message, as before
  i++;
  if (i % 100 == 0) {
    console.log(i + " processed");  // progress update every 100 messages
  }
  return msg
}

Concluding this journey

Of course, transformers are for more than debugging, and we'll be updating our coverage of data manipulation using them in a future article. We'll also be looking at other database connections, because the Transporter is for more than just MongoDB and Elasticsearch.

For now though, you should have enough information to get a Transporter running between MongoDB and Elasticsearch and to debug the connection to get a reliable data transfer system. Happy Transporting.