Skip to content

Commit 178f1bb

Browse files
committed
chore: refactor gRPC Deadline checks and add logger warning for impractical values.
1 parent 924e01c commit 178f1bb

File tree

2 files changed

+60
-9
lines changed

2 files changed

+60
-9
lines changed

src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -414,17 +414,24 @@ private Stream<VectorSchemaRoot> queryData(@Nonnull final String query,
414414
}
415415
});
416416

417-
// Copy call options to create a new deadline for this query
418-
// if queryTimeout is configured and deadline isn't explicitly set
419-
GrpcCallOptions grpcCallOptionsCopy = options.grpcCallOptions();
420-
if (grpcCallOptionsCopy.getDeadline() == null && config.getQueryTimeout() != null) {
421-
grpcCallOptionsCopy = new GrpcCallOptions.Builder()
422-
.fromGrpcCallOptions(grpcCallOptionsCopy)
423-
.withDeadline(Deadline.after(config.getQueryTimeout().toMillis(), TimeUnit.MILLISECONDS))
424-
.build();
417+
GrpcCallOptions.Builder builder = new GrpcCallOptions.Builder()
418+
.fromGrpcCallOptions(options.grpcCallOptions());
419+
420+
if (options.grpcCallOptions().getDeadline() == null) {
421+
if (config.getQueryTimeout() != null) {
422+
builder.withDeadline(Deadline.after(config.getQueryTimeout().toMillis(), TimeUnit.MILLISECONDS));
423+
}
424+
} else if (options.grpcCallOptions().getDeadline().timeRemaining(TimeUnit.NANOSECONDS) <= 0) {
425+
LOG.warning("Received impractical gRPC call options deadline "
426+
+ options.grpcCallOptions().getDeadline());
427+
if (config.getQueryTimeout() != null) {
428+
LOG.warning("Using configuration query timeout "
429+
+ config.getQueryTimeout() + " as a replacement.");
430+
builder.withDeadline(Deadline.after(config.getQueryTimeout().toMillis(), TimeUnit.MILLISECONDS));
431+
}
425432
}
426433

427-
CallOption[] callOptions = grpcCallOptionsCopy.getCallOptions();
434+
CallOption[] callOptions = builder.build().getCallOptions();
428435

429436
return flightSqlClient.execute(
430437
query,

src/test/java/com/influxdb/v3/client/query/QueryOptionsTest.java

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -329,6 +329,50 @@ public void queryOptionsUnchangedByCall() throws IOException {
329329
} catch (Exception e) {
330330
throw new RuntimeException(e);
331331
}
332+
}
333+
334+
@Test
335+
public void impracticalGRPCDeadlineReplacedByQueryTimeout() throws IOException {
336+
int freePort = findFreePort();
337+
URI uri = URI.create("http://127.0.0.1:" + freePort);
338+
int rowCount = 10;
339+
try (VectorSchemaRoot vectorSchemaRoot = TestUtils.generateVectorSchemaRoot(10, rowCount);
340+
BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
341+
FlightServer flightServer = TestUtils.simpleFlightServer(uri, allocator,
342+
TestUtils.simpleProducer(vectorSchemaRoot))
343+
) {
344+
flightServer.start();
345+
346+
String host = String.format("http://%s:%d", uri.getHost(), uri.getPort());
347+
ClientConfig clientConfig = new ClientConfig.Builder()
348+
.host(host)
349+
.database("test")
350+
.writeTimeout(Duration.ofSeconds(60))
351+
.queryTimeout(Duration.ofSeconds(60))
352+
.build();
353+
354+
GrpcCallOptions grpcCallOption = new GrpcCallOptions.Builder()
355+
.withMaxInboundMessageSize(1024 * 1024 * 1024)
356+
.withDeadline(Deadline.after(-2, TimeUnit.MILLISECONDS))
357+
.build();
332358

359+
QueryOptions queryOptions = new QueryOptions("test");
360+
queryOptions.setGrpcCallOptions(grpcCallOption);
361+
QueryOptions originalQueryOptions = queryOptions.clone();
362+
Assertions.assertThat(originalQueryOptions).isEqualTo(queryOptions);
363+
364+
try (InfluxDBClient influxDBClient = InfluxDBClient.getInstance(clientConfig)) {
365+
Assertions.assertThatNoException().isThrownBy(() -> {
366+
Stream<PointValues> stream = influxDBClient.queryPoints(
367+
"Select * from \"sensors\"",
368+
queryOptions);
369+
Assertions.assertThat(stream.count()).isEqualTo(rowCount);
370+
stream.close();
371+
});
372+
}
373+
Assertions.assertThat(queryOptions).isEqualTo(originalQueryOptions);
374+
} catch (Exception e) {
375+
throw new RuntimeException(e);
376+
}
333377
}
334378
}

0 commit comments

Comments
 (0)