NiFi 2 Python Extensions - First Impressions

Published on February 6, 2024 (5 min read)

A brief overview of the new NiFi 2 Python Extensions. The ability to use Python's rich ecosystem of libraries and tools within NiFi is a game-changer, especially for complex data manipulation, machine learning, and AI tasks.

Update: Please check out the NiFi-LangChain processors on GitHub.

In the ever-evolving landscape of data management, Apache NiFi stands out as a robust, user-friendly platform suitable for beginners and experts alike. It offers an intuitive drag-and-drop interface alongside a comprehensive set of programming APIs to build, manage, and monitor complex data pipelines. In my own experience architecting and implementing R&D data pipelines, NiFi has proven remarkably flexible and efficient. One notable gap, however, has been its lack of native Python support for data science and AI. Until recently, using Python within NiFi required workarounds such as Jython or external web services, at the cost of compromised functionality and reduced efficiency.

NiFi 2 Python Extensions

The release of NiFi 2.0 introduces a new Python extension mechanism that promises to bridge this gap and represents a significant evolution of the platform. It lets developers, even those with limited NiFi expertise, write Python-based processors that deploy seamlessly alongside traditional Java-based NiFi processors. I was keen to explore this new functionality and assess its potential for my work. Here, I share my initial impressions and experiences with the NiFi 2 Python Extensions.

Quickstart

If you are eager to dive in and start experimenting, the quickest way is to run a NiFi Docker image. Be sure to select a 2.x version (such as the 2.0.0-M2 milestone used below), since the Python feature is not available in 1.x releases.

docker run --name nifi --rm \
  -p 8443:8443 \
  -e SINGLE_USER_CREDENTIALS_USERNAME=admin \
  -e SINGLE_USER_CREDENTIALS_PASSWORD=nifi+python+extension \
  apache/nifi:2.0.0-M2

After launching the container, you can access the NiFi web interface at https://localhost:8443/nifi and log in with your credentials (e.g. admin and nifi+python+extension). To deploy any Python extension, simply copy your Python processor script file to the container's /opt/nifi/nifi-current/python_extensions directory. Your new processor will appear in the NiFi web interface upon refresh, without a NiFi restart.
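The copy step can also be scripted. The following is a minimal sketch, assuming the container is named nifi (as in the docker run command above) and that the processor lives in a file called WriteHelloWorld.py, which is a hypothetical file name. It only wraps the standard docker cp command and is not part of NiFi itself.

```python
import subprocess
from pathlib import Path

CONTAINER = "nifi"  # matches --name nifi in the docker run command
EXTENSIONS_DIR = "/opt/nifi/nifi-current/python_extensions"


def build_copy_command(script):
    """Return the docker cp command that copies a processor script into the container."""
    name = Path(script).name
    return ["docker", "cp", script, f"{CONTAINER}:{EXTENSIONS_DIR}/{name}"]


def deploy(script):
    """Run the copy; raises CalledProcessError if docker reports a failure."""
    subprocess.run(build_copy_command(script), check=True)


# Show the command without executing it (WriteHelloWorld.py is a hypothetical file name)
print(" ".join(build_copy_command("WriteHelloWorld.py")))
```

Calling deploy("WriteHelloWorld.py") would perform the actual copy, provided Docker is installed and the container is running.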

A Simple Example

The NiFi development team has opted to provide a more constrained set of APIs for Python extensions compared to the comprehensive suite available for Java. This design choice focuses on implementing core functionalities through the transform method, enabling data manipulation on individual FlowFiles. It allows developers to integrate Python's powerful capabilities for data processing directly within NiFi workflows. Below is a minimal example drawn from the official NiFi Python Developer Guide.

from nifiapi.flowfiletransform import FlowFileTransform, FlowFileTransformResult

class WriteHelloWorld(FlowFileTransform):
    class Java:
        implements = ['org.apache.nifi.python.processor.FlowFileTransform']
    class ProcessorDetails:
        version = '0.0.1-SNAPSHOT'

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def transform(self, context, flowfile):
        # Import Python dependencies
        # Read the incoming FlowFile content as text
        text = flowfile.getContentsAsBytes().decode()
        # Do something with the input (here it passes through unchanged)
        output = text
        return FlowFileTransformResult(
            relationship = "success",
            contents = output,
            attributes = {"greeting": "hello"}  # dict of attribute names to values
        )
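Because transform is an ordinary Python method, its logic can be tried out before it ever reaches NiFi. The sketch below is one way to do that, assuming the nifiapi module is not importable on a development machine (I have only seen it inside NiFi's own runtime). FakeFlowFile, FakeResult, and shout are hypothetical stand-ins written for this illustration; only getContentsAsBytes mirrors a method used in the processor above.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class FakeFlowFile:
    """Hypothetical stand-in for the flowfile object NiFi passes to transform()."""
    contents: bytes

    def getContentsAsBytes(self) -> bytes:
        return self.contents


@dataclass
class FakeResult:
    """Hypothetical stand-in for FlowFileTransformResult; it only stores its arguments."""
    relationship: str
    contents: Optional[str] = None
    attributes: dict = field(default_factory=dict)


def shout(text: str) -> str:
    """The 'do something with the input' step, kept free of any NiFi types."""
    return text.upper()


def transform(context, flowfile, result_cls=FakeResult):
    """Same shape as the processor's transform(), minus the NiFi base class."""
    text = flowfile.getContentsAsBytes().decode()
    return result_cls(
        relationship="success",
        contents=shout(text),
        attributes={"greeting": "hello"},
    )


result = transform(context=None, flowfile=FakeFlowFile(b"hello nifi"))
print(result.relationship, result.contents, result.attributes)
```

Keeping the data manipulation in a plain function such as shout means the real processor's transform method stays a thin wrapper, which makes it easier to unit test.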

How It Works

Behind the scenes, each NiFi Python processor runs in its own Python virtual environment with its own set of dependencies. These dependencies are declared in the class definition and are automatically downloaded when the processor is deployed. Unlike the Jython-based approach, this native Python environment supports the use of any Python package, offering far greater flexibility. The following is an example of how to declare dependencies in the class definition.

class PromptTemplate(FlowFileTransform):
    class Java:
        implements = ['org.apache.nifi.python.processor.FlowFileTransform']
    class ProcessorDetails:
        version = '0.0.2-SNAPSHOT'
        description = "Create a prompt based on a prompt template and input variables."
        tags = ["LangChain", "LCEL", "prompt template", "LLMs"]
        dependencies = [
          'langchain==0.1.4',
          'langchain-core==0.1.16',
          'langchain_openai==0.0.5'
        ]
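A pattern that pairs naturally with declared dependencies is to import them inside the method that uses them, as the "Import Python dependencies" comment in the earlier example hints. My assumption is that this helps when the processor file is inspected before its virtual environment is fully populated; treat that rationale as unverified. The sketch below shows the pattern only. The standard-library string module stands in for a declared package such as langchain, and render_prompt is a hypothetical helper, not part of any NiFi or LangChain API.

```python
def render_prompt(template, variables):
    """Fill a prompt template, importing the dependency only when called."""
    # Deferred import: resolved when the function runs, not when the file is loaded.
    # The standard-library "string" module stands in for a declared dependency.
    from string import Template

    return Template(template).substitute(variables)


prompt = render_prompt(
    "Summarize $topic in $n sentences.",
    {"topic": "NiFi", "n": 3},
)
print(prompt)
```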

Communication between the Python processes and the Java-based core NiFi runtime is established through a local network bridge. This bridge goes beyond mere data exchange; it extends to complex interactions such as manipulating NiFi's Java objects in Python through object proxies. This capability is essential for accessing Java functionality within a NiFi Python extension. Although network communication and serialization/deserialization inevitably introduce some overhead, the significant increase in functionality makes it a worthwhile trade-off.
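To make the object-proxy idea concrete, here is a toy illustration written purely for this post. It is not NiFi's actual bridge (which, as far as I know, is built on Py4J), and every class name in it is hypothetical. It only shows the general mechanism: a Python-side handle turns each method call into a serialized request, and the side that owns the real object executes it and serializes the answer back.

```python
import json


class JavaSide:
    """Plays the role of the Java runtime: owns the real objects and answers requests."""

    def __init__(self):
        self._objects = {}

    def register(self, object_id, obj):
        self._objects[object_id] = obj

    def handle(self, request: bytes) -> bytes:
        # Deserialize the request, invoke the named method, serialize the result
        message = json.loads(request)
        target = self._objects[message["id"]]
        value = getattr(target, message["method"])(*message["args"])
        return json.dumps({"value": value}).encode()


class Proxy:
    """Python-side handle: every method call becomes a serialized request."""

    def __init__(self, bridge, object_id):
        self._bridge = bridge
        self._object_id = object_id

    def __getattr__(self, method):
        # Only reached for names not found normally, i.e. the remote methods
        def call(*args):
            request = json.dumps(
                {"id": self._object_id, "method": method, "args": list(args)}
            ).encode()
            return json.loads(self._bridge.handle(request))["value"]

        return call


class Attributes:
    """Hypothetical 'remote' object standing in for something like a FlowFile."""

    def __init__(self, values):
        self._values = values

    def getAttribute(self, name):
        return self._values.get(name)


bridge = JavaSide()
bridge.register("ff-1", Attributes({"filename": "data.csv"}))

flowfile = Proxy(bridge, "ff-1")
print(flowfile.getAttribute("filename"))
```

In this toy version the "network" is a direct function call, but the two serialization round trips per method call are exactly where the overhead mentioned above would come from.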

Initial Impressions

I have used the new NiFi Python extension to develop several processors based on the LangChain package, a leading framework for large language model (LLM) development. While NiFi already ships a couple of official Python extensions for LLM operations, they do not use the LangChain Expression Language (LCEL) paradigm that is my area of focus. In my opinion, LCEL's standardized serialization and deserialization protocols, along with its built-in schema validations, fit well with NiFi's fundamental principle of FlowFile-based data pipelines. I have therefore created a dedicated Python processor for each of LCEL's core components (Prompt, Model, Output Parser, etc.), allowing users to build and configure their LangChain pipelines in NiFi without writing any Python code. This set of NiFi-LangChain processors has been published as an open-source project on GitHub.

Screenshot of LangChain Expression Language (LCEL) in NiFi Python Extensions

Overall, the development process was straightforward, and the Python processors were easy to deploy and use within NiFi. The ability to use Python's rich ecosystem of libraries and tools within NiFi is a game-changer, especially for complex data manipulation, machine learning, and AI tasks. This new capability not only expands NiFi's versatility but also aligns it more closely with the needs and workflows of today's data professionals.
