Skip to content

Commit 5152c9d

Browse files
fix: improve sendPing() in GRPC client (#763)
Signed-off-by: Anthony Petrov <anthony@swirldslabs.com>
1 parent d9850c8 commit 5152c9d

File tree

7 files changed

+372
-7
lines changed

7 files changed

+372
-7
lines changed

pbj-core/hiero-dependency-versions/build.gradle.kts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ dependencies.constraints {
4545
api("io.helidon.builder:helidon-builder-codegen:$helidon") {
4646
because("io.helidon.builder.codegen")
4747
}
48+
api("io.helidon.logging:helidon-logging-jul:$helidon") { because("io.helidon.logging.jul") }
4849

4950
// Code generation tools
5051
api("org.antlr:antlr4:$antlr")
@@ -55,6 +56,7 @@ dependencies.constraints {
5556
excludes.add("com.google.protobuf:protoc")
5657
excludes.add("io.grpc:protoc-gen-grpc-java")
5758
excludes.add("com.github.luben:zstd-jni")
59+
excludes.add("io.helidon.logging:helidon-logging-jul")
5860
}
5961

6062
// Testing only

pbj-core/pbj-grpc-client-helidon/src/main/java/com/hedera/pbj/grpc/client/helidon/PbjGrpcCall.java

Lines changed: 58 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,14 @@
1717
import io.helidon.http.Headers;
1818
import io.helidon.http.WritableHeaders;
1919
import io.helidon.http.http2.Http2FrameData;
20+
import io.helidon.http.http2.Http2FrameHeader;
2021
import io.helidon.http.http2.Http2Headers;
22+
import io.helidon.http.http2.Http2Ping;
2123
import io.helidon.http.http2.Http2StreamState;
2224
import io.helidon.webclient.http2.Http2ClientStream;
2325
import io.helidon.webclient.http2.StreamTimeoutException;
26+
import java.io.IOException;
27+
import java.io.UncheckedIOException;
2428
import java.util.List;
2529
import java.util.concurrent.ExecutionException;
2630
import java.util.concurrent.TimeUnit;
@@ -165,6 +169,58 @@ private boolean isStreamOpen() {
165169
&& clientStream.streamState() != Http2StreamState.CLOSED;
166170
}
167171

172+
/**
173+
* Send a ping to the server.
174+
* <p>
175+
* Do NOT use Http2ClientStream.sendPing()! It works once. A second ping results in sending garbage frames
176+
* to the server (indirectly), and the server closes the connection. The exact cause is still unknown, but it may
177+
* be related to the usage of this connection's flowControl object for sending the pings which may
178+
* interfere with the regular data transfers occurring via this same connection concurrently with the ping.
179+
* Another reason for this may be the fact that it uses a static HTTP2_PING object for sending pings, but
180+
* it never rewinds the buffer that holds the ping payload, so the server may read bytes from a subsequent
181+
* regular data frame and interpret them as the ping payload, which should break the HTTP2 connection as a whole.
182+
* <p>
183+
* There's an Http2ClientConnection.ping() method that explicitly uses FlowControl.Outbound.NOOP for sending
184+
* new ping objects. However, that method is package-private.
185+
* <p>
186+
* So we implement our own sendPing() here that uses new Http2Ping objects and doesn't use the flowControl.
187+
* <p>
188+
* NOTE: Http2ClientStream methods use an Http2ConnectionWriter object via Http2ClientConnection.writer()
189+
* to write data, and it's a wrapper around the ClientConnection's DataWriter object.
190+
* And the Http2ConnectionWriter has some additional synchronization around DataWriter.write() calls.
191+
* However, ironically, it doesn't synchronize access to the flowControl object. Regardless, there are no public
192+
* methods to obtain a reference to the Http2ConnectionWriter or its internal lock. So we have to write
193+
* to the ClientConnection's DataWriter object directly. Stress-testing hasn't revealed any thread-races so far.
194+
* <p>
195+
* It's difficult to imagine a situation where the thread-race could occur. Perhaps a single PbjGrpcClient
196+
* (aka a single HTTP2 connection) and two streaming PbjGrpcCalls (aka HTTP2 streams) open concurrently,
197+
* one being very chatty and another one being very silent. The latter may start sending pings while the former
198+
* is sending requests to the server. However, this scenario seems very rare. If we ever encounter this issue,
199+
* then it's easy to work-around by creating separate PbjGrpcClients for the two calls on the client side.
200+
* To fix it, ideally we'd work with Helidon to expose the necessary APIs for synchronous writes. Alternatively,
201+
* we could introduce a PbjGrpcClient-level outgoing queue and send all requests and pings through it as
202+
* a work-around. However, this work-around may not fully cover the issue because Helidon can write window update
203+
* frames for the flowControl changes concurrently still as it reads data from the stream/socket.
204+
*/
205+
private void sendPing() {
206+
final Http2Ping ping = Http2Ping.create();
207+
final Http2FrameData frameData = ping.toFrameData();
208+
final Http2FrameHeader frameHeader = frameData.header();
209+
if (frameHeader.length() == 0) {
210+
throw new IllegalStateException("Ping with zero length. This should never happen.");
211+
} else {
212+
final BufferData headerData = frameHeader.write();
213+
final BufferData data = frameData.data().copy();
214+
try {
215+
grpcClient.getClientConnection().writer().writeNow(BufferData.create(headerData, data));
216+
} catch (IllegalStateException e) {
217+
// It may throw IllegalStateException: Attempt to call writer() on a closed connection
218+
// But callers usually expect an UncheckedIOException:
219+
throw new UncheckedIOException(new IOException("sendPing failed", e));
220+
}
221+
}
222+
}
223+
168224
private void receiveRepliesLoop() {
169225
try {
170226
Http2Headers http2Headers = null;
@@ -180,10 +236,7 @@ private void receiveRepliesLoop() {
180236
// if the server died.
181237
// FUTURE WORK: consider a separate KeepAlive timeout for these pings, so that we don't flood the
182238
// network.
183-
// NOTE: Google GRPC server drops the connection if pinged right after receiving the headers - it
184-
// complains about the frame size of 64K with 16K max allowed. It's unclear how/why this even
185-
// happens. However, pinging on timeout seems to work smoothly so far.
186-
clientStream.sendPing();
239+
sendPing();
187240
}
188241
} while (http2Headers == null && isStreamOpen());
189242

@@ -201,7 +254,7 @@ private void receiveRepliesLoop() {
201254
frameData = clientStream.readOne(grpcClient.getConfig().readTimeout());
202255
} catch (StreamTimeoutException e) {
203256
// Check if the connection is alive. See a comment above about the KeepAlive timeout.
204-
clientStream.sendPing();
257+
sendPing();
205258
// FUTURE WORK: implement an uber timeout to return
206259
continue;
207260
}

pbj-core/pbj-grpc-client-helidon/src/main/java/com/hedera/pbj/grpc/client/helidon/PbjGrpcClient.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,10 @@ Http2Client getHttp2Client() {
107107
return http2Client;
108108
}
109109

110+
ClientConnection getClientConnection() {
111+
return clientConnection;
112+
}
113+
110114
Http2ClientConnection createHttp2ClientConnection(final ClientConnection clientConnection) {
111115
if (clientConnection == null) {
112116
// Must be a unit test: there are no actual connections established, so there is nothing to do.

pbj-core/pbj-grpc-client-helidon/src/test/java/com/hedera/pbj/grpc/client/helidon/PbjGrpcCallTest.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.hedera.pbj.runtime.io.ReadableSequentialData;
2525
import com.hedera.pbj.runtime.io.buffer.Bytes;
2626
import io.helidon.common.buffers.BufferData;
27+
import io.helidon.common.buffers.DataWriter;
2728
import io.helidon.common.tls.Tls;
2829
import io.helidon.http.Header;
2930
import io.helidon.http.HeaderName;
@@ -74,6 +75,9 @@ private record Options(Optional<String> authority, String contentType) implement
7475
@Mock
7576
private ClientConnection clientConnection;
7677

78+
@Mock
79+
private DataWriter dataWriter;
80+
7781
@Mock
7882
private Http2ClientConnection connection;
7983

@@ -107,6 +111,9 @@ private record Options(Optional<String> authority, String contentType) implement
107111
private PbjGrpcCall createCall(final ServiceInterface.RequestOptions options) {
108112
doReturn(webClient).when(grpcClient).getWebClient();
109113
doReturn(executor).when(webClient).executor();
114+
// Only used in tests that verify timeout pings:
115+
lenient().doReturn(clientConnection).when(grpcClient).getClientConnection();
116+
lenient().doReturn(dataWriter).when(clientConnection).writer();
110117

111118
final PbjGrpcClientConfig config =
112119
new PbjGrpcClientConfig(READ_TIMEOUT, tls, OPTIONS.authority(), OPTIONS.contentType());
@@ -230,7 +237,8 @@ public void testReceiveRepliesLoopPingAndStreamClosed() {
230237

231238
runnable.run();
232239

233-
verify(grpcClientStream, times(1)).sendPing();
240+
// A ping:
241+
verify(dataWriter, times(1)).writeNow(any(BufferData.class));
234242
verify(pipeline, times(1)).onComplete();
235243
verifyNoMoreInteractions(pipeline);
236244
}
@@ -283,7 +291,8 @@ public void testReceiveRepliesLoopSingleReply(final boolean isTimeout) throws Ex
283291

284292
verify(pipeline, times(1)).onNext(reply);
285293
verify(pipeline, times(1)).onComplete();
286-
if (isTimeout) verify(grpcClientStream, times(1)).sendPing();
294+
// A ping:
295+
if (isTimeout) verify(dataWriter, times(1)).writeNow(any(BufferData.class));
287296
verifyNoMoreInteractions(pipeline);
288297
}
289298

pbj-integration-tests/build.gradle.kts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ jmhModuleInfo {
5757
requires("io.helidon.common")
5858
requires("io.helidon.webserver")
5959
requires("com.github.luben.zstd_jni")
60+
runtimeOnly("io.helidon.logging.jul")
6061
}
6162

6263
// version is added to module-info.class files

0 commit comments

Comments
 (0)