Skip to content

Commit d1560cb

Browse files
fix: use WriteOptions from ClientConfig on write (#239)
* feat: fast no-sync write support * lint: fix import order * lint: fix lint issues * lint: fix lint issues * lint: add missing license headers * docs: update CHANGELOG.md * fix: use WriteOptions from ClientConfig on write * lint: fix lint errors * lint: fix lint errors * docs: update CHANGELOG.md * chore: revert unwanted whitespace changes * lint: fix lint errors * test: add skip new integration test if ENVARS are not defined. --------- Co-authored-by: karel rehor <[email protected]>
1 parent 1f1ac3f commit d1560cb

File tree

4 files changed

+189
-9
lines changed

4 files changed

+189
-9
lines changed

CHANGELOG.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,18 @@
1414
`"second"` (
1515
in addition to the existing `"ns"`, `"us"`, `"ms"`, `"s"`).
1616

17+
### Bug Fixes
18+
19+
1. [#239](https://github.com/InfluxCommunity/influxdb3-java/pull/239): Use write options from `ClientConfig` in
20+
`InfluxDBClientImpl` write methods:
21+
22+
```java
23+
public void writeRecord(@Nullable final String record);
24+
public void writeRecords(@Nonnull final List<String> records);
25+
public void writePoint(@Nullable final Point point);
26+
public void writePoints(@Nonnull final List<Point> points);
27+
```
28+
1729
## 1.1.0 [2025-05-22]
1830

1931
### Features

src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ public final class InfluxDBClientImpl implements InfluxDBClient {
8181

8282
private final RestClient restClient;
8383
private final FlightSqlClient flightSqlClient;
84+
private final WriteOptions emptyWriteOptions;
8485

8586
/**
8687
* Creates an instance using the specified config.
@@ -110,11 +111,12 @@ public InfluxDBClientImpl(@Nonnull final ClientConfig config) {
110111
this.config = config;
111112
this.restClient = restClient != null ? restClient : new RestClient(config);
112113
this.flightSqlClient = flightSqlClient != null ? flightSqlClient : new FlightSqlClient(config);
114+
this.emptyWriteOptions = new WriteOptions(null);
113115
}
114116

115117
@Override
116118
public void writeRecord(@Nullable final String record) {
117-
writeRecord(record, WriteOptions.DEFAULTS);
119+
writeRecord(record, emptyWriteOptions);
118120
}
119121

120122
@Override
@@ -128,7 +130,7 @@ public void writeRecord(@Nullable final String record, @Nonnull final WriteOptio
128130

129131
@Override
130132
public void writeRecords(@Nonnull final List<String> records) {
131-
writeRecords(records, WriteOptions.DEFAULTS);
133+
writeRecords(records, emptyWriteOptions);
132134
}
133135

134136
@Override
@@ -138,7 +140,7 @@ public void writeRecords(@Nonnull final List<String> records, @Nonnull final Wri
138140

139141
@Override
140142
public void writePoint(@Nullable final Point point) {
141-
writePoint(point, WriteOptions.DEFAULTS);
143+
writePoint(point, emptyWriteOptions);
142144
}
143145

144146
@Override
@@ -152,7 +154,7 @@ public void writePoint(@Nullable final Point point, @Nonnull final WriteOptions
152154

153155
@Override
154156
public void writePoints(@Nonnull final List<Point> points) {
155-
writePoints(points, WriteOptions.DEFAULTS);
157+
writePoints(points, emptyWriteOptions);
156158
}
157159

158160
@Override

src/test/java/com/influxdb/v3/client/InfluxDBClientWriteTest.java

Lines changed: 168 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,15 +23,19 @@
2323

2424
import java.util.Arrays;
2525
import java.util.Collections;
26+
import java.util.List;
2627
import java.util.Map;
2728

2829
import io.netty.handler.codec.http.HttpResponseStatus;
30+
import okhttp3.HttpUrl;
2931
import okhttp3.mockwebserver.RecordedRequest;
3032
import org.assertj.core.api.Assertions;
33+
import org.jetbrains.annotations.NotNull;
3134
import org.junit.jupiter.api.AfterEach;
3235
import org.junit.jupiter.api.BeforeEach;
3336
import org.junit.jupiter.api.Test;
3437

38+
import com.influxdb.v3.client.config.ClientConfig;
3539
import com.influxdb.v3.client.write.WriteOptions;
3640
import com.influxdb.v3.client.write.WritePrecision;
3741

@@ -223,10 +227,8 @@ void writeNoSyncTrueOnV2ServerThrowsException() throws InterruptedException {
223227
mockServer.enqueue(createEmptyResponse(HttpResponseStatus.METHOD_NOT_ALLOWED.code()));
224228

225229
InfluxDBApiHttpException ae = org.junit.jupiter.api.Assertions.assertThrows(InfluxDBApiHttpException.class,
226-
() -> {
227-
client.writeRecord("mem,tag=one value=1.0",
228-
new WriteOptions.Builder().precision(WritePrecision.MS).noSync(true).build());
229-
}
230+
() -> client.writeRecord("mem,tag=one value=1.0",
231+
new WriteOptions.Builder().precision(WritePrecision.MS).noSync(true).build())
230232
);
231233

232234
assertThat(mockServer.getRequestCount()).isEqualTo(1);
@@ -242,6 +244,167 @@ void writeNoSyncTrueOnV2ServerThrowsException() throws InterruptedException {
242244
+ " (supported by InfluxDB 3 Core/Enterprise servers only).");
243245
}
244246

247+
@Test
248+
void writeRecordWithDefaultWriteOptionsDefaultConfig() throws Exception {
249+
mockServer.enqueue(createResponse(200));
250+
251+
ClientConfig cfg = new ClientConfig.Builder().host(baseURL).token("TOKEN".toCharArray()).database("DB")
252+
.build();
253+
try (InfluxDBClient client = InfluxDBClient.getInstance(cfg)) {
254+
client.writeRecord("mem,tag=one value=1.0");
255+
}
256+
257+
checkWriteCalled("/api/v2/write", "DB", "ns", false, false);
258+
}
259+
260+
@Test
261+
void writeRecordWithDefaultWriteOptionsCustomConfig() throws Exception {
262+
mockServer.enqueue(createResponse(200));
263+
264+
ClientConfig cfg = new ClientConfig.Builder().host(baseURL).token("TOKEN".toCharArray()).database("DB")
265+
.writePrecision(WritePrecision.S)
266+
.writeNoSync(true)
267+
.gzipThreshold(1)
268+
.build();
269+
try (InfluxDBClient client = InfluxDBClient.getInstance(cfg)) {
270+
client.writeRecord("mem,tag=one value=1.0");
271+
}
272+
273+
checkWriteCalled("/api/v3/write_lp", "DB", "second", true, true);
274+
}
275+
276+
@Test
277+
void writeRecordsWithDefaultWriteOptionsDefaultConfig() throws Exception {
278+
mockServer.enqueue(createResponse(200));
279+
280+
ClientConfig cfg = new ClientConfig.Builder().host(baseURL).token("TOKEN".toCharArray()).database("DB")
281+
.build();
282+
try (InfluxDBClient client = InfluxDBClient.getInstance(cfg)) {
283+
client.writeRecords(List.of("mem,tag=one value=1.0"));
284+
}
285+
286+
checkWriteCalled("/api/v2/write", "DB", "ns", false, false);
287+
}
288+
289+
@Test
290+
void writeRecordsWithDefaultWriteOptionsCustomConfig() throws Exception {
291+
mockServer.enqueue(createResponse(200));
292+
293+
ClientConfig cfg = new ClientConfig.Builder().host(baseURL).token("TOKEN".toCharArray()).database("DB")
294+
.writePrecision(WritePrecision.S)
295+
.writeNoSync(true)
296+
.gzipThreshold(1)
297+
.build();
298+
try (InfluxDBClient client = InfluxDBClient.getInstance(cfg)) {
299+
client.writeRecords(List.of("mem,tag=one value=1.0"));
300+
}
301+
302+
checkWriteCalled("/api/v3/write_lp", "DB", "second", true, true);
303+
}
304+
305+
@Test
306+
void writePointWithDefaultWriteOptionsDefaultConfig() throws Exception {
307+
mockServer.enqueue(createResponse(200));
308+
309+
ClientConfig cfg = new ClientConfig.Builder().host(baseURL).token("TOKEN".toCharArray()).database("DB")
310+
.build();
311+
try (InfluxDBClient client = InfluxDBClient.getInstance(cfg)) {
312+
Point point = new Point("mem");
313+
point.setTag("tag", "one");
314+
point.setField("value", 1.0);
315+
client.writePoint(point);
316+
}
317+
318+
checkWriteCalled("/api/v2/write", "DB", "ns", false, false);
319+
}
320+
321+
@Test
322+
void writePointWithDefaultWriteOptionsCustomConfig() throws Exception {
323+
mockServer.enqueue(createResponse(200));
324+
325+
ClientConfig cfg = new ClientConfig.Builder().host(baseURL).token("TOKEN".toCharArray()).database("DB")
326+
.writePrecision(WritePrecision.S)
327+
.writeNoSync(true)
328+
.gzipThreshold(1)
329+
.build();
330+
try (InfluxDBClient client = InfluxDBClient.getInstance(cfg)) {
331+
Point point = new Point("mem");
332+
point.setTag("tag", "one");
333+
point.setField("value", 1.0);
334+
client.writePoint(point);
335+
}
336+
337+
checkWriteCalled("/api/v3/write_lp", "DB", "second", true, true);
338+
}
339+
340+
@Test
341+
void writePointsWithDefaultWriteOptionsDefaultConfig() throws Exception {
342+
mockServer.enqueue(createResponse(200));
343+
344+
ClientConfig cfg = new ClientConfig.Builder().host(baseURL).token("TOKEN".toCharArray()).database("DB")
345+
.build();
346+
try (InfluxDBClient client = InfluxDBClient.getInstance(cfg)) {
347+
Point point = new Point("mem");
348+
point.setTag("tag", "one");
349+
point.setField("value", 1.0);
350+
client.writePoints(List.of(point));
351+
}
352+
353+
checkWriteCalled("/api/v2/write", "DB", "ns", false, false);
354+
}
355+
356+
@Test
357+
void writePointsWithDefaultWriteOptionsCustomConfig() throws Exception {
358+
mockServer.enqueue(createResponse(200));
359+
360+
ClientConfig cfg = new ClientConfig.Builder().host(baseURL).token("TOKEN".toCharArray()).database("DB")
361+
.writePrecision(WritePrecision.S)
362+
.writeNoSync(true)
363+
.gzipThreshold(1)
364+
.build();
365+
try (InfluxDBClient client = InfluxDBClient.getInstance(cfg)) {
366+
Point point = new Point("mem");
367+
point.setTag("tag", "one");
368+
point.setField("value", 1.0);
369+
client.writePoints(List.of(point));
370+
}
371+
372+
checkWriteCalled("/api/v3/write_lp", "DB", "second", true, true);
373+
}
374+
375+
private void checkWriteCalled(final String expectedPath, final String expectedDB,
376+
final String expectedPrecision, final boolean expectedNoSync,
377+
final boolean expectedGzip) throws InterruptedException {
378+
RecordedRequest request = assertThatServerRequested();
379+
HttpUrl requestUrl = request.getRequestUrl();
380+
assertThat(requestUrl).isNotNull();
381+
assertThat(requestUrl.encodedPath()).isEqualTo(expectedPath);
382+
if (expectedNoSync) {
383+
assertThat(requestUrl.queryParameter("db")).isEqualTo(expectedDB);
384+
} else {
385+
assertThat(requestUrl.queryParameter("bucket")).isEqualTo(expectedDB);
386+
}
387+
assertThat(requestUrl.queryParameter("precision")).isEqualTo(expectedPrecision);
388+
if (expectedNoSync) {
389+
assertThat(requestUrl.queryParameter("no_sync")).isEqualTo("true");
390+
} else {
391+
assertThat(requestUrl.queryParameter("no_sync")).isNull();
392+
}
393+
if (expectedGzip) {
394+
assertThat(request.getHeader("Content-Encoding")).isEqualTo("gzip");
395+
} else {
396+
assertThat(request.getHeader("Content-Encoding")).isNull();
397+
}
398+
}
399+
400+
@NotNull
401+
private RecordedRequest assertThatServerRequested() throws InterruptedException {
402+
assertThat(mockServer.getRequestCount()).isEqualTo(1);
403+
RecordedRequest request = mockServer.takeRequest();
404+
assertThat(request).isNotNull();
405+
return request;
406+
}
407+
245408
@Test
246409
void allParameterSpecified() throws InterruptedException {
247410
mockServer.enqueue(createResponse(200));
@@ -344,7 +507,7 @@ void defaultTags() throws InterruptedException {
344507
}
345508

346509
@Test
347-
public void retryHandled429Test() throws InterruptedException {
510+
public void retryHandled429Test() {
348511
mockServer.enqueue(createResponse(429)
349512
.setBody("{ \"message\" : \"Too Many Requests\" }")
350513
.setHeader("retry-after", "42")

src/test/java/com/influxdb/v3/client/integration/E2ETest.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -278,6 +278,9 @@ public void testQueryRows() throws Exception {
278278
}
279279
}
280280

281+
@EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_URL", matches = ".*")
282+
@EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_TOKEN", matches = ".*")
283+
@EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_DATABASE", matches = ".*")
281284
@Test
282285
public void testQueryRowWithOptions() throws Exception {
283286
try (InfluxDBClient client = InfluxDBClient.getInstance(

0 commit comments

Comments
 (0)