Optimizing Data Processing Workflow with Concurrency and Outbound Policies

Scenario:

There are times when you need to guarantee that one stage of a data pipeline has finished all of its processing before the next stage begins.

Imagine a financial institution that processes transaction data from various sources to generate daily financial reports. The data processing workflow consists of multiple stages that populate a local database with numerous tables. The institution then wants to generate a report that joins those tables and deliver it via email. The tables can be populated concurrently, but the reporting stage must not start until every population stage has completed its processing.

Solution:

Clockspring allows you to configure your data pipeline to account for this and ensure that all processing is completed in the initial process group before any data is processed by the second group.

Two process groups connected via a relationship

To do this, we can go to the configuration options of the Populate Database process group and configure the Process Group FlowFile Concurrency and Process Group Outbound Policy. By default, each group runs immediately and passes data to the next group as soon as it is available to send.

Default Process Group Concurrency and Outbound Policy properties

Process Group FlowFile Concurrency gives options on how and when the process group will take incoming FlowFiles.

  • Unbounded (default) - this setting prompts the process group to constantly check the incoming queue for new FlowFiles and process them immediately. There is no delay configured, and new FlowFiles will be added to the process group as soon as they are available.
  • Single FlowFile Per Node - this setting instructs the process group to process only one incoming FlowFile from the input port(s) at a time. It waits until it has completed processing the prior FlowFile and has sent it out of the process group before pulling in another one.
  • Single Batch Per Node - this setting directs the process group to pull in all FlowFiles from the input port's queue until the queue is empty. Additional FlowFiles added to the incoming queue after it has been cleared will wait until the process group has completed processing of the prior FlowFile(s) and sent them out to another process group.
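The three concurrency settings can be pictured with a small toy model. This is only a sketch of the behavior described above, not Clockspring's implementation; the `Concurrency` enum, the `admit` function, and the `in_flight` counter are hypothetical names introduced for illustration.

```python
from collections import deque
from enum import Enum


class Concurrency(Enum):
    UNBOUNDED = "Unbounded"
    SINGLE_FLOWFILE = "Single FlowFile Per Node"
    SINGLE_BATCH = "Single Batch Per Node"


def admit(mode, input_queue, in_flight):
    """Return the FlowFiles a group would pull in right now (toy model).

    input_queue: deque of FlowFiles waiting at the input port
    in_flight:   number of FlowFiles still inside the process group
    """
    if mode is Concurrency.UNBOUNDED:
        # Take everything that is waiting, regardless of work in progress.
        taken = list(input_queue)
        input_queue.clear()
        return taken
    if in_flight > 0:
        # Both bounded modes wait until prior work has left the group.
        return []
    if mode is Concurrency.SINGLE_FLOWFILE:
        # Admit exactly one FlowFile, if any is waiting.
        return [input_queue.popleft()] if input_queue else []
    # SINGLE_BATCH: drain the queue as one batch.
    taken = list(input_queue)
    input_queue.clear()
    return taken
```

In this model, a bounded group with work still in flight admits nothing, which is what later lets it recognize that a batch is complete.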

Process Group Outbound Policy gives options on how and when the process group will release FlowFiles from its output to other process groups.

  • Stream When Available (default): This setting directs the process group to release FlowFiles to subsequent process groups as soon as they arrive at an output port.
  • Batch Output: This setting instructs the process group not to release any FlowFiles from an output port until all FlowFiles within the process group are in an output port queue. This could be a single output port or many, but nothing is released while any FlowFile is still being processed by a processor or is waiting in the queue for a processor or funnel.
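The difference between the two outbound policies can be sketched the same way. The following is a simplified model of the rule described above, under the assumption that a group's state can be reduced to three counters; `GroupState` and `releasable` are hypothetical names, not part of any Clockspring API.

```python
from dataclasses import dataclass, field


@dataclass
class GroupState:
    in_processing: int = 0      # FlowFiles currently being handled by a processor
    internal_queued: int = 0    # FlowFiles queued for a processor or funnel
    output_queues: dict = field(default_factory=dict)  # port name -> list of FlowFiles


def releasable(state, batch_output):
    """Return the FlowFiles that may leave the group right now (toy model)."""
    waiting = [ff for queue in state.output_queues.values() for ff in queue]
    if not batch_output:
        # Stream When Available: anything at an output port can go.
        return waiting
    if state.in_processing or state.internal_queued:
        # Batch Output: hold everything while any work remains inside.
        return []
    return waiting
```

For example, a group with one FlowFile at an output port and another still inside a processor releases nothing under Batch Output, but releases the first FlowFile under Stream When Available.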

To handle our scenario, we will want to configure the Populate Database process group's Outbound Policy to Batch Output. This ensures that all work has been completed in the Populate Database group before any work begins in the Generate Report process group.
However, there's a caveat. For the process group to know when all files are completed, its FlowFile Concurrency must also be set to something other than Unbounded. If Clockspring constantly pulls in new files from an input port, it will never know when it is "complete" and can release the data to an output port. Even though there are no input queues going to the Populate Database process group, we must configure the FlowFile Concurrency to either Single FlowFile Per Node or Single Batch Per Node. Leaving the concurrency as Unbounded will result in the Batch Output change being ignored.
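The caveat can be expressed as a small check. This is a hedged sketch of the rule as stated above, not a Clockspring API: the constant names, the `effective_outbound_policy` function, and the `populate_database` dictionary are all hypothetical and exist only to illustrate which combinations take effect.

```python
UNBOUNDED = "Unbounded"
SINGLE_FLOWFILE = "Single FlowFile Per Node"
SINGLE_BATCH = "Single Batch Per Node"
STREAM = "Stream When Available"
BATCH = "Batch Output"


def effective_outbound_policy(concurrency, outbound_policy):
    """Return the outbound policy that actually applies (toy model)."""
    if outbound_policy == BATCH and concurrency == UNBOUNDED:
        # Batch Output is ignored when the group never stops pulling in input.
        return STREAM
    return outbound_policy


# Hypothetical representation of the settings for the scenario's first group.
populate_database = {"concurrency": SINGLE_BATCH, "outbound_policy": BATCH}
```

Passing `populate_database["concurrency"]` and `populate_database["outbound_policy"]` to the function returns Batch Output, while swapping the concurrency for Unbounded falls back to Stream When Available.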
Applying the following configuration produces the desired setup, in which every entry is written to the database before the report generation process is allowed to begin:

Bounded Process Group Concurrency and Outbound Policy properties
