Skip to content

Commit 1f1ac3f

Browse files
authored
feat: fast no-sync write support (#238)
1 parent 069e805 commit 1f1ac3f

File tree

11 files changed

+526
-81
lines changed

11 files changed

+526
-81
lines changed

CHANGELOG.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,16 @@
33
### Features
44

55
1. [#209](https://github.com/InfluxCommunity/influxdb3-java/pull/209) Add query function returning row as map.
6+
1. [#238](https://github.com/InfluxCommunity/influxdb3-java/pull/238): Support fast writes without waiting for WAL
7+
persistence:
8+
- New write option (`WriteOptions.noSync`) added: `true` value means faster write but without the confirmation that
9+
the data was persisted. Default value: `false`.
10+
- **Supported by self-managed InfluxDB 3 Core and Enterprise servers only!**
11+
- Also configurable via connection string query parameter (`writeNoSync`).
12+
- Also configurable via environment variable (`INFLUX_WRITE_NO_SYNC`).
13+
- Long precision string values added from v3 HTTP API: `"nanosecond"`, `"microsecond"`, `"millisecond"`,
14+
`"second"` (
15+
in addition to the existing `"ns"`, `"us"`, `"ms"`, `"s"`).
616

717
## 1.1.0 [2025-05-22]
818

src/main/java/com/influxdb/v3/client/InfluxDBClient.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -526,6 +526,7 @@ static InfluxDBClient getInstance(@Nonnull final ClientConfig config) {
526526
* <li>database - database (bucket) name</li>
527527
* <li>precision - timestamp precision when writing data</li>
528528
* <li>gzipThreshold - payload size size for gzipping data</li>
529+
* <li>writeNoSync - skip waiting for WAL persistence on write</li>
529530
* </ul>
530531
*
531532
* @param connectionString connection string
@@ -558,6 +559,7 @@ static InfluxDBClient getInstance(@Nonnull final String connectionString) {
558559
* <li>INFLUX_DATABASE - database (bucket) name</li>
559560
* <li>INFLUX_PRECISION - timestamp precision when writing data</li>
560561
* <li>INFLUX_GZIP_THRESHOLD - payload size size for gzipping data</li>
562+
* <li>INFLUX_WRITE_NO_SYNC - skip waiting for WAL persistence on write</li>
561563
* </ul>
562564
* Supported system properties:
563565
* <ul>
@@ -568,6 +570,7 @@ static InfluxDBClient getInstance(@Nonnull final String connectionString) {
568570
* <li>influx.database - database (bucket) name</li>
569571
* <li>influx.precision - timestamp precision when writing data</li>
570572
* <li>influx.gzipThreshold - payload size size for gzipping data</li>
573+
* <li>influx.writeNoSync - skip waiting for WAL persistence on write</li>
571574
* </ul>
572575
*
573576
* @return instance of {@link InfluxDBClient}

src/main/java/com/influxdb/v3/client/config/ClientConfig.java

Lines changed: 45 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import javax.annotation.Nonnull;
3737
import javax.annotation.Nullable;
3838

39+
import com.influxdb.v3.client.write.WriteOptions;
3940
import com.influxdb.v3.client.write.WritePrecision;
4041

4142
/**
@@ -52,6 +53,7 @@
5253
* <li><code>writePrecision</code> - precision to use when writing points to InfluxDB</li>
5354
* <li><code>defaultTags</code> - defaultTags added when writing points to InfluxDB</li>
5455
* <li><code>gzipThreshold</code> - threshold when gzip compression is used for writing points to InfluxDB</li>
56+
* <li><code>writeNoSync</code> - skip waiting for WAL persistence on write</li>
5557
* <li><code>responseTimeout</code> - timeout when connecting to InfluxDB</li>
5658
* <li><code>allowHttpRedirects</code> - allow redirects for InfluxDB connections</li>
5759
* <li><code>disableServerCertificateValidation</code> -
@@ -73,6 +75,7 @@
7375
* .database("my-database")
7476
* .writePrecision(WritePrecision.S)
7577
* .gzipThreshold(4096)
78+
* .writeNoSync(true)
7679
* .proxyUrl("http://localhost:10000")
7780
* .build();
7881
*
@@ -94,6 +97,7 @@ public final class ClientConfig {
9497
private final String database;
9598
private final WritePrecision writePrecision;
9699
private final Integer gzipThreshold;
100+
private final Boolean writeNoSync;
97101
private final Map<String, String> defaultTags;
98102
private final Duration timeout;
99103
private final Boolean allowHttpRedirects;
@@ -180,6 +184,16 @@ public Integer getGzipThreshold() {
180184
return gzipThreshold;
181185
}
182186

187+
/**
188+
* Skip waiting for WAL persistence on write?
189+
*
190+
* @return skip waiting for WAL persistence on write
191+
*/
192+
@Nonnull
193+
public Boolean getWriteNoSync() {
194+
return writeNoSync;
195+
}
196+
183197
/**
184198
* Gets default tags used when writing points.
185199
* @return default tags
@@ -295,6 +309,7 @@ public boolean equals(final Object o) {
295309
&& Objects.equals(database, that.database)
296310
&& writePrecision == that.writePrecision
297311
&& Objects.equals(gzipThreshold, that.gzipThreshold)
312+
&& Objects.equals(writeNoSync, that.writeNoSync)
298313
&& Objects.equals(defaultTags, that.defaultTags)
299314
&& Objects.equals(timeout, that.timeout)
300315
&& Objects.equals(allowHttpRedirects, that.allowHttpRedirects)
@@ -309,7 +324,7 @@ public boolean equals(final Object o) {
309324
@Override
310325
public int hashCode() {
311326
return Objects.hash(host, Arrays.hashCode(token), authScheme, organization,
312-
database, writePrecision, gzipThreshold,
327+
database, writePrecision, gzipThreshold, writeNoSync,
313328
timeout, allowHttpRedirects, disableServerCertificateValidation,
314329
proxy, proxyUrl, authenticator, headers,
315330
defaultTags, sslRootsFilePath);
@@ -323,6 +338,7 @@ public String toString() {
323338
.add("database='" + database + "'")
324339
.add("writePrecision=" + writePrecision)
325340
.add("gzipThreshold=" + gzipThreshold)
341+
.add("writeNoSync=" + writeNoSync)
326342
.add("timeout=" + timeout)
327343
.add("allowHttpRedirects=" + allowHttpRedirects)
328344
.add("disableServerCertificateValidation=" + disableServerCertificateValidation)
@@ -348,6 +364,7 @@ public static final class Builder {
348364
private String database;
349365
private WritePrecision writePrecision;
350366
private Integer gzipThreshold;
367+
private Boolean writeNoSync;
351368
private Map<String, String> defaultTags;
352369
private Duration timeout;
353370
private Boolean allowHttpRedirects;
@@ -452,6 +469,19 @@ public Builder gzipThreshold(@Nullable final Integer gzipThreshold) {
452469
return this;
453470
}
454471

472+
/**
473+
* Sets whether to skip waiting for WAL persistence on write.
474+
*
475+
* @param writeNoSync skip waiting for WAL persistence on write
476+
* @return this
477+
*/
478+
@Nonnull
479+
public Builder writeNoSync(@Nullable final Boolean writeNoSync) {
480+
481+
this.writeNoSync = writeNoSync;
482+
return this;
483+
}
484+
455485
/**
456486
* Sets default tags to be written with points.
457487
*
@@ -634,6 +664,9 @@ public ClientConfig build(@Nonnull final String connectionString) throws Malform
634664
if (parameters.containsKey("gzipThreshold")) {
635665
this.gzipThreshold(Integer.parseInt(parameters.get("gzipThreshold")));
636666
}
667+
if (parameters.containsKey("writeNoSync")) {
668+
this.writeNoSync(Boolean.parseBoolean(parameters.get("writeNoSync")));
669+
}
637670

638671
return new ClientConfig(this);
639672
}
@@ -682,6 +715,10 @@ public ClientConfig build(@Nonnull final Map<String, String> env, final Properti
682715
if (gzipThreshold != null) {
683716
this.gzipThreshold(Integer.parseInt(gzipThreshold));
684717
}
718+
final String writeNoSync = get.apply("INFLUX_WRITE_NO_SYNC", "influx.writeNoSync");
719+
if (writeNoSync != null) {
720+
this.writeNoSync(Boolean.parseBoolean(writeNoSync));
721+
}
685722

686723
return new ClientConfig(this);
687724
}
@@ -690,15 +727,19 @@ private WritePrecision parsePrecision(@Nonnull final String precision) {
690727
WritePrecision writePrecision;
691728
switch (precision) {
692729
case "ns":
730+
case "nanosecond":
693731
writePrecision = WritePrecision.NS;
694732
break;
695733
case "us":
734+
case "microsecond":
696735
writePrecision = WritePrecision.US;
697736
break;
698737
case "ms":
738+
case "millisecond":
699739
writePrecision = WritePrecision.MS;
700740
break;
701741
case "s":
742+
case "second":
702743
writePrecision = WritePrecision.S;
703744
break;
704745
default:
@@ -716,8 +757,9 @@ private ClientConfig(@Nonnull final Builder builder) {
716757
authScheme = builder.authScheme;
717758
organization = builder.organization;
718759
database = builder.database;
719-
writePrecision = builder.writePrecision != null ? builder.writePrecision : WritePrecision.NS;
720-
gzipThreshold = builder.gzipThreshold != null ? builder.gzipThreshold : 1000;
760+
writePrecision = builder.writePrecision != null ? builder.writePrecision : WriteOptions.DEFAULT_WRITE_PRECISION;
761+
gzipThreshold = builder.gzipThreshold != null ? builder.gzipThreshold : WriteOptions.DEFAULT_GZIP_THRESHOLD;
762+
writeNoSync = builder.writeNoSync != null ? builder.writeNoSync : WriteOptions.DEFAULT_NO_SYNC;
721763
defaultTags = builder.defaultTags;
722764
timeout = builder.timeout != null ? builder.timeout : Duration.ofSeconds(10);
723765
allowHttpRedirects = builder.allowHttpRedirects != null ? builder.allowHttpRedirects : false;

src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java

Lines changed: 34 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -38,18 +38,21 @@
3838
import javax.annotation.Nullable;
3939

4040
import io.netty.handler.codec.http.HttpMethod;
41+
import io.netty.handler.codec.http.HttpResponseStatus;
4142
import org.apache.arrow.flight.CallOption;
4243
import org.apache.arrow.vector.FieldVector;
4344
import org.apache.arrow.vector.VectorSchemaRoot;
4445

4546
import com.influxdb.v3.client.InfluxDBApiException;
47+
import com.influxdb.v3.client.InfluxDBApiHttpException;
4648
import com.influxdb.v3.client.InfluxDBClient;
4749
import com.influxdb.v3.client.Point;
4850
import com.influxdb.v3.client.PointValues;
4951
import com.influxdb.v3.client.config.ClientConfig;
5052
import com.influxdb.v3.client.query.QueryOptions;
5153
import com.influxdb.v3.client.write.WriteOptions;
5254
import com.influxdb.v3.client.write.WritePrecision;
55+
import com.influxdb.v3.client.write.WritePrecisionConverter;
5356

5457
/**
5558
* Implementation of the InfluxDBClient. It is thread-safe and can be safely shared between threads.
@@ -309,11 +312,27 @@ private <T> void writeData(@Nonnull final List<T> data, @Nonnull final WriteOpti
309312

310313
WritePrecision precision = options.precisionSafe(config);
311314

312-
Map<String, String> queryParams = new HashMap<>() {{
313-
put("bucket", database);
314-
put("org", config.getOrganization());
315-
put("precision", precision.name().toLowerCase());
316-
}};
315+
String path;
316+
Map<String, String> queryParams;
317+
boolean noSync = options.noSyncSafe(config);
318+
if (noSync) {
319+
// Setting no_sync=true is supported only in the v3 API.
320+
path = "api/v3/write_lp";
321+
queryParams = new HashMap<>() {{
322+
put("org", config.getOrganization());
323+
put("db", database);
324+
put("precision", WritePrecisionConverter.toV3ApiString(precision));
325+
put("no_sync", "true");
326+
}};
327+
} else {
328+
// By default, use the v2 API.
329+
path = "api/v2/write";
330+
queryParams = new HashMap<>() {{
331+
put("org", config.getOrganization());
332+
put("bucket", database);
333+
put("precision", WritePrecisionConverter.toV2ApiString(precision));
334+
}};
335+
}
317336

318337
Map<String, String> defaultTags = options.defaultTagsSafe(config);
319338

@@ -349,7 +368,16 @@ private <T> void writeData(@Nonnull final List<T> data, @Nonnull final WriteOpti
349368
}
350369
headers.putAll(options.headersSafe());
351370

352-
restClient.request("api/v2/write", HttpMethod.POST, body, queryParams, headers);
371+
try {
372+
restClient.request(path, HttpMethod.POST, body, queryParams, headers);
373+
} catch (InfluxDBApiHttpException e) {
374+
if (noSync && e.statusCode() == HttpResponseStatus.METHOD_NOT_ALLOWED.code()) {
375+
// Server does not support the v3 write API, can't use the NoSync option.
376+
throw new InfluxDBApiHttpException("Server doesn't support write with NoSync=true "
377+
+ "(supported by InfluxDB 3 Core/Enterprise servers only).", e.headers(), e.statusCode());
378+
}
379+
throw e;
380+
}
353381
}
354382

355383
@Nonnull

0 commit comments

Comments
 (0)