From 66a7c321a910a4c3b1a940d0e5b778cd73239e3d Mon Sep 17 00:00:00 2001 From: Thomas Bonfert Date: Tue, 23 Feb 2021 15:46:51 +0100 Subject: [PATCH] handling of null objects for flow files returned from the session object --- .../com/github/whiver/nifi/processor/ProtobufDecoder.java | 5 +++++ .../com/github/whiver/nifi/processor/ProtobufEncoder.java | 4 ++++ 2 files changed, 9 insertions(+) diff --git a/src/main/java/com/github/whiver/nifi/processor/ProtobufDecoder.java b/src/main/java/com/github/whiver/nifi/processor/ProtobufDecoder.java index 701d175..f9043e4 100644 --- a/src/main/java/com/github/whiver/nifi/processor/ProtobufDecoder.java +++ b/src/main/java/com/github/whiver/nifi/processor/ProtobufDecoder.java @@ -58,6 +58,11 @@ public void onTrigger(ProcessContext processContext, ProcessSession session) thr final FlowFile flowfile = session.get(); + //it could be that another thread already got the flowfile -> Leave the function + if(flowfile == null) { + return; + } + String protobufSchema = flowfile.getAttribute(PROTOBUF_SCHEMA.getName()); boolean compileSchema = processContext.getProperty(COMPILE_SCHEMA.getName()).asBoolean(); String messageType = flowfile.getAttribute("protobuf.messageType"); diff --git a/src/main/java/com/github/whiver/nifi/processor/ProtobufEncoder.java b/src/main/java/com/github/whiver/nifi/processor/ProtobufEncoder.java index ff2ef93..78bdab7 100644 --- a/src/main/java/com/github/whiver/nifi/processor/ProtobufEncoder.java +++ b/src/main/java/com/github/whiver/nifi/processor/ProtobufEncoder.java @@ -54,6 +54,10 @@ public void onTrigger(ProcessContext processContext, ProcessSession session) thr final AtomicReference error = new AtomicReference<>(); final FlowFile flowfile = session.get(); + //it could be that another thread already got the flowfile -> Leave the function + if(flowfile == null) { + return; + } // We check if the protobuf.schemaPath property is defined in the flowfile String protobufSchema = flowfile.getAttribute(PROTOBUF_SCHEMA.getName());