Skip to content

Commit 8f8b634

Browse files
perf: benchmark zstd GRPC compression (#749)
Signed-off-by: Anthony Petrov <anthony@swirldslabs.com>
1 parent 533be7d commit 8f8b634

File tree

15 files changed

+632
-24
lines changed

15 files changed

+632
-24
lines changed

pbj-core/gradle/modules.properties

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
# SPDX-License-Identifier: Apache-2.0
22
# Module to Maven artifact mappings for JPMS
33
io.helidon.common.features.codegen=io.helidon.common.features:helidon-common-features-codegen
4+
com.github.luben.zstd_jni=com.github.luben:zstd-jni

pbj-core/hiero-dependency-versions/build.gradle.kts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ dependencies.constraints {
5454
excludes.add("org.antlr:antlr4")
5555
excludes.add("com.google.protobuf:protoc")
5656
excludes.add("io.grpc:protoc-gen-grpc-java")
57+
excludes.add("com.github.luben:zstd-jni")
5758
}
5859

5960
// Testing only
@@ -77,4 +78,5 @@ dependencies.constraints {
7778
api("io.helidon.webclient:helidon-webclient-http2:$helidon") {
7879
because("io.helidon.webclient.http2")
7980
}
81+
api("com.github.luben:zstd-jni:1.5.5-11") { because("com.github.luben.zstd") }
8082
}

pbj-core/pbj-grpc-client-helidon/src/main/java/com/hedera/pbj/grpc/client/helidon/PbjGrpcCall.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,24 @@ public class PbjGrpcCall<RequestT, ReplyT> implements GrpcCall<RequestT, ReplyT>
5252
private static final HeaderName GRPC_ENCODING = HeaderNames.createFromLowercase("grpc-encoding");
5353
private static final HeaderName GRPC_ACCEPT_ENCODING = HeaderNames.createFromLowercase("grpc-accept-encoding");
5454

55+
private static final PbjGrpcNetworkBytesInspector NO_OP_NETWORK_BYTES_INSPECTOR =
56+
new PbjGrpcNetworkBytesInspector() {};
57+
58+
private static PbjGrpcNetworkBytesInspector networkBytesInspector = NO_OP_NETWORK_BYTES_INSPECTOR;
59+
60+
/**
61+
* Install a PbjGrpcNetworkBytesInspector, which may be null to reset it to no-op.
62+
* This is an internal API that isn't suited for general-purpose applications.
63+
* See `PbjGrpcNetworkBytesInspector` javadoc for more details.
64+
* This method is not thread-safe and relies on eventual consistency to take effect. So it's best to invoke it early
65+
* in the application startup.
66+
* @param networkBytesInspector a PbjGrpcNetworkBytesInspector instance
67+
*/
68+
public static void setNetworkBytesInspector(PbjGrpcNetworkBytesInspector networkBytesInspector) {
69+
PbjGrpcCall.networkBytesInspector =
70+
networkBytesInspector != null ? networkBytesInspector : NO_OP_NETWORK_BYTES_INSPECTOR;
71+
}
72+
5573
private final PbjGrpcClient grpcClient;
5674
private final Codec<RequestT> requestCodec;
5775
private final Codec<ReplyT> replyCodec;
@@ -123,6 +141,7 @@ public class PbjGrpcCall<RequestT, ReplyT> implements GrpcCall<RequestT, ReplyT>
123141
public void sendRequest(final RequestT request, final boolean endOfStream) {
124142
final Bytes requestBytes = requestCodec.toBytes(request);
125143
final Bytes bytes = GrpcCompression.getCompressor(grpcOutgoingEncoding).compress(requestBytes);
144+
PbjGrpcCall.networkBytesInspector.sent(bytes);
126145
final BufferData bufferData =
127146
BufferData.create(PbjGrpcDatagramReader.PREFIX_LENGTH + Math.toIntExact(bytes.length()));
128147

@@ -202,6 +221,7 @@ private void receiveRepliesLoop() {
202221
BufferData data = datagram.data();
203222
final byte[] array = data.readBytes();
204223
final Bytes bytes = Bytes.wrap(array);
224+
PbjGrpcCall.networkBytesInspector.received(bytes);
205225
// If the compressedFlag is 0, then per the specification, the message isn't compressed
206226
// regardless of the grpc-encoding value:
207227
// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
package com.hedera.pbj.grpc.client.helidon;
3+
4+
import com.hedera.pbj.runtime.io.buffer.Bytes;
5+
6+
/**
7+
* An interface for inspecting raw bytes of data payloads sent/received over GRPC.
8+
* <p>
9+
* The bytes are encoded/compressed if the GRPC encoding isn't `identity`. The bytes only represent the data payload
10+
* and don't include any system, non-user bytes, such as GRPC datagram size/compression flag or any headers sent
11+
* over the GRPC/HTTP2 connection.
12+
* <p>
13+
* The primary purpose of this interface is to help estimate the amount of user data being transferred over
14+
* a network connection during GRPC interactions. Currently, this interface doesn't allow one to associate
15+
* a particular Bytes object with a specific GRPC request. In the future, this interface may be extended or
16+
* even modified to allow for such association. Therefore, before using this interface, application developers
17+
* should very carefully consider if they're ready to keep up with the interface changes and update their
18+
* applications accordingly.
19+
* <p>
20+
* It is STRONGLY RECOMMENDED to consider this interface being a PBJ internal implementation detail that is
21+
* not generally suited for use in regular applications.
22+
*/
23+
public interface PbjGrpcNetworkBytesInspector {
24+
/**
25+
* Inspect bytes to be sent to a remote peer, after compressing them and prior to actually sending them.
26+
* @param bytes the bytes sent, potentially compressed.
27+
*/
28+
default void sent(Bytes bytes) {}
29+
30+
/**
31+
* Inspect bytes just received from a remote peer, prior to decompressing them and passing them to the application.
32+
* @param bytes the bytes received, potentially compressed.
33+
*/
34+
default void received(Bytes bytes) {}
35+
}

pbj-core/pbj-runtime/src/main/java/com/hedera/pbj/runtime/grpc/GrpcCompression.java

Lines changed: 28 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
import java.io.ByteArrayOutputStream;
88
import java.io.IOException;
99
import java.io.UncheckedIOException;
10+
import java.util.Collections;
11+
import java.util.HashMap;
1012
import java.util.List;
1113
import java.util.Map;
1214
import java.util.Set;
@@ -84,16 +86,30 @@ public Bytes decompress(Bytes bytes) {
8486
}
8587
}
8688

87-
private static final Map<String, Compressor> COMPRESSOR_MAP = Map.of(
88-
IdentityGrpcTransformer.NAME, IdentityGrpcTransformer.INSTANCE,
89-
GzipGrpcTransformer.NAME, GzipGrpcTransformer.INSTANCE);
90-
private static final Map<String, Decompressor> DECOMPRESSOR_MAP = Map.of(
91-
IdentityGrpcTransformer.NAME, IdentityGrpcTransformer.INSTANCE,
92-
GzipGrpcTransformer.NAME, GzipGrpcTransformer.INSTANCE);
89+
private static final Map<String, Compressor> COMPRESSOR_MAP = new HashMap<>();
90+
private static final Map<String, Decompressor> DECOMPRESSOR_MAP = new HashMap<>();
91+
92+
/** Register a Compressor, potentially overwriting an existing registration for `name`. */
93+
public static synchronized void registerCompressor(String name, Compressor compressor) {
94+
COMPRESSOR_MAP.put(name, compressor);
95+
}
96+
97+
/** Register a Decompressor, potentially overwriting an existing registration for `name`. */
98+
public static synchronized void registerDecompressor(String name, Decompressor decompressor) {
99+
DECOMPRESSOR_MAP.put(name, decompressor);
100+
}
101+
102+
static {
103+
registerCompressor(IdentityGrpcTransformer.NAME, IdentityGrpcTransformer.INSTANCE);
104+
registerCompressor(GzipGrpcTransformer.NAME, GzipGrpcTransformer.INSTANCE);
105+
106+
registerDecompressor(IdentityGrpcTransformer.NAME, IdentityGrpcTransformer.INSTANCE);
107+
registerDecompressor(GzipGrpcTransformer.NAME, GzipGrpcTransformer.INSTANCE);
108+
}
93109

94110
/** Return names of all known compressors. */
95111
public static Set<String> getCompressorNames() {
96-
return COMPRESSOR_MAP.keySet();
112+
return Collections.unmodifiableSet(COMPRESSOR_MAP.keySet());
97113
}
98114

99115
/** Return a known Compressor by its name, or null if unknown. */
@@ -103,7 +119,7 @@ public static Compressor getCompressor(String name) {
103119

104120
/** Return names of all known decompressors. */
105121
public static Set<String> getDecompressorNames() {
106-
return DECOMPRESSOR_MAP.keySet();
122+
return Collections.unmodifiableSet(DECOMPRESSOR_MAP.keySet());
107123
}
108124

109125
/** Return a known Decompressor by its name, or null if unknown. */
@@ -129,7 +145,7 @@ public static Decompressor determineDecompressor(List<String> encodingList) {
129145
} else {
130146
throw new IllegalStateException(
131147
"GRPC peer didn't provide grpc-encoding header and 'identity' is unsupported, only the following are supported: "
132-
+ GrpcCompression.getDecompressorNames());
148+
+ DECOMPRESSOR_MAP.keySet());
133149
}
134150
} else if (encodingList.size() > 1) {
135151
throw new IllegalStateException("GRPC peer specified multiple encodings at once: " + encodingList);
@@ -142,7 +158,7 @@ public static Decompressor determineDecompressor(List<String> encodingList) {
142158
return GrpcCompression.getDecompressor(encoding);
143159
} else {
144160
throw new IllegalStateException("GRPC peer uses an unsupported encoding: '" + encoding
145-
+ "' while only the following are supported: " + GrpcCompression.getDecompressorNames());
161+
+ "' while only the following are supported: " + DECOMPRESSOR_MAP.keySet());
146162
}
147163
}
148164
}
@@ -155,8 +171,8 @@ public static Decompressor determineDecompressor(List<String> encodingList) {
155171
*/
156172
public static String determineCompressorName(List<String> acceptEncoding, String encoding) {
157173
final List<String> supportedAcceptEncodings = acceptEncoding.stream()
158-
.filter(ae -> GrpcCompression.getCompressorNames().stream()
159-
.anyMatch(sae -> ae.equals(sae) || ae.startsWith(sae + ";")))
174+
.filter(ae ->
175+
COMPRESSOR_MAP.keySet().stream().anyMatch(sae -> ae.equals(sae) || ae.startsWith(sae + ";")))
160176
.toList();
161177

162178
if (supportedAcceptEncodings.isEmpty()) {

pbj-integration-tests/build.gradle.kts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ jmhModuleInfo {
5656
requires("com.google.protobuf.util")
5757
requires("io.helidon.common")
5858
requires("io.helidon.webserver")
59+
requires("com.github.luben.zstd_jni")
5960
}
6061

6162
// version is added to module-info.class files
@@ -69,7 +70,10 @@ configurations.testRuntimeClasspath {
6970
}
7071

7172
// IMPROVE: Test code should not have a direct dependency to 'com.hedera.pbj.compiler'
72-
dependencies { testImplementation("com.hedera.pbj:pbj-compiler") { isTransitive = false } }
73+
dependencies {
74+
jmhImplementation("com.github.luben:zstd-jni")
75+
testImplementation("com.hedera.pbj:pbj-compiler") { isTransitive = false }
76+
}
7377

7478
dependencyAnalysis { issues { all { onAny { exclude("com.hedera.pbj:pbj-compiler") } } } }
7579

pbj-integration-tests/src/jmh/java/com/hedera/pbj/integration/jmh/grpc/GreeterService.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import edu.umd.cs.findbugs.annotations.NonNull;
66
import java.util.ArrayList;
77
import java.util.List;
8+
import java.util.Random;
89
import java.util.concurrent.Flow;
910
import java.util.concurrent.atomic.AtomicInteger;
1011
import java.util.stream.Collectors;
@@ -19,6 +20,31 @@ public class GreeterService implements GreeterInterface {
1920
static final HelloRequest EMPTY_REQUEST = new HelloRequest("");
2021
static final HelloReply EMPTY_REPLY = new HelloReply("");
2122

23+
// For generating random payload strings. Note that benchmarks run in multiple threads,
24+
// so it's impossible to make them deterministic (or it'd be far too slow).
25+
static final Random RANDOM = new Random();
26+
27+
/**
28+
* Generate half-random/half-repeated string to allow compressors to compress something.
29+
* @param length total length of the string, should be even.
30+
* @return a semi-random string
31+
*/
32+
static String generateHalfRandomString(int length) {
33+
StringBuilder sb = new StringBuilder(length);
34+
sb.append("a".repeat(length / 2));
35+
for (int i = length / 2; i < length; i++) {
36+
sb.append((char) (32 + RANDOM.nextInt(127 - 32)));
37+
}
38+
return sb.toString();
39+
}
40+
41+
// We want our benchmark to measure the GRPC implementation performance. We don't want to include the string
42+
// generation code into the time it takes to process a request. So let's precompute the strings here:
43+
static final String NORMAL_PAYLOAD_STRING = generateHalfRandomString(256);
44+
static final String HEAVY_PAYLOAD_STRING = generateHalfRandomString(8192);
45+
// PBJ DEFAULT_MAX_SIZE is 2MB. See https://github.com/hashgraph/pbj/issues/748
46+
static final String SUPER_PAYLOAD_STRING = generateHalfRandomString(2000 * 1024);
47+
2248
private final PayloadWeight weight;
2349
private final int streamCount;
2450

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
package com.hedera.pbj.integration.jmh.grpc;
3+
4+
import com.hedera.pbj.grpc.client.helidon.PbjGrpcCall;
5+
import com.hedera.pbj.grpc.client.helidon.PbjGrpcNetworkBytesInspector;
6+
import com.hedera.pbj.runtime.io.buffer.Bytes;
7+
8+
/**
9+
* Simulate network latency based on the actual amount of user data being sent/received over a network.
10+
* Note: with 1Gbps, we get 8ns per byte. So to test higher speeds, we should switch to floating point math.
11+
* A very fast network would make any compression look bad because we'll waste CPU time on the compression
12+
* instead of leveraging the fast network.
13+
* A slow network would show that compression may be useful sometimes. Specifically, for larger, compressible
14+
* payloads. Smaller payloads (<8K) never benefit from compression.
15+
* <p>
16+
* The network latency simulator uses the PbjGrpcNetworkBytesInspector feature of the PbjGrpcCall, which is
17+
* a static entity, meaning that it affects any and all PbjGrpcCall objects in the same JVM. This is by design
18+
* for performance reasons. However, users of this Simulator should be aware of this and not try to use it
19+
* from multiple threads with different latency parameters at once.
20+
*/
21+
public class NetworkLatencySimulator {
22+
// ms 1e-3, us 1e-6, ns 1e-9:
23+
private static final long NANOS_IN_MILLI = 1_000_000L;
24+
25+
/**
26+
* Install the NetworkLatencySimulator as a PbjGrpcNetworkBytesInspector in PbjGrpcCall.
27+
* @param networkSpeedMbitPerSecond the speed in Mbps, e.g. 1_000 for 1Gbps network
28+
* @param printSizes if true, print a few sent/received sizes for debugging/additional information
29+
*/
30+
public static void simulate(final long networkSpeedMbitPerSecond, final boolean printSizes) {
31+
// mbit->mbyte = /8:
32+
final long nanosPerByte = 1_000_000_000L * 8 / (networkSpeedMbitPerSecond * 1_000_000L);
33+
PbjGrpcCall.setNetworkBytesInspector(new PbjGrpcNetworkBytesInspector() {
34+
// max number of sizes to print if enabled:
35+
int counter = 4;
36+
37+
private void sleep(long bytes) {
38+
final long nanos = nanosPerByte * bytes;
39+
final long deadline = System.nanoTime() + nanos;
40+
while (System.nanoTime() < deadline) {
41+
Thread.onSpinWait();
42+
}
43+
}
44+
45+
@Override
46+
public void sent(Bytes bytes) {
47+
if (printSizes && counter-- >= 0) {
48+
System.err.println("sent: " + bytes.length() + " bytes");
49+
}
50+
sleep(bytes.length());
51+
}
52+
53+
@Override
54+
public void received(Bytes bytes) {
55+
if (printSizes && counter-- >= 0) {
56+
System.err.println("received: " + bytes.length() + " bytes");
57+
}
58+
sleep(bytes.length());
59+
}
60+
});
61+
}
62+
}

pbj-integration-tests/src/jmh/java/com/hedera/pbj/integration/jmh/grpc/PayloadWeight.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,9 @@
1111
*/
1212
public enum PayloadWeight {
1313
LIGHT(() -> GreeterService.EMPTY_REQUEST, request -> GreeterService.EMPTY_REPLY),
14-
NORMAL(() -> new HelloRequest("a".repeat(256)), request -> new HelloReply(request.name())),
15-
HEAVY(() -> new HelloRequest("a".repeat(8192)), request -> new HelloReply(request.name()));
14+
NORMAL(() -> new HelloRequest(GreeterService.NORMAL_PAYLOAD_STRING), request -> new HelloReply(request.name())),
15+
HEAVY(() -> new HelloRequest(GreeterService.HEAVY_PAYLOAD_STRING), request -> new HelloReply(request.name())),
16+
SUPER(() -> new HelloRequest(GreeterService.SUPER_PAYLOAD_STRING), request -> new HelloReply(request.name()));
1617

1718
public final Supplier<HelloRequest> requestSupplier;
1819
public final Function<HelloRequest, HelloReply> replyProvider;

pbj-integration-tests/src/jmh/java/com/hedera/pbj/integration/jmh/grpc/PbjGrpcBench.java

Lines changed: 27 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
import com.hedera.pbj.grpc.helidon.PbjGrpcServiceConfig;
55
import com.hedera.pbj.grpc.helidon.PbjRouting;
6+
import com.hedera.pbj.grpc.helidon.config.PbjConfig;
67
import com.hedera.pbj.integration.grpc.GrpcTestUtils;
78
import com.hedera.pbj.integration.grpc.PortsAllocator;
89
import com.hedera.pbj.runtime.Codec;
@@ -50,25 +51,39 @@
5051
@SuppressWarnings("unused")
5152
@State(Scope.Benchmark)
5253
@Fork(1)
53-
@Warmup(iterations = 2)
54-
@Measurement(iterations = 5)
54+
@Warmup(iterations = 1)
55+
@Measurement(iterations = 3)
5556
@OutputTimeUnit(TimeUnit.SECONDS)
5657
@BenchmarkMode(Mode.Throughput)
5758
public class PbjGrpcBench {
58-
private static final int INVOCATIONS = 20_000;
59+
private static final int INVOCATIONS = 2_000;
5960

60-
private record ServerHandle(WebServer server) implements AutoCloseable {
61+
static {
62+
new ZstdGrpcTransformer().register("zstd");
63+
64+
// 1Gbps network:
65+
NetworkLatencySimulator.simulate(1_000, false);
66+
}
67+
68+
public record ServerHandle(WebServer server) implements AutoCloseable {
6169
@Override
6270
public void close() {
6371
server.stop();
6472
}
6573

66-
static ServerHandle start(
74+
public static ServerHandle start(
6775
final int port, final ServiceInterface service, final PbjGrpcServiceConfig serviceConfig) {
76+
final int maxPayloadSize = 20 * 1024 * 1024;
77+
final PbjConfig pbjConfig = PbjConfig.builder()
78+
.name("pbj")
79+
.maxMessageSizeBytes(maxPayloadSize)
80+
.build();
81+
;
6882
return new ServerHandle(WebServer.builder()
6983
.port(port)
84+
.addProtocol(pbjConfig)
7085
.addRouting(PbjRouting.builder().service(service, serviceConfig))
71-
.maxPayloadSize(10000)
86+
.maxPayloadSize(maxPayloadSize)
7287
.build()
7388
.start());
7489
}
@@ -96,7 +111,7 @@ public static class UnaryState {
96111
@Param
97112
PayloadWeight weight;
98113

99-
@Param({"identity", "gzip"})
114+
@Param({"identity", "gzip", "zstd"})
100115
String encodings;
101116

102117
PortsAllocator.Port port;
@@ -143,10 +158,13 @@ public void tearDown() {
143158
// There's code duplicated from UnaryState above. It's because JMH is having troubles when states use inheritance.
144159
@State(Scope.Thread)
145160
public static class StreamingState {
146-
@Param
161+
// Skip the SUPER weight as it would be too slow.
162+
@Param({"LIGHT", "NORMAL", "HEAVY"})
147163
PayloadWeight weight;
148164

149-
@Param({"identity", "gzip"})
165+
// Streaming benchmarks are much slower than unary. Also, compression benchmarks are much slower than identity.
166+
// So for streaming, only test the identity:
167+
@Param({"identity"})
150168
String encodings;
151169

152170
@Param({"1", "10"})

0 commit comments

Comments
 (0)