Mongo to Mongo Data Moves with NiFi


There are many reasons to move or synchronize a database such as MongoDB: migrating providers, upgrading versions, duplicating for testing or staging, consolidating, and cleaning. There are even more ways to move that data: mongodump/mongorestore, Compose's Import feature (which is backed by Transporter), custom scripts of your own, or a tool such as NiFi.

Here we will review a couple of scenarios using NiFi. First, we'll look at the simplest possible approach of plain queries and inserts, then a brute-force approach of polling with de-duplication, and then move on to a more advanced synchronization approach that hooks into MongoDB's oplog. Since all of these run on NiFi, the flexibility to add extra transforms and to see what's happening is always available too.

One Time

For a one-time move, a tool such as NiFi might be more overhead than is needed unless there are mitigating circumstances like large data size or desired transformations, but it can be done.

[Image: OneTime]

The simplest way to get started is to pull both a GetMongo Processor and a PutMongo Processor onto the canvas and connect them via a success relationship (see the NiFi documentation for details on Processors and on NiFi overall). Each pair of these handles a single Collection.
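For context, a GetMongo/PutMongo pair is essentially a query on one side and inserts on the other. Roughly speaking, and with placeholder connection strings, database, and collection names, the equivalent with the MongoDB Java driver looks something like this:

```java
// Roughly what a GetMongo -> PutMongo pair does for one Collection, sketched
// with the MongoDB Java driver. The connection strings, database name, and
// collection name below are placeholders.
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import org.bson.Document;

public class OneTimeCopy {
    public static void main(String[] args) {
        try (MongoClient source = MongoClients.create("mongodb://source-host:27017");
             MongoClient target = MongoClients.create("mongodb://target-host:27017")) {

            MongoCollection<Document> from = source.getDatabase("appdb").getCollection("orders");
            MongoCollection<Document> to = target.getDatabase("appdb").getCollection("orders");

            // GetMongo side: run the (empty) query and emit every Document.
            // PutMongo side: insert each Document into the destination.
            for (Document doc : from.find()) {
                to.insertOne(doc);
            }
        }
    }
}
```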

[Image: OneTimeWithCopies]

They are easy to copy, so you can configure the first pair with things like connection strings and credentials and then copy them. After that, just change the relevant details for each Mongo Collection, which really isn't much trouble.

[Image: OneTimeRunSchedule]

Since NiFi is built for data that is flowing, a Processor such as GetMongo is typically scheduled to run over and over again, which would generate duplicate data here. Configuring the Processor to effectively run only once can be done by setting the Run Schedule to a large interval, much longer than the actual one-time copy session. Something like once per day ensures the query runs just once when you start the flow, which should be sufficient.

Brute Force Sync with Polling and De-duplication

The next solution builds upon the previous one. It uses the same GetMongo and PutMongo processors at the edges but extends the one-shot nature of the previous example with a few fingerprinting and de-duplication steps, allowing a more continuous flow of data and regular synchronization. The following details the flow for one collection:

[Image: PollingOverview]

It starts the same with GetMongo, then steps to HashContent, which generates a fingerprint of each FlowFile's content and puts it into a FlowFile attribute. The next step isn't technically needed for this flow, since GetMongo doesn't write any attributes, but it is included to show that the unique key can be generated from both the content and the attributes to ensure proper de-duplication.
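The fingerprint itself is just a digest of the Document's bytes. As a rough illustration of what the HashContent step contributes (the SHA-256 algorithm and the idea of an attribute such as "hash.value" are examples, not a prescription):

```java
// A sketch of the fingerprint HashContent produces: a digest of the
// FlowFile's content that can serve as a de-duplication key. SHA-256 and the
// attribute name mentioned above are illustrative choices.
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class Fingerprint {
    public static String of(byte[] flowFileContent) throws NoSuchAlgorithmException {
        MessageDigest digest = MessageDigest.getInstance("SHA-256");
        byte[] hash = digest.digest(flowFileContent);
        StringBuilder hex = new StringBuilder();
        for (byte b : hash) {
            hex.append(String.format("%02x", b));
        }
        return hex.toString(); // written to a FlowFile attribute for the next step
    }
}
```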

[Image: PollingUnique]

ComposeUniqueRocksDB needs only two properties configured: the Directory for the actual RocksDB data files and the name of the FlowFile attribute which holds the key to be checked. Since the fingerprint attribute is a truly unique key built from all of the relevant data, we can rely on ComposeUniqueRocksDB to pass a FlowFile on to the unseen relationship, and hence to the next step of PutMongo, only if it hasn't actually seen the Document contained in that FlowFile. ComposeUniqueRocksDB is part of a custom extension package. There are other de-duplication solutions that come with NiFi, but they rely on more involved configuration with some ControllerServices.
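Under the hood, that check is a simple key lookup against a local RocksDB store. A minimal check-then-remember sketch of the same idea is below; this is not ComposeUniqueRocksDB's actual code, the directory path is a placeholder, and a real processor would keep the database handle open rather than reopening it per FlowFile:

```java
// Minimal check-then-remember logic against RocksDB: pass the key through
// only the first time it is seen. Not ComposeUniqueRocksDB's implementation.
import java.nio.charset.StandardCharsets;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

public class SeenBefore {
    public static boolean unseen(String directory, String fingerprint) throws RocksDBException {
        RocksDB.loadLibrary();
        try (Options options = new Options().setCreateIfMissing(true);
             RocksDB db = RocksDB.open(options, directory)) {

            byte[] key = fingerprint.getBytes(StandardCharsets.UTF_8);
            if (db.get(key) == null) {
                db.put(key, new byte[0]); // remember it; route the FlowFile to "unseen"
                return true;
            }
            return false;                 // already seen; drop it or route elsewhere
        }
    }
}
```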

[Image: PollingSchedule]

So, now that we only pass on new data, we can configure the most important parameter of this flow: how often GetMongo is scheduled to run. This determines how often we query the Collection in the source Mongo database. The downside is the query load it places on the source system, since the entire collection is queried over and over again according to the Run Schedule set above. For some situations this may be fine. If your collection isn't very large and you don't mind some extra load, or if the synchronization can be shifted to less utilized times, then this might be "good enough". This flow essentially performs a snapshot plus changes by building up a unique index and comparing the entire result set against the previously seen keys to decide whether a Document is actually new. While that is a lot of processing, it is simple and may work for you. If not, the next solution might.

Tailing the Oplog

One of the idiosyncratic features of MongoDB's replication implementation is that it can be "hooked into". Mongo's oplog is a capped collection which is ultimately just a buffered changelog. Just like a regular Collection, it can be queried, and the database state changes it records can be moved and applied to another database. In essence, this is what Mongo does internally to keep replicas in sync.

A snapshot of the database plus the changes after the snapshot equals the current state. Marrying the streaming nature of the changes to NiFi makes a lot of sense and is the most complete solution if you have access to Mongo's oplog.
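Reading those changes amounts to a query against the local.oplog.rs collection with a tailable cursor. A bare-bones sketch with the MongoDB Java driver follows; it assumes a replica set member and access to the local database, uses a placeholder connection string, and omits the timestamp tracking a real tailer would need in order to resume where it left off:

```java
// A bare-bones oplog tail using a tailable cursor against local.oplog.rs.
// A real tailer would filter out no-ops and track timestamps for resuming.
import com.mongodb.CursorType;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import org.bson.Document;

public class OplogTail {
    public static void main(String[] args) {
        try (MongoClient client = MongoClients.create("mongodb://source-host:27017")) {
            MongoCollection<Document> oplog =
                    client.getDatabase("local").getCollection("oplog.rs");

            try (MongoCursor<Document> cursor =
                         oplog.find().cursorType(CursorType.TailableAwait).iterator()) {
                while (cursor.hasNext()) {
                    Document entry = cursor.next();
                    // "op" is the operation type ("i", "u", "d", ...), "ns" the
                    // namespace, and "o"/"o2" carry the change itself.
                    System.out.println(entry.getString("ns") + " " + entry.getString("op"));
                }
            }
        }
    }
}
```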

The flow below uses some custom code, from the same package as the previous example, that is not already part of NiFi. It is easily built with mvn package and then deployed to NiFi by copying a single file and restarting the server.

[Image: TailingOverview]

Somewhat counter to the general notion of a NiFi Processor being simple and doing one thing, these two Processors are a little more sophisticated. Whereas a GetMongo Processor has to be created for each Collection, a single ComposeTailingGetMongo suffices for the entire Mongo database.

The ComposeTailingGetMongo runs only once and stays running. It begins by creating FlowFiles for every Document currently in the Mongo database (a snapshot), then it continues to generate FlowFiles for any relevant operations such as inserts, updates, and deletes for as long as it runs (the changes). And while this is example code, it is useful example code and could easily be used in multiple situations. This Get plus the matching ComposeTailingPutMongo is sufficient to keep entire MongoDB databases in sync. Plus, this use case is a great match for NiFi's managing and running of data flows. And we haven't even mentioned that it is easy to transform this data by inserting extra processing steps, or to duplicate it to multiple databases by adding another ComposeTailingPutMongo:

[Image: TailingDuplicate]
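On the put side, each oplog entry has to be applied to the destination. The outline below is a rough sketch of the kind of handling involved, not ComposeTailingPutMongo's actual code, and it simplifies the oplog's update formats (newer MongoDB versions encode updates differently):

```java
// A simplified sketch of applying one oplog entry to a destination
// Collection: inserts, updates, and deletes. Illustrative only; it is not
// ComposeTailingPutMongo's implementation and glosses over newer update formats.
import com.mongodb.client.MongoCollection;
import org.bson.Document;

public class ApplyOplogEntry {
    public static void apply(MongoCollection<Document> target, Document entry) {
        String op = entry.getString("op");
        Document o = entry.get("o", Document.class);

        switch (op) {
            case "i": // insert: "o" is the new Document
                target.insertOne(o);
                break;
            case "u": // update: "o2" identifies the Document, "o" describes the change
                Document o2 = entry.get("o2", Document.class);
                boolean operatorStyle = o.keySet().stream().anyMatch(k -> k.startsWith("$"));
                if (operatorStyle) {
                    target.updateOne(o2, o);   // e.g. {$set: {...}}
                } else {
                    target.replaceOne(o2, o);  // full-document replacement
                }
                break;
            case "d": // delete: "o" identifies the Document
                target.deleteOne(o);
                break;
            default:  // ignore no-ops and commands
                break;
        }
    }
}
```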

Continuous, Visible, and Easily Customized Synchronization

The various solutions above are good examples of how useful NiFi can be for moving data. Whether you need to synchronize your test Mongo database with production data, migrate to a new storage engine, or handle whatever your database-moving use case may be, choosing NiFi gives you all of the benefits of a sophisticated data flow platform with less effort than "rolling your own" solution.

Image by: Rusellblande
