Skip to content

Commit eb21783

Browse files
Create JMH for RemotePeerForwarder
Provides an JMH benchmark for the RemotePeerForwarder to enable and verify improvements to the implementation. Signed-off-by: Karsten Schnitter <k.schnitter@sap.com>
1 parent 45b0e86 commit eb21783

File tree

2 files changed

+140
-0
lines changed

2 files changed

+140
-0
lines changed

data-prepper-core/build.gradle

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
plugins {
77
id 'data-prepper.publish'
8+
id 'data-prepper.jmh'
89
}
910

1011
def dataPrepperVersion = version
@@ -125,6 +126,10 @@ task integrationTest(type: Test) {
125126

126127
check.dependsOn integrationTest
127128

129+
jmhJar {
130+
zip64 = true
131+
}
132+
128133
jacocoTestCoverageVerification {
129134
dependsOn jacocoTestReport
130135
violationRules {
Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
package org.opensearch.dataprepper.core.peerforwarder;
2+
3+
import com.linecorp.armeria.client.Endpoint;
4+
import com.linecorp.armeria.common.AggregatedHttpResponse;
5+
import com.linecorp.armeria.common.HttpStatus;
6+
import io.micrometer.core.instrument.Counter;
7+
import org.openjdk.jmh.annotations.Benchmark;
8+
import org.openjdk.jmh.annotations.BenchmarkMode;
9+
import org.openjdk.jmh.annotations.Fork;
10+
import org.openjdk.jmh.annotations.Level;
11+
import org.openjdk.jmh.annotations.Measurement;
12+
import org.openjdk.jmh.annotations.Mode;
13+
import org.openjdk.jmh.annotations.OutputTimeUnit;
14+
import org.openjdk.jmh.annotations.Param;
15+
import org.openjdk.jmh.annotations.Scope;
16+
import org.openjdk.jmh.annotations.Setup;
17+
import org.openjdk.jmh.annotations.State;
18+
import org.openjdk.jmh.annotations.Warmup;
19+
import org.opensearch.dataprepper.core.peerforwarder.client.PeerForwarderClient;
20+
import org.opensearch.dataprepper.core.peerforwarder.discovery.PeerListProvider;
21+
import org.opensearch.dataprepper.metrics.PluginMetrics;
22+
import org.opensearch.dataprepper.model.event.Event;
23+
import org.opensearch.dataprepper.model.log.JacksonLog;
24+
import org.opensearch.dataprepper.model.record.Record;
25+
26+
import java.time.Duration;
27+
import java.util.ArrayList;
28+
import java.util.Collection;
29+
import java.util.HashMap;
30+
import java.util.List;
31+
import java.util.Map;
32+
import java.util.Set;
33+
import java.util.concurrent.CompletableFuture;
34+
import java.util.concurrent.TimeUnit;
35+
import java.util.function.Consumer;
36+
37+
import static org.mockito.ArgumentMatchers.anyCollection;
38+
import static org.mockito.ArgumentMatchers.anyString;
39+
import static org.mockito.Mockito.mock;
40+
import static org.mockito.Mockito.when;
41+
42+
@State(Scope.Benchmark)
43+
@BenchmarkMode(Mode.Throughput)
44+
@OutputTimeUnit(TimeUnit.SECONDS)
45+
@Warmup(iterations = 3, time = 5)
46+
@Measurement(iterations = 5, time = 10)
47+
@Fork(1)
48+
public class RemotePeerForwarderBenchmark {
49+
50+
private static final int BUFFER_SIZE = 10240;
51+
private static final int BATCH_SIZE = 160;
52+
private static final int BATCH_DELAY = 100;
53+
private static final int FAILED_FORWARDING_REQUEST_LOCAL_WRITE_TIMEOUT = 100;
54+
private static final int FORWARDING_BATCH_SIZE = BATCH_SIZE;
55+
private static final int FORWARDING_BATCH_QUEUE_DEPTH = 3;
56+
private static final Duration FORWARDING_BATCH_TIMEOUT = Duration.ofMillis(800);
57+
private static final int PIPELINE_WORKER_THREADS = 8;
58+
private static final int HASH_RING_VIRTUAL_NODES = 128;
59+
60+
private RemotePeerForwarder peerForwarder;
61+
private Collection<Record<Event>> testRecords;
62+
63+
@Param({"1", "2", "4"})
64+
private int nodeCount;
65+
66+
@Param({"100", "1000", "5000", "50000"})
67+
private int recordCount;
68+
69+
@Setup(Level.Trial)
70+
public void setup() {
71+
PeerForwarderClient mockClient = mock(PeerForwarderClient.class);
72+
when(mockClient.serializeRecordsAndSendHttpRequest(anyCollection(), anyString(), anyString(), anyString()))
73+
.thenReturn(CompletableFuture.completedFuture(
74+
AggregatedHttpResponse.of(HttpStatus.OK)));
75+
PeerListProvider peerListProvider = createPeerListProvider(nodeCount);
76+
HashRing hashRing = new HashRing(peerListProvider, HASH_RING_VIRTUAL_NODES);
77+
78+
PeerForwarderReceiveBuffer<Record<Event>> buffer =
79+
new PeerForwarderReceiveBuffer<>(BUFFER_SIZE, BATCH_SIZE, "test", "test");
80+
81+
PluginMetrics mockPluginMetrics = mock(PluginMetrics.class);
82+
Counter mockCounter = mock(Counter.class);
83+
when(mockPluginMetrics.counter(anyString())).thenReturn(mockCounter);
84+
85+
peerForwarder = new RemotePeerForwarder(
86+
mockClient, hashRing, buffer, "test", "test",
87+
Set.of("key1", "key2"), mockPluginMetrics,
88+
BATCH_DELAY, FAILED_FORWARDING_REQUEST_LOCAL_WRITE_TIMEOUT,
89+
FORWARDING_BATCH_SIZE, FORWARDING_BATCH_QUEUE_DEPTH,
90+
FORWARDING_BATCH_TIMEOUT, PIPELINE_WORKER_THREADS
91+
);
92+
93+
testRecords = generateTestRecords(recordCount, nodeCount);
94+
}
95+
96+
@Benchmark
97+
public Collection<Record<Event>> benchmarkForwardRecords() {
98+
return peerForwarder.forwardRecords(testRecords);
99+
}
100+
101+
private Collection<Record<Event>> generateTestRecords(int count, int nodes) {
102+
List<Record<Event>> records = new ArrayList<>();
103+
for (int i = 0; i < count; i++) {
104+
Map<String, String> data = new HashMap<>();
105+
data.put("key1", "value" + i);
106+
data.put("key2", "value" + (i % 10));
107+
records.add(new Record<>(JacksonLog.builder().withData(data).build()));
108+
}
109+
return records;
110+
}
111+
112+
private PeerListProvider createPeerListProvider(int nodes) {
113+
List<String> ips = new ArrayList<>();
114+
for (int i = 0; i < nodes; i++) {
115+
ips.add(i == 0 ? "127.0.0.1" : "10.0.0." + i);
116+
}
117+
118+
return new PeerListProvider() {
119+
@Override
120+
public List<String> getPeerList() {
121+
return ips;
122+
}
123+
124+
@Override
125+
public void addListener(Consumer<? super List<Endpoint>> listener) {
126+
// No-op for benchmark
127+
}
128+
129+
@Override
130+
public void removeListener(Consumer<?> listener) {
131+
// No-op for benchmark
132+
}
133+
};
134+
}
135+
}

0 commit comments

Comments
 (0)