Mongo to Mongo Data Moves with NiFi
There are many reasons to move or synchronize a database such as MongoDB: migrating providers, upgrading versions, duplicating for testing or staging, consolidating, and cleaning. There are even more ways to actually move the data: mongodump/mongorestore, Compose's Import feature (which is backed by Transporter), custom scripts of your own, or a tool such as NiFi.
Here we will review a couple of scenarios using NiFi. First, we'll look at the simplest approach possible, just queries and inserts; then a brute-force approach of polling with de-duplication; and finally a more advanced synchronization approach that hooks into MongoDB's oplog. Since all of these run on NiFi, the flexibility of adding extra transforms and seeing what's happening is always available too.
One Time
For a one-time move, a tool such as NiFi might be more overhead than is needed unless there are mitigating circumstances such as a large data size or desired transformations, but it can be done.
The simplest way to get started is to pull both a GetMongo Processor and a PutMongo Processor onto the canvas and connect them via a success relationship (see here for details on Processors or here for details on NiFi overall). Each pair of these is good for a single Collection.
They are easy to copy, so you can configure the first pair with things like connection strings and credentials and then copy them. After that, just change the relevant details for each Mongo Collection, which really isn't too much trouble.
Since NiFi is built for data that is flowing, the typical idiom for a Processor such as GetMongo is to run over and over again, which would generate duplicate data. Configuring the Processor to effectively run only once can be done by setting the Run Schedule to some large interval, much longer than the actual one-time session of copying data. With the Run Schedule set to, say, one day, the GET query will run only once after you start the flow, which leaves plenty of time to finish the copy and stop the flow before it repeats.
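If it helps to picture what that pair of Processors amounts to, the logic is essentially "query everything from the source, insert each Document into the target". Here is a minimal sketch of that idea using the plain MongoDB Java driver; the connection strings, database, and collection names are placeholders, and this is an illustration rather than what NiFi runs internally:

```java
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import org.bson.Document;

public class OneTimeCopy {
    public static void main(String[] args) {
        // Placeholder connection strings -- substitute your own source and target.
        try (MongoClient source = MongoClients.create("mongodb://source-host:27017");
             MongoClient target = MongoClients.create("mongodb://target-host:27017")) {
            MongoCollection<Document> in  = source.getDatabase("mydb").getCollection("mycollection");
            MongoCollection<Document> out = target.getDatabase("mydb").getCollection("mycollection");
            // Read every Document from the source and insert it into the target,
            // roughly what the GetMongo -> PutMongo pair does for one Collection.
            for (Document doc : in.find()) {
                out.insertOne(doc);
            }
        }
    }
}
```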
Brute Force Sync with Polling and De-duplication
The next solution builds upon the previous. It utilizes the same GetMongo and PutMongo processors at the edges but enhances the one-shot nature of the previous example by adding a few fingerprinting and de-duplication steps to allow for a more continuous flow of data and a regular synchronization. The following details the flow for one collection:
It starts the same with the GetMongo, then steps to HashContent, which generates a fingerprint of each FlowFile's content and puts it into a FlowFile attribute. The next step isn't technically needed for this flow, since GetMongo doesn't write any attributes, but is included to show that the unique key can be generated from both the content and the attributes to ensure proper de-duplication.
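For the curious, the fingerprinting step boils down to digesting the FlowFile's content and keeping the result as an attribute value. A small sketch of that idea in Java follows; the SHA-256 choice here is just an assumption for illustration, since HashContent lets you pick the algorithm:

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;

public class Fingerprint {
    // Digest the content bytes and render the result as hex -- the kind of value
    // that ends up in the FlowFile's fingerprint attribute.
    static String fingerprint(byte[] content) throws Exception {
        byte[] digest = MessageDigest.getInstance("SHA-256").digest(content);
        StringBuilder hex = new StringBuilder();
        for (byte b : digest) {
            hex.append(String.format("%02x", b));
        }
        return hex.toString();
    }

    public static void main(String[] args) throws Exception {
        String doc = "{\"_id\": 1, \"name\": \"example\"}";
        System.out.println(fingerprint(doc.getBytes(StandardCharsets.UTF_8)));
    }
}
```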
The ComposeUniqueRocksDB only needs two properties configured: the directory for the actual RocksDB data files and the name of the FlowFile attribute which holds the key to be checked. Since the fingerprint attribute is a truly unique key built from all of the relevant data, we can rely on ComposeUniqueRocksDB to pass a FlowFile on to the unseen relationship, and hence to the next step of PutMongo, only if it hasn't actually seen the Document contained in the FlowFile. ComposeUniqueRocksDB is part of a custom extension package. There are other solutions for de-duplication that come with NiFi, but they rely on more sophisticated configuration with some ControllerServices.
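The check itself is conceptually tiny: has this key been stored before or not? A hedged sketch of that idea against the RocksDB Java bindings is below; the directory path and key value are placeholders, and this is not the processor's actual code:

```java
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

public class UniqueKeyCheck {
    static { RocksDB.loadLibrary(); }

    public static void main(String[] args) throws RocksDBException {
        try (Options options = new Options().setCreateIfMissing(true);
             RocksDB db = RocksDB.open(options, "/tmp/unique-keys")) {
            // The value of the FlowFile's fingerprint attribute (placeholder here).
            byte[] key = "9f86d081884c7d659a2feaa0c55ad015".getBytes();
            if (db.get(key) == null) {
                db.put(key, new byte[0]); // first sighting: remember the key...
                // ...and route the FlowFile to the "unseen" relationship, on to PutMongo
            } else {
                // already seen: do not pass it on
            }
        }
    }
}
```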
So, now that we only pass on the new data, we can go ahead and configure the most important parameter of this flow, which is how often we schedule GetMongo to run. This determines how often we query the Collection in the source Mongo database. The downside is the query load this places on the source system, since it will query the entire collection over and over again according to the Run Schedule set above. For some situations this may be fine. If your collection isn't very large and you don't mind some extra load, or if the synchronization schedule can be relaxed or shifted to less utilized times, then this might be "good enough". This particular flow essentially performs the snapshot plus changes by building up a unique index and comparing the entire result set against the previously seen keys to decide whether a Document is actually new. While this is a lot of processing, it is simple and may work for you. If not, the next solution might.
Tailing the Oplog
One of the idiosyncratic features of MongoDB's replication implementation is that it can be "hooked into". Mongo's oplog is a capped collection which is ultimately just a buffered changelog. Just like a regular Collection, it can be queried, and the database state changes can be moved and applied to another database. In essence, this is what Mongo does internally to keep replicas in sync.
A snapshot of the database plus the changes after the snapshot equals the current state. Marrying the streaming nature of the changes to NiFi makes a lot of sense and is the most complete solution if you have access to Mongo's oplog.
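To make "hooking into" concrete, the gist is a tailable cursor on the local.oplog.rs capped collection. Here is a rough sketch with the MongoDB Java driver; the connection string is a placeholder and the starting-timestamp handling is simplified, since a real sync would persist the last timestamp it processed:

```java
import com.mongodb.CursorType;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.model.Filters;
import org.bson.BsonTimestamp;
import org.bson.Document;

public class OplogTail {
    public static void main(String[] args) {
        try (MongoClient client = MongoClients.create("mongodb://source-host:27017")) {
            MongoCollection<Document> oplog =
                    client.getDatabase("local").getCollection("oplog.rs");
            // Start from "now"; a real sync would remember the last "ts" it handled.
            BsonTimestamp since = new BsonTimestamp((int) (System.currentTimeMillis() / 1000), 0);
            try (MongoCursor<Document> cursor = oplog
                    .find(Filters.gt("ts", since))
                    .cursorType(CursorType.TailableAwait) // keep the cursor open and wait for new entries
                    .iterator()) {
                while (cursor.hasNext()) {
                    Document entry = cursor.next();
                    // "op" is i/u/d for insert/update/delete, "ns" is db.collection, "o" is the change
                    System.out.println(entry.getString("op") + " " + entry.getString("ns"));
                }
            }
        }
    }
}
```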
Below is an example flow which uses some custom code, from the same package as the previous example, that isn't already in NiFi. It is easily built with mvn package and then deployed to NiFi by just copying a file and restarting the server.
Somewhat counter to the general notion of a NiFi Processor being simple and doing one thing, these two Processors are a little more sophisticated. Whereas with the GetMongo Processor we had to create one for each Collection, a single ComposeTailingGetMongo suffices for the entire Mongo database.
The ComposeTailingGetMongo runs only once and stays running. It begins by creating FlowFiles for every Document currently in a Mongo database (a snapshot), then it continues to generate FlowFiles for any relevant operations such as inserts, updates, and deletes for as long as it runs (the changes). And while this is example code, it is useful example code and could easily be used in multiple situations. This Get plus the matching ComposeTailingPutMongo is sufficient to keep entire MongoDB databases in sync. Plus, this use case is a great match for NiFi's managing and running of data flows. And we haven't even mentioned that it is easy to transform this data by inserting extra processing steps, or even to duplicate it to multiple databases by adding another ComposeTailingPutMongo.
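To round out the picture, the put side of a tailing flow has to turn each change entry back into an operation on the target database. The sketch below is a hypothetical simplification that assumes classic oplog-style entries with op, ns, o, and o2 fields, and it ignores newer update formats and plenty of edge cases that the real ComposeTailingPutMongo would need to handle:

```java
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.ReplaceOptions;
import org.bson.Document;

public class ApplyChange {
    // Apply one classic-format oplog entry to the target database (simplified sketch).
    static void apply(MongoClient target, Document entry) {
        String[] ns = entry.getString("ns").split("\\.", 2); // "db.collection"
        MongoCollection<Document> coll = target.getDatabase(ns[0]).getCollection(ns[1]);
        switch (entry.getString("op")) {
            case "i": { // insert
                Document doc = entry.get("o", Document.class);
                coll.replaceOne(Filters.eq("_id", doc.get("_id")), doc,
                        new ReplaceOptions().upsert(true)); // upsert so replays are harmless
                break;
            }
            case "u": { // update
                Document query  = entry.get("o2", Document.class); // identifies the target document
                Document change = entry.get("o", Document.class);  // modifiers or a full replacement
                boolean hasModifiers = change.keySet().stream().anyMatch(k -> k.startsWith("$"));
                if (hasModifiers) {
                    coll.updateOne(query, change);
                } else {
                    coll.replaceOne(query, change, new ReplaceOptions().upsert(true));
                }
                break;
            }
            case "d": { // delete; "o" holds the _id filter
                coll.deleteOne(entry.get("o", Document.class));
                break;
            }
            default: // no-ops and commands are ignored in this sketch
                break;
        }
    }
}
```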
Continuous, Visible, and Easily Customized Synchronization
The various solutions above are good examples of how useful NiFi can be for moving data. Whether you need to synchronize your test Mongo database with production data, or whether you need to migrate to a new Storage Engine, or whatever your moving database use case may be, by choosing NiFi you get access to all of the benefits of a sophisticated data flow platform with less effort than "rolling your own" solution.