Python Script in NiFi

Short Description:

A brief introduction to using Python inside NiFi to manipulate FlowFiles in stream.

Article

In NiFi, the data passed between processors is referred to as a FlowFile, and its content can be accessed from several scripting languages through the ExecuteScript processor.

In order to access the data in the FlowFile you need to understand a few requirements first.

In this example we will access the JSON data passed into the ExecuteScript processor from a GetTwitter processor.

This could be any data ingestion processor.

In the ExecuteScript processor, set the Script Engine property to python and enter the following code in the Script Body property:

import json
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback

# Callback that reads the incoming FlowFile content and writes the new content
class ModJSON(StreamCallback):
    def __init__(self):
        pass

    def process(self, inputStream, outputStream):
        # Read the whole FlowFile content as a UTF-8 string and parse it
        text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        obj = json.loads(text)
        # Build the new, smaller JSON document from fields of the source tweet
        newObj = {
            "Source": "NiFi",
            "ID": obj['id'],
            "Name": obj['user']['screen_name']
        }
        outputStream.write(bytearray(json.dumps(newObj, indent=4).encode('utf-8')))

flowFile = session.get()
if flowFile is not None:
    flowFile = session.write(flowFile, ModJSON())
    flowFile = session.putAttribute(flowFile, "filename", flowFile.getAttribute('filename').split('.')[0] + '_translated.json')
    # Transfer only when a FlowFile was actually received
    session.transfer(flowFile, REL_SUCCESS)
session.commit()
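
One detail of the script worth checking is how the new filename is built. The expression split('.')[0] keeps only the text before the first dot, so a name with several dots loses more than its extension. The sketch below runs in plain Python outside NiFi and compares that expression with os.path.splitext. The two function names and the sample filename are hypothetical and used for illustration only.

```python
import os.path

def translated_name_split(filename):
    # Same expression as in the script: keep everything before the first dot
    return filename.split('.')[0] + '_translated.json'

def translated_name_splitext(filename):
    # Alternative: strip only the last extension
    base, _ext = os.path.splitext(filename)
    return base + '_translated.json'

# Hypothetical filename with more than one dot
print(translated_name_split('tweets.2016.json'))
print(translated_name_splitext('tweets.2016.json'))
```

For a filename without any dot, both functions return the same result, so the difference only matters when the incoming names contain dots.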


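The import statements bring in the Java classes the script relies on: IOUtils and StandardCharsets for reading the stream as text, and StreamCallback, the NiFi interface that the processing class implements.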

We access the FlowFile through “session”, a variable that ExecuteScript makes available to the script.

As you can see:

flowFile = session.get()

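is how we take the next FlowFile from the incoming queue. We then pass it, together with an instance of the processing class, to the session.write() method, which hands the FlowFile's input and output streams to the class's process() method.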
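
The callback pattern can be tried outside NiFi with in-memory streams. The following is a minimal sketch in plain Python, not NiFi code: run_callback is a hypothetical stand-in for what session.write() does with the FlowFile content, and UpperCaseCallback is an illustrative class that only mimics the shape of a StreamCallback.

```python
import io

class UpperCaseCallback(object):
    # Mimics the process(inputStream, outputStream) shape of a StreamCallback
    def process(self, input_stream, output_stream):
        text = input_stream.read().decode('utf-8')
        output_stream.write(text.upper().encode('utf-8'))

def run_callback(content, callback):
    # Hypothetical stand-in for session.write(): feed the old content in
    # and collect whatever the callback writes as the new content
    input_stream = io.BytesIO(content)
    output_stream = io.BytesIO()
    callback.process(input_stream, output_stream)
    return output_stream.getvalue()

new_content = run_callback(b'hello nifi', UpperCaseCallback())
print(new_content.decode('utf-8'))
```

The point of the pattern is that the callback never touches the FlowFile object itself; it only reads from one stream and writes to the other.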

You can build up a new JSON object while referencing attributes from the source JSON as you can see in the script:

    newObj = {
        "Source": "NiFi",
        "ID": obj['id'],
        "Name": obj['user']['screen_name']
    }
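
The reshaping step can be checked on its own before it goes into the Script Body. This is a sketch in plain Python, run outside NiFi; reshape_tweet is a hypothetical helper, and the sample tweet is made-up data that contains only the fields the script reads.

```python
import json

def reshape_tweet(text):
    # Parse the source JSON and keep only the fields used in the script
    obj = json.loads(text)
    new_obj = {
        "Source": "NiFi",
        "ID": obj['id'],
        "Name": obj['user']['screen_name']
    }
    return json.dumps(new_obj, indent=4)

# Made-up sample input with just the fields the script references
sample = json.dumps({
    "id": 12345,
    "text": "hello",
    "user": {"screen_name": "example_user"}
})
print(reshape_tweet(sample))
```

A document without an id or user key raises KeyError here, so input that is not shaped like a tweet needs handling of its own.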


This should help you get started with using Python scripts inside NiFi.

I hope to see some posts of how you modify this to create more interesting flows.
