NIFI-14472: Fix potential NPE in PutKinesisFirehose batch accumulation #11211

Open
rakesh-rsky wants to merge 3 commits into apache:main from rakesh-rsky:fix/NIFI-14472-kinesis-firehose-npe

@rakesh-rsky
Contributor

Summary

Fixes a race condition / NPE in PutKinesisFirehose.onTrigger() where a two-step computeIfAbsent() + get() on the batch-accumulation map could return null if the entry was evicted between the two calls.

Changes

  • Collapsed the two-step lookup into a single atomic computeIfAbsent(...).add(record) call in PutKinesisFirehose.java, eliminating the window for a NullPointerException.
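
As an illustration of the single-step pattern described above, here is a minimal sketch. It is not the NiFi source: the class name `BatchAccumulationSketch`, the `accumulate` helper, and the use of `String` payloads with a `HashMap` are assumptions made to keep the example self-contained. It does not attempt to reproduce the reported NPE; it only shows that the list returned by `computeIfAbsent` can be used directly, so no second lookup is needed.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the processor's batch accumulation; not NiFi code.
class BatchAccumulationSketch {

    // Groups payloads by stream name, mirroring the role recordHash plays in this PR.
    // Each input row is {streamName, payload}.
    static Map<String, List<String>> accumulate(String[][] streamAndPayloadPairs) {
        final Map<String, List<String>> recordHash = new HashMap<>();
        for (final String[] pair : streamAndPayloadPairs) {
            final String streamName = pair[0];
            final String payload = pair[1];
            // Single step: computeIfAbsent returns the existing or newly created list,
            // so there is no separate get() call whose result could be null.
            recordHash.computeIfAbsent(streamName, k -> new ArrayList<>()).add(payload);
        }
        return recordHash;
    }
}
```

Because the mapping function never returns null, the value returned by `computeIfAbsent` here is always a usable list for the given key.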

Testing

  • Existing unit tests pass.
  • The fix is a straightforward atomic-operation replacement with no behavioral change under normal conditions.

Fixes: https://issues.apache.org/jira/browse/NIFI-14472

…eam name evaluates to null

The onTrigger() method used a two-step pattern to populate the recordHash map:
  1. recordHash.computeIfAbsent(firehoseStreamName, k -> new ArrayList<>())
  2. recordHash.get(firehoseStreamName).add(record)   // ← NPE here

When the KINESIS_FIREHOSE_DELIVERY_STREAM_NAME expression evaluates to null
(e.g., the referenced FlowFile attribute is absent), computeIfAbsent(null, ...)
is not guaranteed to insert an entry for the null key on every JVM
implementation, so the subsequent get(null) can return null and .add() throws:
  NullPointerException: Cannot invoke List.add() because Map.get() returns null

- Collapsed the two-step lookup into a single atomic computeIfAbsent().add()
  call, eliminating the null-return window between the two statements
- The fix applies to the recordHash population step; hashFlowFiles already
  used computeIfAbsent correctly in a single step on the following line

Co-authored-by: Rakesh Kumar Singh <rsky.rakesh@gmail.com>
@rakesh-rsky rakesh-rsky force-pushed the fix/NIFI-14472-kinesis-firehose-npe branch from 36af390 to 67c2d4c Compare May 6, 2026 11:09
…n macOS

The test class was missing an @AfterEach to shut down the provider after each
test. Tests that called provider.initialize() but not provider.shutdown() left
the Kubernetes client's Netty event loop open. On macOS with Zulu JDK 21, the
GC collected the event loop before KubernetesMockServerExtension.afterEach()
ran, causing MockWebServer.await() to throw RejectedExecutionException:
  'event executor terminated'

Fix: add @AfterEach shutdownProvider() to guarantee the Kubernetes client is
closed before the mock server extension tears down, ensuring deterministic
cleanup across all platforms and JVM implementations.

Co-authored-by: Rakesh Kumar Singh <rsky.rakesh@gmail.com>
…pStateProvider.shutdown()

shutdown() was calling kubernetesClient.close() unconditionally. If the
provider was never initialized (e.g. in unit tests that only call
getSupportedScopes()), kubernetesClient is null and a NullPointerException
is thrown from the @AfterEach teardown.

Also null-guard the logger reference for the same reason.
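
A minimal sketch of the null-guarded shutdown described in this commit message, under stated assumptions: `GuardedShutdownSketch`, its nested `Client` interface, and the `isClosed()` helper are hypothetical names for illustration and are not taken from the NiFi state provider.

```java
// Hypothetical provider; names are illustrative and not taken from the NiFi source.
class GuardedShutdownSketch {

    // Stand-in for a client that must be closed, such as a Kubernetes client.
    interface Client {
        void close();
    }

    private Client client;   // stays null until initialize() is called
    private boolean closed;  // records whether close() actually ran

    void initialize() {
        client = () -> closed = true;
    }

    // Safe to call even if initialize() never ran, and safe to call twice.
    void shutdown() {
        if (client != null) {
            client.close();
            client = null;
        }
    }

    boolean isClosed() {
        return closed;
    }
}
```

With the guard in place, a teardown method can call `shutdown()` unconditionally, including in tests that never initialize the provider.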
@exceptionfactory
Contributor

@rakesh-rsky It looks like the last update to this branch brought in a commit instead of rebasing, can you rebase your branch against main and force push to align with the current baseline?

Contributor

@exceptionfactory exceptionfactory left a comment

Thanks for addressing this problem @rakesh-rsky. The core functional change appears to be correct, but the comments are unnecessary and the implementation line should be broken out into several lines for readability. If you can make that change, remove the extra commit, and rebase against the current main branch, this should be good to go.

Comment on lines +137 to +143
// Use a single computeIfAbsent().add() call so the list lookup is atomic.
// The previous two-step pattern (computeIfAbsent then a separate get()) could
// throw NullPointerException when firehoseStreamName evaluates to null via
// expression language, because get(null) returned null after the absent-key
// entry was never actually inserted. (NIFI-14472)
session.read(flowFile, in -> recordHash.computeIfAbsent(firehoseStreamName, k -> new ArrayList<>())
.add(Record.builder().data(SdkBytes.fromInputStream(in)).build()));
Contributor

Commenting on the previous behavior is unnecessary and should be removed. In addition, it would be helpful to separate the callback implementation across multiple lines for readability.
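
One possible shape for a multi-line callback, sketched with stand-ins so it runs without NiFi or the AWS SDK. Everything here is an assumption for illustration: `MultiLineCallbackSketch`, the `StreamCallback` interface, and the `read` helper only imitate the role of `session.read(flowFile, ...)`, and `byte[]` stands in for the Firehose `Record`.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins; not the NiFi or AWS SDK types.
class MultiLineCallbackSketch {

    // Imitates a stream callback that receives the content as an InputStream.
    interface StreamCallback {
        void process(InputStream in) throws IOException;
    }

    // Imitates session.read(flowFile, callback): hands the content to the callback.
    static void read(byte[] content, StreamCallback callback) throws IOException {
        try (InputStream in = new ByteArrayInputStream(content)) {
            callback.process(in);
        }
    }

    static Map<String, List<byte[]>> accumulate(String streamName, byte[] content) {
        final Map<String, List<byte[]>> recordHash = new HashMap<>();
        try {
            // The callback body is split into one statement per step for readability.
            read(content, in -> {
                final byte[] data = in.readAllBytes();
                final List<byte[]> records = recordHash.computeIfAbsent(streamName, k -> new ArrayList<>());
                records.add(data);
            });
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return recordHash;
    }
}
```

The behavior matches the one-line form; only the layout changes, with the record construction, the list lookup, and the add each on their own line.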
