Skip to content

Commit 072dbd6

Browse files
Add small changes for better Performance
* Cache addresses to avoid repeated lookups. * Presize HashMap to avoid rehashing during record grouping * Check queue size on flushes first and avoid creation of Duration objects * Use ArrayList for better cache locality. These changes were measure with the JMH benchmark to lead to significant improvements. They were especially high on small record numbers with small batches. But even on 5000 records cases the provided 30-60% high throughput. Signed-off-by: Karsten Schnitter <[email protected]>
1 parent eb21783 commit 072dbd6

File tree

3 files changed

+32
-22
lines changed

3 files changed

+32
-22
lines changed

data-prepper-core/src/jmh/java/org/opensearch/dataprepper/core/peerforwarder/RemotePeerForwarderBenchmark.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ public class RemotePeerForwarderBenchmark {
5252
private static final int BATCH_DELAY = 100;
5353
private static final int FAILED_FORWARDING_REQUEST_LOCAL_WRITE_TIMEOUT = 100;
5454
private static final int FORWARDING_BATCH_SIZE = BATCH_SIZE;
55-
private static final int FORWARDING_BATCH_QUEUE_DEPTH = 3;
55+
private static final int FORWARDING_BATCH_QUEUE_DEPTH = 25;
5656
private static final Duration FORWARDING_BATCH_TIMEOUT = Duration.ofMillis(800);
5757
private static final int PIPELINE_WORKER_THREADS = 8;
5858
private static final int HASH_RING_VIRTUAL_NODES = 128;

data-prepper-core/src/main/java/org/opensearch/dataprepper/core/peerforwarder/HashRing.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ public class HashRing implements Consumer<List<Endpoint>> {
3838
private final PeerListProvider peerListProvider;
3939

4040
private TreeMap<BigInteger, String> hashServerMap = new TreeMap<>();
41+
private int peerCount = 0;
4142

4243
public HashRing(final PeerListProvider peerListProvider, final int numVirtualNodes) {
4344
Objects.requireNonNull(peerListProvider);
@@ -77,6 +78,10 @@ public Optional<String> getServerIp(final List<String> identificationKeyValues)
7778
}
7879
}
7980

81+
public int getPeerCount() {
82+
return peerCount;
83+
}
84+
8085
@Override
8186
public void accept(final List<Endpoint> endpoints) {
8287
buildHashServerMap();
@@ -92,6 +97,7 @@ private void buildHashServerMap() {
9297
}
9398

9499
this.hashServerMap = newHashValueMap;
100+
this.peerCount = endpoints.size();
95101
}
96102

97103
private void addServerIpToHashMap(final String serverIp, final Map<BigInteger, String> targetMap) {

data-prepper-core/src/main/java/org/opensearch/dataprepper/core/peerforwarder/RemotePeerForwarder.java

Lines changed: 25 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,10 @@
2222
import java.net.SocketException;
2323
import java.net.UnknownHostException;
2424
import java.time.Duration;
25-
import java.time.temporal.ChronoUnit;
2625
import java.util.ArrayList;
2726
import java.util.Collection;
2827
import java.util.Collections;
2928
import java.util.HashMap;
30-
import java.util.LinkedList;
3129
import java.util.List;
3230
import java.util.Map;
3331
import java.util.Set;
@@ -56,6 +54,7 @@ class RemotePeerForwarder implements PeerForwarder {
5654
private final Set<String> identificationKeys;
5755
final ConcurrentHashMap<String, LinkedBlockingQueue<Record<Event>>> peerBatchingQueueMap;
5856
private final ConcurrentHashMap<String, Long> peerBatchingLastFlushTimeMap;
57+
private final ConcurrentHashMap<String, Boolean> localAddressCache;
5958

6059
private final Counter recordsActuallyProcessedLocallyCounter;
6160
private final Counter recordsToBeProcessedLocallyCounter;
@@ -99,6 +98,7 @@ class RemotePeerForwarder implements PeerForwarder {
9998
this.pipelineWorkerThreads = pipelineWorkerThreads;
10099
peerBatchingQueueMap = new ConcurrentHashMap<>();
101100
peerBatchingLastFlushTimeMap = new ConcurrentHashMap<>();
101+
localAddressCache = new ConcurrentHashMap<>();
102102

103103
recordsActuallyProcessedLocallyCounter = pluginMetrics.counter(RECORDS_ACTUALLY_PROCESSED_LOCALLY);
104104
recordsToBeProcessedLocallyCounter = pluginMetrics.counter(RECORDS_TO_BE_PROCESSED_LOCALLY);
@@ -149,13 +149,13 @@ private Map<String, List<Record<Event>>> groupRecordsBasedOnIdentificationKeys(
149149
final Collection<Record<Event>> records,
150150
final Set<String> identificationKeys
151151
) {
152-
final Map<String, List<Record<Event>>> groupedRecords = new HashMap<>();
152+
final Map<String, List<Record<Event>>> groupedRecords = new HashMap<>(hashRing.getPeerCount());
153153

154154
// group records based on IP address calculated by HashRing
155155
for (final Record<Event> record : records) {
156156
final Event event = record.getData();
157157

158-
final List<String> identificationKeyValues = new LinkedList<>();
158+
final List<String> identificationKeyValues = new ArrayList<>(identificationKeys.size());
159159
int numMissingIdentificationKeys = 0;
160160
for (final String identificationKey : identificationKeys) {
161161
final Object identificationKeyValue = event.get(identificationKey, Object.class);
@@ -178,21 +178,23 @@ private Map<String, List<Record<Event>>> groupRecordsBasedOnIdentificationKeys(
178178
}
179179

180180
private boolean isAddressDefinedLocally(final String address) {
181-
final InetAddress inetAddress;
182-
try {
183-
inetAddress = InetAddress.getByName(address);
184-
} catch (final UnknownHostException e) {
185-
return false;
186-
}
187-
if (inetAddress.isAnyLocalAddress() || inetAddress.isLoopbackAddress()) {
188-
return true;
189-
} else {
181+
return localAddressCache.computeIfAbsent(address, addr -> {
182+
final InetAddress inetAddress;
190183
try {
191-
return NetworkInterface.getByInetAddress(inetAddress) != null;
192-
} catch (final SocketException e) {
184+
inetAddress = InetAddress.getByName(addr);
185+
} catch (final UnknownHostException e) {
193186
return false;
194187
}
195-
}
188+
if (inetAddress.isAnyLocalAddress() || inetAddress.isLoopbackAddress()) {
189+
return true;
190+
} else {
191+
try {
192+
return NetworkInterface.getByInetAddress(inetAddress) != null;
193+
} catch (final SocketException e) {
194+
return false;
195+
}
196+
}
197+
});
196198
}
197199

198200
private List<Record<Event>> batchRecordsForForwarding(final String destinationIp, final List<Record<Event>> records) {
@@ -294,12 +296,14 @@ private List<Record<Event>> getRecordsToForward(final String destinationIp) {
294296
}
295297

296298
private boolean shouldFlushBatch(final String destinationIp) {
297-
final long currentTime = System.currentTimeMillis();
298-
final long millisSinceLastFlush = currentTime - peerBatchingLastFlushTimeMap.getOrDefault(destinationIp, System.currentTimeMillis());
299-
final Duration durationSinceLastFlush = Duration.of(millisSinceLastFlush, ChronoUnit.MILLIS);
299+
final LinkedBlockingQueue<Record<Event>> queue = peerBatchingQueueMap.get(destinationIp);
300+
if (queue.size() >= forwardingBatchSize) {
301+
return true;
302+
}
300303

301-
final boolean shouldFlushDueToTimeout = durationSinceLastFlush.compareTo(forwardingBatchTimeout) >= 0;
302-
return shouldFlushDueToTimeout || peerBatchingQueueMap.get(destinationIp).size() >= forwardingBatchSize;
304+
final long currentTime = System.currentTimeMillis();
305+
final long millisSinceLastFlush = currentTime - peerBatchingLastFlushTimeMap.getOrDefault(destinationIp, currentTime);
306+
return millisSinceLastFlush >= forwardingBatchTimeout.toMillis();
303307
}
304308

305309
void processFailedRequestsLocally(final AggregatedHttpResponse httpResponse, final Collection<Record<Event>> records) {

0 commit comments

Comments
 (0)