
Conversation

@KarstenSchnitter KarstenSchnitter commented Jan 7, 2026

Description

I am looking into improving the performance of the RemotePeerForwarder. I introduced a JMH benchmark to measure improvements. This led to a first set of changes that improve throughput, especially at smaller batch sizes. The next change removes synchronization during the forwarding of requests. Finally, I improved the receiving of requests by introducing a heuristic that abandons the batchDelay when enough data is expected in the queue. Major performance gains achieved by this PR:

  • caching of local IPs reduces latency when forwarding small batches
  • asynchronous forwarding removes synchronization between different peers
  • an expectedly full buffer is read with minimal delay, so throughput is no longer capped by batchDelay
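
The IP-caching idea from the first bullet could be sketched roughly as follows (class and method names here are illustrative, not the PR's actual code): resolving an address and checking whether it belongs to a local network interface is comparatively expensive, so the result is memoized per address.

```java
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch: memoize whether an address belongs to the local host,
// so repeated forwarding decisions skip the expensive interface lookup.
final class LocalAddressCache {
    private final Map<String, Boolean> cache = new ConcurrentHashMap<>();

    boolean isLocalAddress(final String addr) {
        return cache.computeIfAbsent(addr, key -> {
            try {
                final InetAddress inetAddress = InetAddress.getByName(key);
                return NetworkInterface.getByInetAddress(inetAddress) != null;
            } catch (final UnknownHostException | SocketException e) {
                return false;
            }
        });
    }
}
```

The second call for the same address is a plain map lookup, which is what removes the per-batch latency for small batches.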

Issues Resolved

Check List

  • New functionality includes testing.
  • New functionality has a documentation issue. Please link to it in this PR.
    • New functionality has javadoc added
  • Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@KarstenSchnitter KarstenSchnitter force-pushed the peerforwarder-performance branch from 1b233b4 to 072dbd6 on January 7, 2026 08:41
Provides a JMH benchmark for the RemotePeerForwarder to enable and verify
improvements to the implementation.

Signed-off-by: Karsten Schnitter <[email protected]>
* Cache addresses to avoid repeated lookups.
* Presize HashMap to avoid rehashing during record grouping.
* Check queue size on flushes first and avoid creating Duration objects.
* Use ArrayList for better cache locality.

These changes were measured with the JMH benchmark and lead to significant
improvements. They were especially high for small record counts with small
batches. But even in the 5000-record cases they provided 30-60% higher throughput.

Signed-off-by: Karsten Schnitter <[email protected]>
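
The presizing point could be sketched like this (names are hypothetical; the real code groups Record objects per destination peer): sizing the HashMap from the expected number of groups avoids rehashing mid-loop, and ArrayList keeps each group's records contiguous in memory.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the grouping step: presizing the HashMap from the
// expected number of destinations avoids rehashing while records are grouped.
final class RecordGrouper {
    static Map<String, List<String>> groupByDestination(final List<String> records,
                                                        final int expectedDestinations) {
        // Capacity chosen so the map never resizes at the default 0.75 load factor.
        final Map<String, List<String>> groups =
                new HashMap<>((int) (expectedDestinations / 0.75f) + 1);
        for (final String record : records) {
            final String destination = destinationFor(record); // placeholder routing
            groups.computeIfAbsent(destination, k -> new ArrayList<>()).add(record);
        }
        return groups;
    }

    // Stand-in for the real hash-ring routing of records to peers.
    private static String destinationFor(final String record) {
        return record.isEmpty() ? "local" : record.substring(0, 1);
    }
}
```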
@KarstenSchnitter KarstenSchnitter force-pushed the peerforwarder-performance branch from 072dbd6 to 8d37a39 on January 7, 2026 09:19
The RemotePeerForwarder used to wait for all network requests to finish in each
`forwardRecords()` call. This new implementation removes the synchronization
and handles the responses asynchronously. This improves performance in the
updated benchmarks by a factor of 5-10.

The benchmark was changed to include different values of network latency. This
makes it possible to determine the impact of the asynchronous forwarding.

Signed-off-by: Karsten Schnitter <[email protected]>
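
The fire-and-forget pattern described in this commit could be sketched like this (a simplified stand-in: the actual PR uses the Armeria-based PeerForwarderClient and Data Prepper's record types, not the placeholder HTTP stub below):

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Illustrative sketch: instead of joining every request inside forwardRecords(),
// each response is handled asynchronously and only failures re-process the batch.
final class AsyncForwarder {
    CompletableFuture<Integer> forward(final List<String> batch, final String destinationIp) {
        final CompletableFuture<Integer> responseFuture = sendHttpRequest(batch, destinationIp);
        // No join() here: the caller returns while the request is still in flight.
        return responseFuture.whenComplete((status, throwable) -> {
            if (throwable != null || status != 200) {
                handleFailure(batch); // e.g. write the batch back to a local buffer
            }
        });
    }

    // Stand-in for the real asynchronous HTTP client call.
    private CompletableFuture<Integer> sendHttpRequest(final List<String> batch, final String ip) {
        return CompletableFuture.supplyAsync(() -> 200);
    }

    private void handleFailure(final List<String> batch) {
    }
}
```

Because no thread blocks on the response, slow peers no longer serialize the forwarding of batches to other peers.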
When there is enough data available, the default 100ms batchDelay leads to a hard cap of
50 ops/s for receiving records. The new implementation introduces a heuristic that guesses
whether data is available based on the last read. When it expects data, it attempts a
synchronous read without delay, which takes 5ms within the PeerForwarderReceiveBuffer.
This allows more operations per second in this scenario without being limited by the
configured or default batchDelay.

The criterion for using the fast reads is that at least half the forwarding batch size
was received in the last attempt. Benchmarking showed little benefit in tuning this
limit compared to having it at all, so the number was hard-coded.

Signed-off-by: Karsten Schnitter <[email protected]>
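
The heuristic from this commit could be sketched as a small state holder (illustrative names; the PR wires the equivalent logic directly into RemotePeerForwarder's receive loop):

```java
// Illustrative sketch of the read-timeout heuristic described above: if the last
// read returned at least half a forwarding batch, assume the buffer is still
// filling quickly and read again without waiting for the configured batchDelay.
final class AdaptiveReadTimeout {
    private final int forwardingBatchSize;
    private final int batchDelayMillis;
    private int lastReadCount;

    AdaptiveReadTimeout(final int forwardingBatchSize, final int batchDelayMillis) {
        this.forwardingBatchSize = forwardingBatchSize;
        this.batchDelayMillis = batchDelayMillis;
    }

    int nextReadTimeoutMillis() {
        // Hard-coded 50% criterion, as described in the commit message.
        final boolean expectSubstantialBatch = lastReadCount >= forwardingBatchSize / 2;
        return expectSubstantialBatch ? 0 : batchDelayMillis;
    }

    void recordRead(final int recordsRead) {
        lastReadCount = recordsRead;
    }
}
```

Under sustained load the timeout stays at 0ms and the loop is bounded only by the buffer's internal 5ms read, not the 100ms batchDelay; when traffic drops below half a batch, the configured delay is restored to avoid fragmenting batches.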

@dlvenable dlvenable left a comment


Thanks @KarstenSchnitter for these improvements! I'm excited to see the results of this. A few high-level questions:

  • Do you have benchmark results you can share from before the changes and after?
  • Have you tested this with real Data Prepper nodes? If so, what are your findings?

Also, we should probably have some unit tests for these behaviors.

try {
    return NetworkInterface.getByInetAddress(inetAddress) != null;
} catch (final SocketException e) {
    inetAddress = InetAddress.getByName(addr);


I take it that this call uses Java's DNS cache. Why do you think caching it helps so much?

// Heuristic: expect substantial batches only if we're receiving more than 50% of forwarding batch size.
// - When true: use non-blocking reads (0ms) to maximize throughput during high-load periods
// - When false: use configured batchDelay to accumulate records and prevent fragmentation
final int minBatchSizeForNonBlocking = forwardingBatchSize / 2;


Should we make this percentage configurable in data-prepper-config.yaml?

- final Map.Entry<Collection<Record<Event>>, CheckpointState> readResult = peerForwarderReceiveBuffer.read(batchDelay);
+ // Adaptive timeout: use non-blocking (0ms) or configured delay based on expected batch size
+ final int timeout = expectSubstantialBatch ? 0 : batchDelay;
+ final Map.Entry<Collection<Record<Event>>, CheckpointState> readResult = peerForwarderReceiveBuffer.read(timeout);


If the buffer is already full, does it not immediately return? What case do you think is making this take a long time when there are a lots of requests?

peerForwarderClient.serializeRecordsAndSendHttpRequest(currentBatch, destinationIp, pluginId, pipelineName);

// Process response asynchronously without blocking
responseFuture


Do you think there is any issue with this possibly making too many requests? Is there something in Armeria or CompletableFuture that will limit the total requests?
