Skip to content

Commit e4fed9b

Browse files
committed
feat: (WIP) adding new writeTimeout and queryTimeout config properties.
1 parent b85b1aa commit e4fed9b

File tree

8 files changed

+430
-6
lines changed

8 files changed

+430
-6
lines changed

src/main/java/com/influxdb/v3/client/config/ClientConfig.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@
3939
import com.influxdb.v3.client.write.WriteOptions;
4040
import com.influxdb.v3.client.write.WritePrecision;
4141

42+
// TODO ensure that if writeTimeout is not defined but timeout is that writeTimeout is initialized to timeout
43+
4244
/**
4345
* The <code>ClientConfig</code> holds the configurations for the
4446
* {@link com.influxdb.v3.client.InfluxDBClient} client.
@@ -54,7 +56,7 @@
5456
* <li><code>defaultTags</code> - defaultTags added when writing points to InfluxDB</li>
5557
* <li><code>gzipThreshold</code> - threshold when gzip compression is used for writing points to InfluxDB</li>
5658
* <li><code>writeNoSync</code> - skip waiting for WAL persistence on write</li>
57-
* <li><code>timeout</code> - <i>deprecated in 1.3.0</i> timeout when connecting to InfluxDB,
59+
* <li><code>timeout</code> - <i>deprecated in 1.4.0</i> timeout when connecting to InfluxDB,
5860
* please use more informative properties <code>writeTimeout</code> and <code>queryTimeout</code></li>
5961
* <li><code>writeTimeout</code> - timeout when writing data to InfluxDB</li>
6062
* <li><code>queryTimeout</code> - timeout used to calculate a default GRPC deadline when querying InfluxDB.
@@ -103,6 +105,7 @@ public final class ClientConfig {
103105
private final Integer gzipThreshold;
104106
private final Boolean writeNoSync;
105107
private final Map<String, String> defaultTags;
108+
@Deprecated
106109
private final Duration timeout;
107110
private final Duration writeTimeout;
108111
private final Duration queryTimeout;
@@ -219,10 +222,10 @@ public Duration getTimeout() {
219222
}
220223

221224
/**
222-
* Gets the default timeout in seconds to use for REST Write API calls. Default is
225+
* Gets the default timeout to use for REST Write API calls. Default is
223226
* {@value com.influxdb.v3.client.write.WriteOptions#DEFAULT_WRITE_TIMEOUT}
224227
*
225-
* @return the default timeout in seconds to use for REST Write API calls.
228+
* @return the default timeout to use for REST Write API calls.
226229
*/
227230
@Nonnull
228231
public Duration getWriteTimeout() {
@@ -840,7 +843,8 @@ private ClientConfig(@Nonnull final Builder builder) {
840843
defaultTags = builder.defaultTags;
841844
timeout = builder.timeout != null ? builder.timeout : Duration.ofSeconds(WriteOptions.DEFAULT_WRITE_TIMEOUT);
842845
writeTimeout = builder.writeTimeout != null
843-
? builder.writeTimeout : Duration.ofSeconds(WriteOptions.DEFAULT_WRITE_TIMEOUT);
846+
? builder.writeTimeout : builder.timeout != null
847+
? builder.timeout : Duration.ofSeconds(WriteOptions.DEFAULT_WRITE_TIMEOUT);
844848
queryTimeout = builder.queryTimeout;
845849
allowHttpRedirects = builder.allowHttpRedirects != null ? builder.allowHttpRedirects : false;
846850
disableServerCertificateValidation = builder.disableServerCertificateValidation != null

src/main/java/com/influxdb/v3/client/internal/GrpcCallOptions.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,13 @@
2222
package com.influxdb.v3.client.internal;
2323

2424
import java.util.ArrayList;
25+
import java.util.Collections;
2526
import java.util.List;
2627
import java.util.Objects;
2728
import java.util.concurrent.Executor;
2829
import javax.annotation.Nonnull;
2930
import javax.annotation.Nullable;
31+
import javax.annotation.Tainted;
3032

3133
import io.grpc.CompressorRegistry;
3234
import io.grpc.Deadline;
@@ -288,8 +290,39 @@ public Builder withMaxInboundMessageSize(@Nonnull final Integer maxInboundMessag
288290
return this;
289291
}
290292

293+
/**
294+
* Helper method to clone already existing options
295+
*
296+
* @param grpcCallOptions = options to copy
297+
* @return this
298+
*/
299+
public Builder fromGrpcCallOptions(@Nonnull GrpcCallOptions grpcCallOptions) {
300+
if (grpcCallOptions.getDeadline() != null) {
301+
this.deadline = grpcCallOptions.getDeadline();
302+
}
303+
if (grpcCallOptions.getExecutor() != null) {
304+
this.executor = grpcCallOptions.getExecutor();
305+
}
306+
if (grpcCallOptions.getCompressorName() != null) {
307+
this.compressorName = grpcCallOptions.getCompressorName();
308+
}
309+
if (grpcCallOptions.getWaitForReady() != null) {
310+
this.waitForReady = grpcCallOptions.getWaitForReady();
311+
}
312+
if (grpcCallOptions.getMaxInboundMessageSize() != null) {
313+
this.maxInboundMessageSize = grpcCallOptions.getMaxInboundMessageSize();
314+
}
315+
if (grpcCallOptions.getMaxOutboundMessageSize() != null) {
316+
this.maxOutboundMessageSize = grpcCallOptions.getMaxOutboundMessageSize();
317+
}
318+
return this;
319+
}
320+
291321
/**
292322
* Sets the maximum allowed message size acceptable sent to the remote peer.
323+
* <p>
324+
* Note: this property leads to grpc-java issue 12109 and can lead to the connection hanging indefinitely.
325+
* See (<a href="https://github.com/grpc/grpc-java/issues/12109">grpc-java 12109</a>)
293326
*
294327
* @param maxOutboundMessageSize The maximum message send size
295328
* @return this

src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,13 @@
2424
import java.io.ByteArrayOutputStream;
2525
import java.io.IOException;
2626
import java.nio.charset.StandardCharsets;
27+
import java.util.Arrays;
2728
import java.util.Collections;
2829
import java.util.HashMap;
2930
import java.util.List;
3031
import java.util.Map;
3132
import java.util.Objects;
33+
import java.util.concurrent.TimeUnit;
3234
import java.util.logging.Logger;
3335
import java.util.stream.Collectors;
3436
import java.util.stream.IntStream;
@@ -37,6 +39,7 @@
3739
import javax.annotation.Nonnull;
3840
import javax.annotation.Nullable;
3941

42+
import io.grpc.Deadline;
4043
import io.netty.handler.codec.http.HttpMethod;
4144
import io.netty.handler.codec.http.HttpResponseStatus;
4245
import org.apache.arrow.flight.CallOption;
@@ -396,6 +399,13 @@ private Stream<VectorSchemaRoot> queryData(@Nonnull final String query,
396399
Arguments.checkNotNull(parameters, "parameters");
397400
Arguments.checkNotNull(options, "options");
398401

402+
if( options.grpcCallOptions().getDeadline() == null && config.getQueryTimeout() != null) {
403+
options.setGrpcCallOptions(new GrpcCallOptions.Builder()
404+
.fromGrpcCallOptions(options.grpcCallOptions())
405+
.withDeadline(Deadline.after(config.getQueryTimeout().toMillis(), TimeUnit.MILLISECONDS))
406+
.build());
407+
}
408+
399409
if (closed) {
400410
throw new IllegalStateException("InfluxDBClient has been closed.");
401411
}
@@ -413,6 +423,7 @@ private Stream<VectorSchemaRoot> queryData(@Nonnull final String query,
413423
});
414424

415425
CallOption[] callOptions = options.grpcCallOptions().getCallOptions();
426+
416427
return flightSqlClient.execute(
417428
query,
418429
database,

src/main/java/com/influxdb/v3/client/internal/RestClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ public void checkServerTrusted(
101101

102102
// timeout and redirects
103103
HttpClient.Builder builder = HttpClient.newBuilder()
104-
.connectTimeout(config.getTimeout())
104+
.connectTimeout(config.getWriteTimeout())
105105
.followRedirects(config.getAllowHttpRedirects()
106106
? HttpClient.Redirect.NORMAL : HttpClient.Redirect.NEVER);
107107

src/test/java/com/influxdb/v3/client/ITQueryWrite.java

Lines changed: 203 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,22 +23,27 @@
2323

2424
import java.io.IOException;
2525
import java.math.BigInteger;
26+
import java.time.Duration;
2627
import java.time.Instant;
2728
import java.time.temporal.ChronoUnit;
2829
import java.util.HashMap;
2930
import java.util.List;
3031
import java.util.Map;
3132
import java.util.Random;
33+
import java.util.concurrent.TimeUnit;
3234
import java.util.stream.Collectors;
3335
import java.util.stream.Stream;
3436

37+
import com.influxdb.v3.client.internal.GrpcCallOptions;
38+
import io.grpc.Deadline;
3539
import org.apache.arrow.flight.CallStatus;
3640
import org.apache.arrow.flight.FlightRuntimeException;
3741
import org.apache.arrow.flight.FlightStatusCode;
3842
import org.apache.arrow.vector.VectorSchemaRoot;
3943
import org.assertj.core.api.Assertions;
4044
import org.jetbrains.annotations.NotNull;
4145
import org.junit.jupiter.api.AfterEach;
46+
import org.junit.jupiter.api.Disabled;
4247
import org.junit.jupiter.api.Test;
4348
import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable;
4449

@@ -47,6 +52,8 @@
4752
import com.influxdb.v3.client.write.WriteOptions;
4853
import com.influxdb.v3.client.write.WritePrecision;
4954

55+
import static org.assertj.core.api.AssertionsForClassTypes.catchThrowable;
56+
5057
class ITQueryWrite {
5158

5259
private InfluxDBClient client;
@@ -301,6 +308,202 @@ public void handleFlightRuntimeException() throws IOException {
301308

302309
}
303310

311+
@EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_URL", matches = ".*")
312+
@EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_TOKEN", matches = ".*")
313+
@EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_DATABASE", matches = ".*")
314+
@Test
315+
public void queryTimeoutExceededTest() {
316+
client = InfluxDBClient.getInstance(new ClientConfig.Builder()
317+
.host(System.getenv("TESTING_INFLUXDB_URL"))
318+
.token(System.getenv("TESTING_INFLUXDB_TOKEN").toCharArray())
319+
.database(System.getenv("TESTING_INFLUXDB_DATABASE"))
320+
.queryTimeout(Duration.ofMillis(100))
321+
.build());
322+
323+
String measurement = "timeout_test_" + Math.round(Math.random() * 100_000);
324+
long testId = System.currentTimeMillis();
325+
client.writeRecord(measurement + ",type=used value=123.0,testId=" + testId);
326+
327+
String sql = String.format("SELECT value FROM %s WHERE \"testId\"=%d", measurement, testId);
328+
329+
Throwable thrown = catchThrowable(() -> {
330+
Stream<Object[]> stream = client.query(sql);
331+
stream.forEach(row -> {
332+
Assertions.assertThat(row).hasSize(1);
333+
Assertions.assertThat(row[0]).isEqualTo(123.0);
334+
});
335+
});
336+
337+
Assertions.assertThat(thrown).isInstanceOf(FlightRuntimeException.class);
338+
Assertions.assertThat(thrown.getMessage()).matches(".*deadline.*exceeded.*");
339+
}
340+
341+
@EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_URL", matches = ".*")
342+
@EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_TOKEN", matches = ".*")
343+
@EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_DATABASE", matches = ".*")
344+
@Test
345+
public void queryTimeoutOKTest() {
346+
client = InfluxDBClient.getInstance(new ClientConfig.Builder()
347+
.host(System.getenv("TESTING_INFLUXDB_URL"))
348+
.token(System.getenv("TESTING_INFLUXDB_TOKEN").toCharArray())
349+
.database(System.getenv("TESTING_INFLUXDB_DATABASE"))
350+
.queryTimeout(Duration.ofSeconds(3))
351+
.build());
352+
353+
String measurement = "timeout_test_" + Math.round(Math.random() * 100_000);
354+
long testId = System.currentTimeMillis();
355+
client.writeRecord(measurement + ",type=used value=123.0,testId=" + testId);
356+
357+
String sql = String.format("SELECT value FROM %s WHERE \"testId\"=%d", measurement, testId);
358+
359+
try(Stream<Object[]> stream = client.query(sql)) {
360+
stream.forEach(row -> {
361+
Assertions.assertThat(row).hasSize(1);
362+
Assertions.assertThat(row[0]).isEqualTo(123.0);
363+
});
364+
} catch (Exception e) {
365+
throw new RuntimeException(e);
366+
}
367+
}
368+
369+
@EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_URL", matches = ".*")
370+
@EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_TOKEN", matches = ".*")
371+
@EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_DATABASE", matches = ".*")
372+
@Test
373+
public void queryTimeoutOtherGrpcOptUnaffectedTest() {
374+
client = InfluxDBClient.getInstance(new ClientConfig.Builder()
375+
.host(System.getenv("TESTING_INFLUXDB_URL"))
376+
.token(System.getenv("TESTING_INFLUXDB_TOKEN").toCharArray())
377+
.database(System.getenv("TESTING_INFLUXDB_DATABASE"))
378+
.queryTimeout(Duration.ofSeconds(3))
379+
.build());
380+
381+
String measurement = "timeout_test_" + Math.round(Math.random() * 100_000);
382+
long testId = System.currentTimeMillis();
383+
client.writeRecord(measurement + ",type=used value=123.0,testId=" + testId);
384+
385+
String sql = String.format("SELECT value FROM %s WHERE \"testId\"=%d", measurement, testId);
386+
387+
QueryOptions queryOptions = QueryOptions.defaultQueryOptions();
388+
queryOptions.setGrpcCallOptions(new GrpcCallOptions.Builder()
389+
.withMaxInboundMessageSize(10)
390+
.build()
391+
);
392+
393+
Throwable thrown = catchThrowable(() -> {
394+
Stream<Object[]> stream = client.query(sql, queryOptions);
395+
stream.forEach(row -> {
396+
Assertions.assertThat(row).hasSize(1);
397+
Assertions.assertThat(row[0]).isEqualTo(123.0);
398+
});
399+
});
400+
401+
Assertions.assertThat(thrown).isInstanceOf(FlightRuntimeException.class);
402+
Assertions.assertThat(thrown.getMessage()).contains("gRPC message exceeds maximum size");
403+
}
404+
405+
@EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_URL", matches = ".*")
406+
@EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_TOKEN", matches = ".*")
407+
@EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_DATABASE", matches = ".*")
408+
@Test
409+
public void queryTimeoutSuperceededByGrpcOptTest() {
410+
411+
client = InfluxDBClient.getInstance(new ClientConfig.Builder()
412+
.host(System.getenv("TESTING_INFLUXDB_URL"))
413+
.token(System.getenv("TESTING_INFLUXDB_TOKEN").toCharArray())
414+
.database(System.getenv("TESTING_INFLUXDB_DATABASE"))
415+
.queryTimeout(Duration.ofSeconds(3))
416+
.build());
417+
418+
String measurement = "timeout_test_" + Math.round(Math.random() * 100_000);
419+
long testId = System.currentTimeMillis();
420+
client.writeRecord(measurement + ",type=used value=123.0,testId=" + testId);
421+
422+
String sql = String.format("SELECT value FROM %s WHERE \"testId\"=%d", measurement, testId);
423+
424+
QueryOptions queryOptions = QueryOptions.defaultQueryOptions();
425+
queryOptions.setGrpcCallOptions(new GrpcCallOptions.Builder()
426+
.withDeadline(Deadline.after(100, TimeUnit.MILLISECONDS))
427+
.build()
428+
);
429+
430+
Throwable thrown = catchThrowable(() -> {
431+
Stream<Object[]> stream = client.query(sql, queryOptions);
432+
stream.forEach(row -> {
433+
Assertions.assertThat(row).hasSize(1);
434+
Assertions.assertThat(row[0]).isEqualTo(123.0);
435+
});
436+
});
437+
438+
Assertions.assertThat(thrown).isInstanceOf(FlightRuntimeException.class);
439+
Assertions.assertThat(thrown.getMessage()).matches(".*deadline.*exceeded.*");
440+
}
441+
442+
@Test
443+
public void repeatQueryWithTimeoutTest(){
444+
long timeout = 1000;
445+
client = InfluxDBClient.getInstance(new ClientConfig.Builder()
446+
.host(System.getenv("TESTING_INFLUXDB_URL"))
447+
.token(System.getenv("TESTING_INFLUXDB_TOKEN").toCharArray())
448+
.database(System.getenv("TESTING_INFLUXDB_DATABASE"))
449+
.queryTimeout(Duration.ofMillis(timeout))
450+
.build());
451+
452+
String measurement = "timeout_test_" + Math.round(Math.random() * 100_000);
453+
long testId = System.currentTimeMillis();
454+
client.writeRecord(measurement + ",type=used value=123.0,testId=" + testId);
455+
456+
String sql = String.format("SELECT value FROM %s WHERE \"testId\"=%d", measurement, testId);
457+
458+
for (int i = 0; i < 3; i++){
459+
try(Stream<Object[]> stream = client.query(sql)) {
460+
stream.forEach(row -> {
461+
Assertions.assertThat(row).hasSize(1);
462+
Assertions.assertThat(row[0]).isEqualTo(123.0);
463+
});
464+
TimeUnit.MILLISECONDS.sleep(timeout + 100);
465+
} catch (Exception e) {
466+
throw new RuntimeException(e);
467+
}
468+
}
469+
}
470+
471+
@Test
472+
@Disabled("Runs across issue 12109 in Grpc-Java library ")
473+
public void queryGrpcMaxOutSizeTest() {
474+
// See Grpc-java issue 12109 https://github.com/grpc/grpc-java/issues/12109
475+
// TODO - re-enable after 12109 has a fix and dependencies are updated
476+
client = InfluxDBClient.getInstance(new ClientConfig.Builder()
477+
.host(System.getenv("TESTING_INFLUXDB_URL"))
478+
.token(System.getenv("TESTING_INFLUXDB_TOKEN").toCharArray())
479+
.database(System.getenv("TESTING_INFLUXDB_DATABASE"))
480+
// .queryTimeout(Duration.ofSeconds(3))
481+
.build());
482+
483+
String measurement = "timeout_test_" + Math.round(Math.random() * 100_000);
484+
long testId = System.currentTimeMillis();
485+
client.writeRecord(measurement + ",type=used value=123.0,testId=" + testId);
486+
487+
String sql = String.format("SELECT value FROM %s WHERE \"testId\"=%d", measurement, testId);
488+
489+
QueryOptions queryOptions = QueryOptions.defaultQueryOptions();
490+
queryOptions.setGrpcCallOptions(new GrpcCallOptions.Builder()
491+
.withMaxOutboundMessageSize(10)
492+
.build()
493+
);
494+
495+
try(Stream<Object[]> stream = client.query(sql, queryOptions)) {
496+
stream.forEach(row -> {
497+
Assertions.assertThat(row).hasSize(1);
498+
Assertions.assertThat(row[0]).isEqualTo(123.0);
499+
});
500+
501+
} catch (Exception e) {
502+
throw new RuntimeException(e);
503+
}
504+
505+
}
506+
304507
@NotNull
305508
private static InfluxDBClient getInstance() {
306509
return InfluxDBClient.getInstance(

0 commit comments

Comments
 (0)