Skip to content

Commit 6cf278f

Browse files
authored
fix: arrow BaseAllocator is reporting memory leaks (#306)
* fix: (WIP) ensure FlightStream is closed properly to avoid BaseAllocator memory leaks. * chore: remove unused imports in E2E test * fix: additional workaround for closing FlightStream and test update. * chore: free references to unused FlightStreams to help GC * chore: clean up lint issues * test: add test of FlightStream overhead. * test: fix flakey test for FlightStream overhead. * docs: update CHANGELOG.md * chore: cleanup flightStream ledger of FlightSqlClient close. * test: suppress assertion that makes test flakey in CI. * chore: remove static LEDGER field from FlightSqlClient * fix: remove List<autocloseable> from FlightSqlClient and simplify closing FlightStream. * chore: fix lint issues * docs: remove no-longer pertinent information from CHANGELOG.md * tests: restore logging in E2E test. * chore: attempt SNAPSHOT publish * chore: revert SNAPSHOT version and CI config - snapshot was deployed.
1 parent 240309c commit 6cf278f

File tree

3 files changed

+110
-1
lines changed

3 files changed

+110
-1
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
## 1.6.0 [unreleased]
22

3+
### Features
4+
5+
1. [#306](https://github.com/InfluxCommunity/influxdb3-java/pull/306): Improve closing of Arrow `FlightStream`.
6+
37
## 1.5.0 [2025-10-22]
48

59
### Features

src/main/java/com/influxdb/v3/client/internal/FlightSqlClient.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -271,6 +271,7 @@ public VectorSchemaRoot next() {
271271
@Override
272272
public void close() {
273273
try {
274+
flightStream.close();
274275
AutoCloseables.close(autoCloseable);
275276
} catch (Exception e) {
276277
throw new RuntimeException(e);

src/test/java/com/influxdb/v3/client/integration/E2ETest.java

Lines changed: 105 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.net.URL;
2727
import java.net.URLConnection;
2828
import java.time.Instant;
29+
import java.time.temporal.ChronoUnit;
2930
import java.util.ArrayList;
3031
import java.util.List;
3132
import java.util.Map;
@@ -52,7 +53,7 @@
5253

5354
public class E2ETest {
5455

55-
private static final java.util.logging.Logger LOG = Logger.getLogger(E2ETest.class.getName());
56+
private static final Logger LOG = Logger.getLogger(E2ETest.class.getName());
5657

5758
@EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_URL", matches = ".*")
5859
@EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_TOKEN", matches = ".*")
@@ -381,6 +382,109 @@ public void testGetServerVersion() throws Exception {
381382
}
382383
}
383384

385+
@EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_URL", matches = ".*")
386+
@EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_TOKEN", matches = ".*")
387+
@EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_DATABASE", matches = ".*")
388+
@Test
389+
public void testNoAllocatorMemoryLeak() {
390+
391+
Instant now = Instant.now();
392+
String measurement = "test_" + now.toEpochMilli() % 1000;
393+
394+
Assertions.assertThatNoException().isThrownBy(() -> {
395+
396+
try (InfluxDBClient client = InfluxDBClient.getInstance(
397+
System.getenv("TESTING_INFLUXDB_URL"),
398+
System.getenv("TESTING_INFLUXDB_TOKEN").toCharArray(),
399+
System.getenv("TESTING_INFLUXDB_DATABASE"),
400+
null)) {
401+
402+
List<Point> points = List.of(
403+
Point.measurement(measurement)
404+
.setTag("type", "test")
405+
.setFloatField("rads", 3.14)
406+
.setIntegerField("life", 42)
407+
.setTimestamp(now.minus(2, ChronoUnit.SECONDS)),
408+
Point.measurement(measurement)
409+
.setTag("type", "test")
410+
.setFloatField("rads", 3.14)
411+
.setIntegerField("life", 42)
412+
.setTimestamp(now.minus(1, ChronoUnit.SECONDS)),
413+
Point.measurement(measurement)
414+
.setTag("type", "test")
415+
.setFloatField("rads", 3.14)
416+
.setIntegerField("life", 42)
417+
.setTimestamp(now));
418+
419+
client.writePoints(points);
420+
String query = "SELECT * FROM " + measurement;
421+
422+
try (Stream<PointValues> stream = client.queryPoints(query)) {
423+
// N.B. with other items remaining in the stream, an unclosed FlightStream
424+
// will not clean up the residual items and will cause the BaseAllocator
425+
// to throw an IllegalStateException: Memory was leaked...
426+
// Test to ensure FlightStream was closed even though two more records
427+
// remain in the stream
428+
stream.findFirst()
429+
.ifPresent(pointValues -> {
430+
Assertions.assertThat(pointValues.getField("life")).isEqualTo(42L);
431+
Assertions.assertThat(pointValues.getField("rads")).isEqualTo(3.14);
432+
});
433+
}
434+
}
435+
});
436+
437+
}
438+
439+
@EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_URL", matches = ".*")
440+
@EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_TOKEN", matches = ".*")
441+
@EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_DATABASE", matches = ".*")
442+
@Test
443+
public void testMultipleQueries() throws Exception {
444+
445+
Instant now = Instant.now();
446+
String measurement = "test_" + now.toEpochMilli() % 1000;
447+
448+
try (InfluxDBClient client = InfluxDBClient.getInstance(
449+
System.getenv("TESTING_INFLUXDB_URL"),
450+
System.getenv("TESTING_INFLUXDB_TOKEN").toCharArray(),
451+
System.getenv("TESTING_INFLUXDB_DATABASE"),
452+
null)) {
453+
454+
List<Point> points = List.of(
455+
Point.measurement(measurement)
456+
.setTag("type", "test")
457+
.setFloatField("rads", 3.14)
458+
.setIntegerField("life", 42)
459+
.setTimestamp(now.minus(2, ChronoUnit.SECONDS)),
460+
Point.measurement(measurement)
461+
.setTag("type", "test")
462+
.setFloatField("rads", 3.14)
463+
.setIntegerField("life", 42)
464+
.setTimestamp(now.minus(1, ChronoUnit.SECONDS)),
465+
Point.measurement(measurement)
466+
.setTag("type", "test")
467+
.setFloatField("rads", 3.14)
468+
.setIntegerField("life", 42)
469+
.setTimestamp(now));
470+
471+
client.writePoints(points);
472+
String query = "SELECT * FROM " + measurement;
473+
474+
for (int i = 0; i < 20; i++) {
475+
try (Stream<PointValues> stream = client.queryPoints(query)) {
476+
stream.forEach(pointValues -> {
477+
Assertions.assertThat(pointValues.getField("life")).isEqualTo(42L);
478+
Assertions.assertThat(pointValues.getField("rads")).isEqualTo(3.14);
479+
});
480+
}
481+
}
482+
}
483+
484+
485+
}
486+
487+
384488
private void assertGetDataSuccess(@Nonnull final InfluxDBClient influxDBClient) {
385489
influxDBClient.writePoint(
386490
Point.measurement("test1")

0 commit comments

Comments
 (0)