Skip to content

Commit 5e559a7

Browse files
authored
fix: call option deadline timeout unchanged in query method call (#310)
* fix: mutating QueryOptions arg in query method can lead to 0 and stale deadlines. * chore: cleanup lint * tests: add test of QueryOptions.clone and comparison. * docs: update CHANGELOG.md * tests: additional QueryOptions test * tests: add assert * chore: remove debug messages from test * chore: refactor gRPC Deadline checks and add logger warning for impractical values. * chore: clarify logic of choosing Deadline value, update tests, remove QueryOptions.clone() * chore: remove commented assertions. * docs: update Javadoc with preferred Deadline approach.
1 parent 4d74671 commit 5e559a7

File tree

7 files changed

+300
-57
lines changed

7 files changed

+300
-57
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,10 @@
44

55
1. [#306](https://github.com/InfluxCommunity/influxdb3-java/pull/306): Improve closing of Arrow `FlightStream`.
66

7+
### Bug Fixes
8+
9+
1. [#310](https://github.com/InfluxCommunity/influxdb3-java/pull/310): Ensure `QueryOptions` objects are left unchanged within the `queryData` implementation.
10+
711
## 1.5.0 [2025-10-22]
812

913
### Features

src/main/java/com/influxdb/v3/client/internal/GrpcCallOptions.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
*/
2222
package com.influxdb.v3.client.internal;
2323

24+
import java.time.Duration;
2425
import java.util.ArrayList;
2526
import java.util.List;
2627
import java.util.Objects;
@@ -226,6 +227,11 @@ public Builder() {
226227
/**
227228
* Sets the absolute deadline for a rpc call.
228229
*
230+
* <p><i>Please note</i> the preferred approach is to set a <code>queryTimeout</code>
231+
* Duration value globally in the ClientConfig
232+
* ({@link com.influxdb.v3.client.config.ClientConfig.Builder#queryTimeout(Duration)}).
233+
* This value will then be used to calculate a new Deadline with each call.</p>
234+
*
229235
* @param deadline The deadline
230236
* @return this
231237
*/
@@ -234,6 +240,17 @@ public Builder withDeadline(final @Nonnull Deadline deadline) {
234240
return this;
235241
}
236242

243+
/**
244+
* Unsets absolute deadline. Note deadline may have been set
245+
* via {@link #fromGrpcCallOptions(GrpcCallOptions)} method.
246+
*
247+
* @return this
248+
*/
249+
public Builder withoutDeadline() {
250+
this.deadline = null;
251+
return this;
252+
}
253+
237254
/**
238255
* Sets an {@code executor} to be used instead of the default
239256
* executor specified with {@link ManagedChannelBuilder#executor}.

src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -398,13 +398,6 @@ private Stream<VectorSchemaRoot> queryData(@Nonnull final String query,
398398
Arguments.checkNotNull(parameters, "parameters");
399399
Arguments.checkNotNull(options, "options");
400400

401-
if (options.grpcCallOptions().getDeadline() == null && config.getQueryTimeout() != null) {
402-
options.setGrpcCallOptions(new GrpcCallOptions.Builder()
403-
.fromGrpcCallOptions(options.grpcCallOptions())
404-
.withDeadline(Deadline.after(config.getQueryTimeout().toMillis(), TimeUnit.MILLISECONDS))
405-
.build());
406-
}
407-
408401
if (closed) {
409402
throw new IllegalStateException("InfluxDBClient has been closed.");
410403
}
@@ -421,7 +414,31 @@ private Stream<VectorSchemaRoot> queryData(@Nonnull final String query,
421414
}
422415
});
423416

424-
CallOption[] callOptions = options.grpcCallOptions().getCallOptions();
417+
GrpcCallOptions.Builder builder = new GrpcCallOptions.Builder()
418+
.fromGrpcCallOptions(options.grpcCallOptions());
419+
420+
if (config.getQueryTimeout() == null) {
421+
if (options.grpcCallOptions().getDeadline() != null
422+
&& options.grpcCallOptions().getDeadline().timeRemaining(TimeUnit.MILLISECONDS) <= 0) {
423+
LOG.warning("Query timeout "
424+
+ options.grpcCallOptions().getDeadline()
425+
+ " is 0 or negative and will be ignored.");
426+
builder.withoutDeadline();
427+
}
428+
} else {
429+
if (options.grpcCallOptions().getDeadline() == null) {
430+
builder.withDeadline(Deadline.after(config.getQueryTimeout().toMillis(), TimeUnit.MILLISECONDS));
431+
} else if (options.grpcCallOptions().getDeadline().timeRemaining(TimeUnit.MILLISECONDS) <= 0) {
432+
LOG.warning("Query timeout "
433+
+ options.grpcCallOptions().getDeadline()
434+
+ " is 0 or negative. Using config.queryTimeout "
435+
+ config.getQueryTimeout()
436+
+ " instead.");
437+
builder.withDeadline(Deadline.after(config.getQueryTimeout().toMillis(), TimeUnit.MILLISECONDS));
438+
}
439+
}
440+
441+
CallOption[] callOptions = builder.build().getCallOptions();
425442

426443
return flightSqlClient.execute(
427444
query,

src/main/java/com/influxdb/v3/client/query/QueryOptions.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
package com.influxdb.v3.client.query;
2323

2424
import java.util.Map;
25+
import java.util.Objects;
2526
import javax.annotation.Nonnull;
2627
import javax.annotation.Nullable;
2728
import javax.annotation.concurrent.ThreadSafe;
@@ -190,4 +191,24 @@ public GrpcCallOptions grpcCallOptions() {
190191
private boolean isNotDefined(final String option) {
191192
return option == null || option.isEmpty();
192193
}
194+
195+
@Override
196+
public boolean equals(final Object o) {
197+
if (this == o) {
198+
return true;
199+
}
200+
if (o == null || getClass() != o.getClass()) {
201+
return false;
202+
}
203+
QueryOptions that = (QueryOptions) o;
204+
return Objects.equals(this.database, that.database)
205+
&& Objects.equals(this.queryType, that.queryType)
206+
&& Objects.equals(this.headers, that.headers)
207+
&& Objects.equals(this.grpcCallOptions, that.grpcCallOptions);
208+
}
209+
210+
@Override
211+
public int hashCode() {
212+
return Objects.hash(database, queryType, headers, grpcCallOptions);
213+
}
193214
}

src/test/java/com/influxdb/v3/client/ITQueryWrite.java

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -412,7 +412,7 @@ public void queryTimeoutSuperceededByGrpcOptTest() {
412412
.host(System.getenv("TESTING_INFLUXDB_URL"))
413413
.token(System.getenv("TESTING_INFLUXDB_TOKEN").toCharArray())
414414
.database(System.getenv("TESTING_INFLUXDB_DATABASE"))
415-
.queryTimeout(Duration.ofSeconds(3))
415+
.queryTimeout(Duration.ofNanos(3))
416416
.build());
417417

418418
String measurement = "timeout_test_" + Math.round(Math.random() * 100_000);
@@ -423,20 +423,18 @@ public void queryTimeoutSuperceededByGrpcOptTest() {
423423

424424
QueryOptions queryOptions = QueryOptions.defaultQueryOptions();
425425
queryOptions.setGrpcCallOptions(new GrpcCallOptions.Builder()
426-
.withDeadline(Deadline.after(5000, TimeUnit.NANOSECONDS))
426+
.withDeadline(Deadline.after(500, TimeUnit.MILLISECONDS))
427427
.build()
428428
);
429429

430-
Throwable thrown = catchThrowable(() -> {
431-
Stream<Object[]> stream = client.query(sql, queryOptions);
432-
stream.forEach(row -> {
433-
Assertions.assertThat(row).hasSize(1);
434-
Assertions.assertThat(row[0]).isEqualTo(123.0);
435-
});
430+
Assertions.assertThatNoException().isThrownBy(() -> {
431+
try (Stream<Object[]> stream = client.query(sql, queryOptions)) {
432+
stream.forEach(row -> {
433+
Assertions.assertThat(row).hasSize(1);
434+
Assertions.assertThat(row[0]).isEqualTo(123.0);
435+
});
436+
}
436437
});
437-
438-
Assertions.assertThat(thrown).isInstanceOf(FlightRuntimeException.class);
439-
Assertions.assertThat(thrown.getMessage()).matches(".*deadline.*exceeded.*");
440438
}
441439

442440
@EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_URL", matches = ".*")

src/test/java/com/influxdb/v3/client/internal/GrpcCallOptionsTest.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -252,4 +252,25 @@ void testToString() {
252252
+ "maxOutboundMessageSize=5000}";
253253
assertEquals(expected, options.toString());
254254
}
255+
256+
@Test
257+
void testDeadlineIsNotZeroOrStale() throws InterruptedException {
258+
// Create a deadline 3 seconds from now
259+
Deadline deadline = Deadline.after(3000, TimeUnit.MILLISECONDS);
260+
GrpcCallOptions options = new GrpcCallOptions.Builder()
261+
.withDeadline(deadline)
262+
.build();
263+
264+
// Verify the deadline is in the future
265+
assertNotNull(options.getDeadline());
266+
long timeRemaining = options.getDeadline().timeRemaining(TimeUnit.MILLISECONDS);
267+
Assertions.assertTrue(timeRemaining > 0, "Deadline should be in the future");
268+
Assertions.assertTrue(timeRemaining <= 3000, "Deadline should not exceed 3 seconds");
269+
270+
// Wait a bit and verify deadline is still valid but closer
271+
TimeUnit.MILLISECONDS.sleep(100);
272+
long timeRemainingAfter = options.getDeadline().timeRemaining(TimeUnit.MILLISECONDS);
273+
Assertions.assertTrue(timeRemainingAfter > 0, "Deadline should still be in the future");
274+
Assertions.assertTrue(timeRemainingAfter < timeRemaining, "Deadline should be closer to expiration");
275+
}
255276
}

0 commit comments

Comments
 (0)