11package io .quarkus .opentelemetry .runtime .exporter .otlp .sender ;
22
3+ import static java .util .concurrent .TimeUnit .MILLISECONDS ;
4+
35import java .io .IOException ;
46import java .net .URI ;
57import java .nio .charset .StandardCharsets ;
@@ -59,6 +61,7 @@ public final class VertxGrpcSender implements GrpcSender {
5961 private final boolean compressionEnabled ;
6062 private final Map <String , String > headers ;
6163 private final String grpcEndpointPath ;
64+ private final Duration exportTimeout ;
6265
6366 private final GrpcClient client ;
6467
@@ -74,6 +77,7 @@ public VertxGrpcSender(
7477 this .server = SocketAddress .inetSocketAddress (OTelExporterUtil .getPort (grpcBaseUri ), grpcBaseUri .getHost ());
7578 this .compressionEnabled = compressionEnabled ;
7679 this .headers = headersMap ;
80+ this .exportTimeout = timeout ;
7781 var httpClientOptions = new HttpClientOptions ()
7882 .setHttp2ClearTextUpgrade (false ) // needed otherwise connections get closed immediately
7983 .setReadIdleTimeout ((int ) timeout .getSeconds ())
@@ -92,9 +96,9 @@ public void send(Marshaler request, Consumer onSuccess, Consumer onError) {
9296 var onSuccessHandler = new ClientRequestOnSuccessHandler (client , server , headers , compressionEnabled ,
9397 request ,
9498 loggedUnimplemented , logger , marshalerType , onSuccess , onError , 1 , grpcEndpointPath ,
95- isShutdown ::get );
99+ isShutdown ::get , exportTimeout );
96100
97- initiateSend (client , server , MAX_ATTEMPTS , onSuccessHandler , new Consumer <>() {
101+ initiateSend (client , server , MAX_ATTEMPTS , onSuccessHandler , exportTimeout , new Consumer <>() {
98102 @ Override
99103 public void accept (Throwable throwable ) {
100104 failOnClientRequest (marshalerType , throwable , onError );
@@ -128,12 +132,14 @@ public void handle(Throwable event) {
128132
129133 private static void initiateSend (GrpcClient client , SocketAddress server ,
130134 int numberOfAttempts ,
131- Handler <GrpcClientRequest <Buffer , Buffer >> onSuccessHandler ,
135+ Handler <GrpcClientRequest <Buffer , Buffer >> onSuccessHandler , Duration exportTimeout ,
132136 Consumer <Throwable > onFailureCallback ) {
133137 Uni .createFrom ().completionStage (new Supplier <CompletionStage <GrpcClientRequest <Buffer , Buffer >>>() {
134138 @ Override
135139 public CompletionStage <GrpcClientRequest <Buffer , Buffer >> get () {
136- return client .request (server ).toCompletionStage ();
140+ return client .request (server )
141+ .timeout (exportTimeout .toMillis (), MILLISECONDS )
142+ .toCompletionStage ();
137143 }
138144 })
139145 .onFailure (new Predicate <Throwable >() {
@@ -189,6 +195,7 @@ private static final class ClientRequestOnSuccessHandler implements Handler<Grpc
189195
190196 private final int attemptNumber ;
191197 private final Supplier <Boolean > isShutdown ;
198+ private final Duration exportTimeout ;
192199
193200 public ClientRequestOnSuccessHandler (GrpcClient client ,
194201 SocketAddress server ,
@@ -202,7 +209,8 @@ public ClientRequestOnSuccessHandler(GrpcClient client,
202209 Consumer <Throwable > onError ,
203210 int attemptNumber ,
204211 String grpcEndpointPath ,
205- Supplier <Boolean > isShutdown ) {
212+ Supplier <Boolean > isShutdown ,
213+ Duration exportTimeout ) {
206214 this .client = client ;
207215 this .server = server ;
208216 this .grpcEndpointPath = grpcEndpointPath ;
@@ -216,6 +224,7 @@ public ClientRequestOnSuccessHandler(GrpcClient client,
216224 this .onError = onError ;
217225 this .attemptNumber = attemptNumber ;
218226 this .isShutdown = isShutdown ;
227+ this .exportTimeout = exportTimeout ;
219228 }
220229
221230 @ Override
@@ -250,7 +259,7 @@ public void handle(Throwable t) {
250259 // retry
251260 initiateSend (client , server ,
252261 MAX_ATTEMPTS - attemptNumber ,
253- newAttempt (),
262+ newAttempt (), exportTimeout ,
254263 new Consumer <>() {
255264 @ Override
256265 public void accept (Throwable throwable ) {
@@ -394,7 +403,7 @@ public void handle(Throwable t) {
394403 // retry
395404 initiateSend (client , server ,
396405 MAX_ATTEMPTS - attemptNumber ,
397- newAttempt (),
406+ newAttempt (), exportTimeout ,
398407 new Consumer <>() {
399408 @ Override
400409 public void accept (Throwable throwable ) {
@@ -429,7 +438,7 @@ private void failOnClientRequest(Throwable t, Consumer<Throwable> onError, int a
429438 public ClientRequestOnSuccessHandler newAttempt () {
430439 return new ClientRequestOnSuccessHandler (client , server , headers , compressionEnabled , marshaler ,
431440 loggedUnimplemented , logger , type , onSuccess , onError , attemptNumber + 1 ,
432- grpcEndpointPath , isShutdown );
441+ grpcEndpointPath , isShutdown , exportTimeout );
433442 }
434443 }
435444}
0 commit comments