Skip to content

Commit 908facb

Browse files
committed
feat: fast no-sync write support
1 parent fcfdb06 commit 908facb

File tree

10 files changed

+401
-25
lines changed

10 files changed

+401
-25
lines changed

src/main/java/com/influxdb/v3/client/InfluxDBClient.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -445,6 +445,7 @@ static InfluxDBClient getInstance(@Nonnull final ClientConfig config) {
445445
* <li>database - database (bucket) name</li>
446446
* <li>precision - timestamp precision when writing data</li>
447447
* <li>gzipThreshold - payload size size for gzipping data</li>
448+
* <li>writeNoSync - skip waiting for WAL persistence on write</li>
448449
* </ul>
449450
*
450451
* @param connectionString connection string
@@ -477,6 +478,7 @@ static InfluxDBClient getInstance(@Nonnull final String connectionString) {
477478
* <li>INFLUX_DATABASE - database (bucket) name</li>
478479
* <li>INFLUX_PRECISION - timestamp precision when writing data</li>
479480
* <li>INFLUX_GZIP_THRESHOLD - payload size size for gzipping data</li>
481+
* <li>INFLUX_WRITE_NO_SYNC - skip waiting for WAL persistence on write</li>
480482
* </ul>
481483
* Supported system properties:
482484
* <ul>
@@ -487,6 +489,7 @@ static InfluxDBClient getInstance(@Nonnull final String connectionString) {
487489
* <li>influx.database - database (bucket) name</li>
488490
* <li>influx.precision - timestamp precision when writing data</li>
489491
* <li>influx.gzipThreshold - payload size size for gzipping data</li>
492+
* <li>influx.writeNoSync - skip waiting for WAL persistence on write</li>
490493
* </ul>
491494
*
492495
* @return instance of {@link InfluxDBClient}

src/main/java/com/influxdb/v3/client/config/ClientConfig.java

Lines changed: 45 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import javax.annotation.Nonnull;
3737
import javax.annotation.Nullable;
3838

39+
import com.influxdb.v3.client.write.WriteOptions;
3940
import com.influxdb.v3.client.write.WritePrecision;
4041

4142
/**
@@ -52,6 +53,7 @@
5253
* <li><code>writePrecision</code> - precision to use when writing points to InfluxDB</li>
5354
* <li><code>defaultTags</code> - defaultTags added when writing points to InfluxDB</li>
5455
* <li><code>gzipThreshold</code> - threshold when gzip compression is used for writing points to InfluxDB</li>
56+
* <li><code>writeNoSync</code> - skip waiting for WAL persistence on write</li>
5557
* <li><code>responseTimeout</code> - timeout when connecting to InfluxDB</li>
5658
* <li><code>allowHttpRedirects</code> - allow redirects for InfluxDB connections</li>
5759
* <li><code>disableServerCertificateValidation</code> -
@@ -73,6 +75,7 @@
7375
* .database("my-database")
7476
* .writePrecision(WritePrecision.S)
7577
* .gzipThreshold(4096)
78+
* .writeNoSync(true)
7679
* .proxyUrl("http://localhost:10000")
7780
* .build();
7881
*
@@ -94,6 +97,7 @@ public final class ClientConfig {
9497
private final String database;
9598
private final WritePrecision writePrecision;
9699
private final Integer gzipThreshold;
100+
private final Boolean writeNoSync;
97101
private final Map<String, String> defaultTags;
98102
private final Duration timeout;
99103
private final Boolean allowHttpRedirects;
@@ -180,6 +184,16 @@ public Integer getGzipThreshold() {
180184
return gzipThreshold;
181185
}
182186

187+
/**
188+
* Skip waiting for WAL persistence on write?
189+
*
190+
* @return skip waiting for WAL persistence on write
191+
*/
192+
@Nonnull
193+
public Boolean getWriteNoSync() {
194+
return writeNoSync;
195+
}
196+
183197
/**
184198
* Gets default tags used when writing points.
185199
* @return default tags
@@ -295,6 +309,7 @@ public boolean equals(final Object o) {
295309
&& Objects.equals(database, that.database)
296310
&& writePrecision == that.writePrecision
297311
&& Objects.equals(gzipThreshold, that.gzipThreshold)
312+
&& Objects.equals(writeNoSync, that.writeNoSync)
298313
&& Objects.equals(defaultTags, that.defaultTags)
299314
&& Objects.equals(timeout, that.timeout)
300315
&& Objects.equals(allowHttpRedirects, that.allowHttpRedirects)
@@ -309,7 +324,7 @@ public boolean equals(final Object o) {
309324
@Override
310325
public int hashCode() {
311326
return Objects.hash(host, Arrays.hashCode(token), authScheme, organization,
312-
database, writePrecision, gzipThreshold,
327+
database, writePrecision, gzipThreshold, writeNoSync,
313328
timeout, allowHttpRedirects, disableServerCertificateValidation,
314329
proxy, proxyUrl, authenticator, headers,
315330
defaultTags, sslRootsFilePath);
@@ -323,6 +338,7 @@ public String toString() {
323338
.add("database='" + database + "'")
324339
.add("writePrecision=" + writePrecision)
325340
.add("gzipThreshold=" + gzipThreshold)
341+
.add("writeNoSync=" + writeNoSync)
326342
.add("timeout=" + timeout)
327343
.add("allowHttpRedirects=" + allowHttpRedirects)
328344
.add("disableServerCertificateValidation=" + disableServerCertificateValidation)
@@ -348,6 +364,7 @@ public static final class Builder {
348364
private String database;
349365
private WritePrecision writePrecision;
350366
private Integer gzipThreshold;
367+
private Boolean writeNoSync;
351368
private Map<String, String> defaultTags;
352369
private Duration timeout;
353370
private Boolean allowHttpRedirects;
@@ -452,6 +469,19 @@ public Builder gzipThreshold(@Nullable final Integer gzipThreshold) {
452469
return this;
453470
}
454471

472+
/**
473+
* Sets whether to skip waiting for WAL persistence on write.
474+
*
475+
* @param writeNoSync skip waiting for WAL persistence on write
476+
* @return this
477+
*/
478+
@Nonnull
479+
public Builder writeNoSync(@Nullable final Boolean writeNoSync) {
480+
481+
this.writeNoSync = writeNoSync;
482+
return this;
483+
}
484+
455485
/**
456486
* Sets default tags to be written with points.
457487
*
@@ -634,6 +664,9 @@ public ClientConfig build(@Nonnull final String connectionString) throws Malform
634664
if (parameters.containsKey("gzipThreshold")) {
635665
this.gzipThreshold(Integer.parseInt(parameters.get("gzipThreshold")));
636666
}
667+
if (parameters.containsKey("writeNoSync")) {
668+
this.writeNoSync(Boolean.parseBoolean(parameters.get("writeNoSync")));
669+
}
637670

638671
return new ClientConfig(this);
639672
}
@@ -682,6 +715,10 @@ public ClientConfig build(@Nonnull final Map<String, String> env, final Properti
682715
if (gzipThreshold != null) {
683716
this.gzipThreshold(Integer.parseInt(gzipThreshold));
684717
}
718+
final String writeNoSync = get.apply("INFLUX_WRITE_NO_SYNC", "influx.writeNoSync");
719+
if (writeNoSync != null) {
720+
this.writeNoSync(Boolean.parseBoolean(writeNoSync));
721+
}
685722

686723
return new ClientConfig(this);
687724
}
@@ -690,15 +727,19 @@ private WritePrecision parsePrecision(@Nonnull final String precision) {
690727
WritePrecision writePrecision;
691728
switch (precision) {
692729
case "ns":
730+
case "nanosecond":
693731
writePrecision = WritePrecision.NS;
694732
break;
695733
case "us":
734+
case "microsecond":
696735
writePrecision = WritePrecision.US;
697736
break;
698737
case "ms":
738+
case "millisecond":
699739
writePrecision = WritePrecision.MS;
700740
break;
701741
case "s":
742+
case "second":
702743
writePrecision = WritePrecision.S;
703744
break;
704745
default:
@@ -716,8 +757,9 @@ private ClientConfig(@Nonnull final Builder builder) {
716757
authScheme = builder.authScheme;
717758
organization = builder.organization;
718759
database = builder.database;
719-
writePrecision = builder.writePrecision != null ? builder.writePrecision : WritePrecision.NS;
720-
gzipThreshold = builder.gzipThreshold != null ? builder.gzipThreshold : 1000;
760+
writePrecision = builder.writePrecision != null ? builder.writePrecision : WriteOptions.DEFAULT_WRITE_PRECISION;
761+
gzipThreshold = builder.gzipThreshold != null ? builder.gzipThreshold : WriteOptions.DEFAULT_GZIP_THRESHOLD;
762+
writeNoSync = builder.writeNoSync != null ? builder.writeNoSync : WriteOptions.DEFAULT_NO_SYNC;
721763
defaultTags = builder.defaultTags;
722764
timeout = builder.timeout != null ? builder.timeout : Duration.ofSeconds(10);
723765
allowHttpRedirects = builder.allowHttpRedirects != null ? builder.allowHttpRedirects : false;

src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java

Lines changed: 33 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -37,15 +37,14 @@
3737
import javax.annotation.Nonnull;
3838
import javax.annotation.Nullable;
3939

40+
import com.influxdb.v3.client.*;
41+
import com.influxdb.v3.client.write.WritePrecisionConverter;
4042
import io.netty.handler.codec.http.HttpMethod;
43+
import io.netty.handler.codec.http.HttpResponseStatus;
4144
import org.apache.arrow.flight.CallOption;
4245
import org.apache.arrow.vector.FieldVector;
4346
import org.apache.arrow.vector.VectorSchemaRoot;
4447

45-
import com.influxdb.v3.client.InfluxDBApiException;
46-
import com.influxdb.v3.client.InfluxDBClient;
47-
import com.influxdb.v3.client.Point;
48-
import com.influxdb.v3.client.PointValues;
4948
import com.influxdb.v3.client.config.ClientConfig;
5049
import com.influxdb.v3.client.query.QueryOptions;
5150
import com.influxdb.v3.client.write.WriteOptions;
@@ -274,11 +273,27 @@ private <T> void writeData(@Nonnull final List<T> data, @Nonnull final WriteOpti
274273

275274
WritePrecision precision = options.precisionSafe(config);
276275

277-
Map<String, String> queryParams = new HashMap<>() {{
278-
put("bucket", database);
279-
put("org", config.getOrganization());
280-
put("precision", precision.name().toLowerCase());
281-
}};
276+
String path;
277+
Map<String, String> queryParams;
278+
boolean noSync = options.noSyncSafe(config);
279+
if (noSync) {
280+
// Setting no_sync=true is supported only in the v3 API.
281+
path = "api/v3/write_lp";
282+
queryParams = new HashMap<>() {{
283+
put("org", config.getOrganization());
284+
put("db", database);
285+
put("precision", WritePrecisionConverter.toV3ApiString(precision));
286+
put("no_sync", "true");
287+
}};
288+
} else {
289+
// By default, use the v2 API.
290+
path = "api/v2/write";
291+
queryParams = new HashMap<>() {{
292+
put("org", config.getOrganization());
293+
put("bucket", database);
294+
put("precision", WritePrecisionConverter.toV2ApiString(precision));
295+
}};
296+
}
282297

283298
Map<String, String> defaultTags = options.defaultTagsSafe(config);
284299

@@ -314,7 +329,15 @@ private <T> void writeData(@Nonnull final List<T> data, @Nonnull final WriteOpti
314329
}
315330
headers.putAll(options.headersSafe());
316331

317-
restClient.request("api/v2/write", HttpMethod.POST, body, queryParams, headers);
332+
try {
333+
restClient.request(path, HttpMethod.POST, body, queryParams, headers);
334+
} catch (InfluxDBApiHttpException e) {
335+
if (noSync && e.statusCode() == HttpResponseStatus.METHOD_NOT_ALLOWED.code()) {
336+
// Server does not support the v3 write API, can't use the NoSync option.
337+
throw new InfluxDBApiHttpException("Server doesn't support write with NoSync=true (supported by InfluxDB 3 Core/Enterprise servers only).", e.headers(), e.statusCode());
338+
}
339+
throw e;
340+
}
318341
}
319342

320343
@Nonnull

0 commit comments

Comments
 (0)