Skip to content

Commit 6bdd142

Browse files
committed
Set request timeout
1 parent f206fa8 commit 6bdd142

File tree

1 file changed

+17
-8
lines changed
  • extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/exporter/otlp/sender

1 file changed

+17
-8
lines changed

extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/exporter/otlp/sender/VertxGrpcSender.java

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package io.quarkus.opentelemetry.runtime.exporter.otlp.sender;
22

3+
import static java.util.concurrent.TimeUnit.MILLISECONDS;
4+
35
import java.io.IOException;
46
import java.net.URI;
57
import java.nio.charset.StandardCharsets;
@@ -59,6 +61,7 @@ public final class VertxGrpcSender implements GrpcSender {
5961
private final boolean compressionEnabled;
6062
private final Map<String, String> headers;
6163
private final String grpcEndpointPath;
64+
private final Duration exportTimeout;
6265

6366
private final GrpcClient client;
6467

@@ -74,6 +77,7 @@ public VertxGrpcSender(
7477
this.server = SocketAddress.inetSocketAddress(OTelExporterUtil.getPort(grpcBaseUri), grpcBaseUri.getHost());
7578
this.compressionEnabled = compressionEnabled;
7679
this.headers = headersMap;
80+
this.exportTimeout = timeout;
7781
var httpClientOptions = new HttpClientOptions()
7882
.setHttp2ClearTextUpgrade(false) // needed otherwise connections get closed immediately
7983
.setReadIdleTimeout((int) timeout.getSeconds())
@@ -92,9 +96,9 @@ public void send(Marshaler request, Consumer onSuccess, Consumer onError) {
9296
var onSuccessHandler = new ClientRequestOnSuccessHandler(client, server, headers, compressionEnabled,
9397
request,
9498
loggedUnimplemented, logger, marshalerType, onSuccess, onError, 1, grpcEndpointPath,
95-
isShutdown::get);
99+
isShutdown::get, exportTimeout);
96100

97-
initiateSend(client, server, MAX_ATTEMPTS, onSuccessHandler, new Consumer<>() {
101+
initiateSend(client, server, MAX_ATTEMPTS, onSuccessHandler, exportTimeout, new Consumer<>() {
98102
@Override
99103
public void accept(Throwable throwable) {
100104
failOnClientRequest(marshalerType, throwable, onError);
@@ -128,12 +132,14 @@ public void handle(Throwable event) {
128132

129133
private static void initiateSend(GrpcClient client, SocketAddress server,
130134
int numberOfAttempts,
131-
Handler<GrpcClientRequest<Buffer, Buffer>> onSuccessHandler,
135+
Handler<GrpcClientRequest<Buffer, Buffer>> onSuccessHandler, Duration exportTimeout,
132136
Consumer<Throwable> onFailureCallback) {
133137
Uni.createFrom().completionStage(new Supplier<CompletionStage<GrpcClientRequest<Buffer, Buffer>>>() {
134138
@Override
135139
public CompletionStage<GrpcClientRequest<Buffer, Buffer>> get() {
136-
return client.request(server).toCompletionStage();
140+
return client.request(server)
141+
.timeout(exportTimeout.toMillis(), MILLISECONDS)
142+
.toCompletionStage();
137143
}
138144
})
139145
.onFailure(new Predicate<Throwable>() {
@@ -189,6 +195,7 @@ private static final class ClientRequestOnSuccessHandler implements Handler<Grpc
189195

190196
private final int attemptNumber;
191197
private final Supplier<Boolean> isShutdown;
198+
private final Duration exportTimeout;
192199

193200
public ClientRequestOnSuccessHandler(GrpcClient client,
194201
SocketAddress server,
@@ -202,7 +209,8 @@ public ClientRequestOnSuccessHandler(GrpcClient client,
202209
Consumer<Throwable> onError,
203210
int attemptNumber,
204211
String grpcEndpointPath,
205-
Supplier<Boolean> isShutdown) {
212+
Supplier<Boolean> isShutdown,
213+
Duration exportTimeout) {
206214
this.client = client;
207215
this.server = server;
208216
this.grpcEndpointPath = grpcEndpointPath;
@@ -216,6 +224,7 @@ public ClientRequestOnSuccessHandler(GrpcClient client,
216224
this.onError = onError;
217225
this.attemptNumber = attemptNumber;
218226
this.isShutdown = isShutdown;
227+
this.exportTimeout = exportTimeout;
219228
}
220229

221230
@Override
@@ -250,7 +259,7 @@ public void handle(Throwable t) {
250259
// retry
251260
initiateSend(client, server,
252261
MAX_ATTEMPTS - attemptNumber,
253-
newAttempt(),
262+
newAttempt(), exportTimeout,
254263
new Consumer<>() {
255264
@Override
256265
public void accept(Throwable throwable) {
@@ -394,7 +403,7 @@ public void handle(Throwable t) {
394403
// retry
395404
initiateSend(client, server,
396405
MAX_ATTEMPTS - attemptNumber,
397-
newAttempt(),
406+
newAttempt(), exportTimeout,
398407
new Consumer<>() {
399408
@Override
400409
public void accept(Throwable throwable) {
@@ -429,7 +438,7 @@ private void failOnClientRequest(Throwable t, Consumer<Throwable> onError, int a
429438
public ClientRequestOnSuccessHandler newAttempt() {
430439
return new ClientRequestOnSuccessHandler(client, server, headers, compressionEnabled, marshaler,
431440
loggedUnimplemented, logger, type, onSuccess, onError, attemptNumber + 1,
432-
grpcEndpointPath, isShutdown);
441+
grpcEndpointPath, isShutdown, exportTimeout);
433442
}
434443
}
435444
}

0 commit comments

Comments
 (0)