What You Need to Know to Extend NiFi


Generic data tools often miss the mark. Most data is nuanced and idiosyncratic. At a high level, data flow and transformation tasks are essentially the same, but when you get down to the implementation level, small differences in the data often mean those tasks need customization. NiFi provides the services that are necessary, and nice to have, for a data flow tool, and it provides these services to an easily customizable class: the Processor.

For more background, and to review the FlowFile, which is what a Processor actually processes, see here.

This Blank Canvas Isn't So Blank

BlankCanvas

When you first log into the NiFi UI, you are presented with a blank canvas. It's certainly true that without a data flow created, configured, and set to run, you don't have much. That is, unless you want to think about it from the perspective of potential. A blank NiFi canvas is really a great deal of services and code waiting to be employed. Whether you are a straightforward user of NiFi or intend to extend it to take advantage of its well polished class hierarchy, this blank canvas represents a runtime environment waiting for the addition of NiFi Processors. It's an environment optimized for connecting data flow processing steps. It exposes a component model that embeds its own documentation and configuration. It excels at managing data, with an in-memory key-value store for speed, a couple of persistent write-ahead logs for performant durability of your data, and even a searchable index of history. Whether you build your own Processor or use a builtin one, there is no difference: they are built the same, deploy the same, and even run the same in this not-so-blank canvas after all.

First Steps: Adding a Processor

AddProcessor

Dragging the processor icon onto the canvas is the first step towards flowing the data. When the icon is dropped, you are presented with a great number of choices to Add a Processor:

ProcessorChoices

There are over one hundred types to choose from. The ones included are all loaded from a configured directory which NiFi reads when it starts up (typically $NIFI_ROOT/lib). What's really nice is that this list is dynamically generated. All of these components are extensions of the core API, packaged in an extended jar file format that includes any dependencies. By using custom ClassLoaders, the NiFi server lets you add your own Processors to the platform the exact same way that all Processors are added to the system: by dropping a .nar file into the configured directory and restarting the server. Here's a portion of a directory listing for the lib/ directory to give you a sense:

NiFiLibDir

These NiFi Archives (nar files) are just jar files with all of their dependencies packaged in. With the ability to add .nar files at runtime instead of compile time, the NiFi team needed to handle the possibilities of dependency hell. There really isn't anything magical about the format, and the team provides a maven archetype with a skeleton for a new nar project. Here's the output from unzipping an example nar:

InsideTheNar
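Beyond the packaging, a new bundle is ordinary Java. As a rough sketch of the kind of skeleton the archetype generates (the package and class names here are hypothetical, not the archetype's literal output), a minimal Processor is just a subclass with one method to fill in, plus a META-INF/services entry inside the nar so NiFi's ClassLoader can discover it:

```java
package com.example.processors; // hypothetical package for illustration

import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;

// Listed in META-INF/services/org.apache.nifi.processor.Processor
// so the server can find it inside the nar.
public class MyProcessor extends AbstractProcessor {

    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session)
            throws ProcessException {
        final FlowFile flowFile = session.get();
        if (flowFile == null) {
            return; // nothing queued; the framework will trigger us again
        }
        // ... do the work here, then route the FlowFile to a Relationship.
        // As a placeholder, return it untouched to the input queue:
        session.transfer(flowFile);
    }
}
```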

Configuration and Docs All Builtin

Once you pick one of the available Processors, it is instantiated and ready for configuration:

ConfigureMeThis

The little warning triangle on your newly instantiated Processor means that some configuration required before this Processor can be scheduled hasn't been done yet. This is no surprise since we just put it there. Ctrl-Click the Processor and up pops a menu with all of the high level actions available:

ProcessorMenu

By going into Configure we can access the real guts of what it means to manage a Processor:

ProcessorMenuSettings

With its Settings, the Processor exposes the Relationships which can be Auto-Terminated. Basically, if a Processor transfers a FlowFile to a Relationship that is Auto-Terminated, then it just drops it: no questions asked and no more history kept. Eventually most flows end up with this configured on some Processor, although it's usually on one at the end that performs some kind of side effect, like posting to a data store or filesystem of some type, after everything else is finished.

These Relationships are part of the Processor's code too. So, while it appears that there are standard names like success and failure, they are really just conventions. A Relationship is really just a named queue that can be set up between Processors. Just like in message queuing, the interface between the Processors is generic and decoupled. It allows everything to work at its own speed, or even be stopped for a while, without any fear of data loss. And, as was mentioned, they are just named queues that require some basic static configuration:

Relationships
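As a text rendition of what that static configuration looks like inside a Processor class, here is a minimal sketch using NiFi's Relationship.Builder (a paraphrase, not the actual bundle source; the description text is made up):

```java
import java.util.Collections;
import java.util.Set;

import org.apache.nifi.processor.Relationship;

// Inside the Processor class: a single "success" Relationship.
// The name is pure convention; NiFi only sees a named queue.
public static final Relationship REL_SUCCESS = new Relationship.Builder()
        .name("success")
        .description("Each document returned by the query is sent here as a FlowFile")
        .build();

@Override
public Set<Relationship> getRelationships() {
    return Collections.singleton(REL_SUCCESS);
}
```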

This ComposeStreamingGetMongo Processor only has the one Relationship because it generates FlowFiles when it runs and pumps them out one by one to the success queue. If there is a failure, it just stops. Other situations might warrant different paths, but this one keeps it simple.

Scheduling: Cron, Timer, or Event Driven

SchedulingStrategy

Scheduling refers to when and how often a Processor will run. There are three different types. Timer based means it should run again after some interval of time once it finishes; think poll this every 5 minutes or every 4 hours. Cron based means it should run on a set schedule; think every Sunday at midnight or every weekday at 6AM. And finally, event driven means it should run whenever there is a FlowFile queued on any inbound queue; think whenever there is an event waiting to be processed.

SchedulingTasks

The other dimension of scheduling, beyond the how often, is the how many. When making a custom Processor, the developer can set things like allowing only one task to run at a time (no concurrency). The ComposeStreamingGetMongo is this way. It doesn't make much sense to concurrently query the same Mongo collections, so it is set to only ever have one task run at a time via a @TriggerSerially annotation on its class declaration. On the other hand, the ComposeBatchPutMongo Processor can certainly run concurrently, since the data being fed to it will be different and independent in each FlowFile. It can be scheduled to run any number of Concurrent Tasks that is reasonable from a resource perspective.
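To make the serial constraint concrete, here is a minimal sketch (the class name is made up; the annotation is the real one from org.apache.nifi.annotation.behavior):

```java
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;

@TriggerSerially // the framework never runs two tasks of this Processor at once
public class OneAtATimeProcessor extends AbstractProcessor {

    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session)
            throws ProcessException {
        // Only a single thread can ever be in here at a time, so no
        // synchronization is needed for per-Processor state.
    }
}
```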

Properties: Self Contained and Documented

ConfigureProperties

Just like Relationships, Properties are typically statically configured and fully contained in each individual Processor's code:

PropertyDescriptors
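In code, one of these declarations looks roughly like the following sketch (not the actual bundle source; the property name, description, and validator here are illustrative):

```java
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.processor.util.StandardValidators;

// Inside the Processor class: a descriptor with its requirements,
// validation, and expression language support declared up front.
public static final PropertyDescriptor MONGO_URI = new PropertyDescriptor.Builder()
        .name("Mongo URI")
        .description("The connection URI of the MongoDB instance to query")
        .required(true)
        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
        .expressionLanguageSupported(true) // value may pull info from attributes
        .build();
```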

All the details, like whether a property must be included, how it's validated, and whether it can use the custom expression language builtin to NiFi to pull info out of attributes, are configured for each property. This surfaces them in the UI and exposes them to the code. NiFi even takes the information and automatically turns it into documentation for the end user:

Documentation

All of this, though, is really just setup for the whole point: the Processor's onTrigger method.

Doing the Processor's Bidding: the onTrigger Method

In the world of web servers there is usually some method like handle(request, response) that a developer implements or overrides. The server then executes that method whenever it receives a corresponding event, such as GET this url. In this way the developer accrues all of the benefits of the web server code written to help, such as session management, url destructuring into parameters, and even SSL, all available to that one method call. In an analogous fashion, NiFi uses this same Template Method pattern with its onTrigger(context, session) method:

onTrigger

In the above snippet, which is from ComposeUniqueRocksDB, onTrigger is called by the NiFi server code according to the Schedule (i.e. Timer, Cron, or Event) set by the user. When it is called, the session has the FlowFile to be processed. In this particular case, a logical.key attribute is retrieved from the FlowFile and compared against a unique index. The outcome of the comparison determines which named queue, or Relationship, the FlowFile will be transferred to: REL_SEEN or REL_UNSEEN.
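In rough Java, the shape of that routing logic is something like this (a paraphrase, not the actual ComposeUniqueRocksDB source; isSeen() is a hypothetical stand-in for the RocksDB-backed index lookup):

```java
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session)
        throws ProcessException {
    FlowFile flowFile = session.get();
    if (flowFile == null) {
        return;
    }
    // Attributes live in the FlowFile's metadata; the content
    // repository is never touched by this lookup.
    final String key = flowFile.getAttribute("logical.key");

    // isSeen() stands in for consulting the unique index.
    final Relationship destination = isSeen(key) ? REL_SEEN : REL_UNSEEN;

    session.transfer(flowFile, destination);
    session.getProvenanceReporter().route(flowFile, destination);
}
```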

transferToRelationship

After session.transfer(), the FlowFile with its corresponding metadata is persisted to the multiple repositories NiFi provides to manage all of this. In this particular case, the Content-Repository is untouched, since we didn't need to change or even read any of the FlowFile's content or payload data. The FlowFile-Repository was accessed when we used flowFile.getAttribute(), but that was already in memory and is only written via a WAL (Write Ahead Log) in order to persist changes and protect against failure. In this particular case, the FlowFile-Repository is only written after session.commit() is called by the framework when the onTrigger method ends (view AbstractProcessor to see this NiFi class). This session.commit() is analogous to a transaction commit, and it durably finishes the transfer step, which moves the FlowFile to the relevant Relationship queue. The data is protected the entire way through the flow, from step to step. Note too the getProvenanceReporter().route(flowFile, relationship) call, where the audit trail of this same transfer is sent to the Provenance-Repository for end user search and later verification. These are powerful data flow services, to say the least.
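For reference, the framework's side of that contract looks roughly like this (a paraphrase of what AbstractProcessor does, not its verbatim source):

```java
// The Template Method wrapper: the framework owns the session
// lifecycle, so a subclass's onTrigger never calls commit() itself.
@Override
public final void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory)
        throws ProcessException {
    final ProcessSession session = sessionFactory.createSession();
    try {
        onTrigger(context, session); // the subclass's hook
        session.commit();            // durably finish every transfer
    } catch (final Throwable t) {
        session.rollback(true);      // return FlowFiles to their queues, penalized
        throw t;
    }
}
```

This is why a custom Processor can stay so small: durability, rollback, and provenance all come for free from the wrapper.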

Some Example Custom Processors

We have reviewed some of the highlights of NiFi Processors. To see a full example nar project with multiple custom Processors, please feel free to visit this github repo for nifi-compose-bundle.

It consists of the following three Processors:

  1. ComposeBatchPutMongo.java which takes a FlowFile that contains an array of JSON objects to be batch inserted into MongoDB.
  2. ComposeUniqueRocksDB.java which routes a FlowFile based upon whether it has seen an attribute previously or not. This is only done on one local node, and it uses RocksDB to persist and manage the index, so it is unlike the builtin DetectDuplicate, which can detect duplicates across a cluster of NiFi instances. DetectDuplicate requires a network call to a DistributedMapCache for each lookup, though; ComposeUniqueRocksDB keeps its lookup in memory and on the local disk.
  3. ComposeStreamingGetMongo.java which immediately creates a FlowFile for each document in a query as it is returned. This differs from the builtin GetMongo, which only does a session.commit() when the original query is completed, thereby releasing all of the generated FlowFiles at once on completion of its query. For a large query over a network this can take time and may require a great deal of buffering. ComposeStreamingGetMongo cannot roll back on a query error, at least not the FlowFiles which have already been sent. But it doesn't require as many resources, and it allows a data flow to start processing immediately.

While there are some more advanced extension points which cut across multiple Processors, like ControllerServices and ReportingTasks, we have covered the basics here by reviewing the Processor to help you get started with NiFi. Obviously, for more in-depth details the developer docs and user docs are great resources for next steps. And, there is always the code.

Image by MA Kazmi
