Skip to content

Commit ce5da13

Browse files
committed
chore: free references to unused FlightStreams to help GC
1 parent 5b9ce2b commit ce5da13

File tree

2 files changed

+77
-1
lines changed

2 files changed

+77
-1
lines changed

src/main/java/com/influxdb/v3/client/internal/FlightSqlClient.java

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,12 @@
3030
import java.util.HashMap;
3131
import java.util.Iterator;
3232
import java.util.List;
33+
import java.util.ListIterator;
3334
import java.util.Map;
3435
import java.util.NoSuchElementException;
3536
import java.util.Spliterator;
3637
import java.util.Spliterators;
38+
import java.util.concurrent.ConcurrentHashMap;
3739
import java.util.stream.Stream;
3840
import java.util.stream.StreamSupport;
3941
import javax.annotation.Nonnull;
@@ -72,6 +74,8 @@
7274
final class FlightSqlClient implements AutoCloseable {
7375

7476
private static final Logger LOG = LoggerFactory.getLogger(FlightSqlClient.class);
77+
private static int AUTOCLOSEABLE_CHECK_LIMIT = 10;
78+
private static Map<AutoCloseable, Boolean> CLOSEABLE_CLOSED_LEDGER = new ConcurrentHashMap<>();
7579

7680
private final FlightClient client;
7781

@@ -134,12 +138,34 @@ Stream<VectorSchemaRoot> execute(@Nonnull final String query,
134138
Ticket ticket = new Ticket(json.getBytes(StandardCharsets.UTF_8));
135139
FlightStream stream = client.getStream(ticket, callOptionArray);
136140
FlightSqlIterator iterator = new FlightSqlIterator(stream);
137-
autoCloseables.add(stream);
141+
addToAutoCloseable(stream);
138142

139143
Spliterator<VectorSchemaRoot> spliterator = Spliterators.spliteratorUnknownSize(iterator, Spliterator.NONNULL);
140144
return StreamSupport.stream(spliterator, false).onClose(iterator::close);
141145
}
142146

147+
private void addToAutoCloseable(AutoCloseable closeable) {
148+
// need to occasionally clean up references to closed streams
149+
// in order to ensure memory can get freed.
150+
if(autoCloseables.size() > AUTOCLOSEABLE_CHECK_LIMIT) {
151+
LOG.info("checking to cleanup stale flight streams from {} known streams", autoCloseables.size());
152+
153+
ListIterator<AutoCloseable> iter = autoCloseables.listIterator();
154+
while(iter.hasNext()){
155+
AutoCloseable autoCloseable = iter.next();
156+
if(CLOSEABLE_CLOSED_LEDGER.get(autoCloseable)){
157+
LOG.info("removing closed stream {}", autoCloseable);
158+
CLOSEABLE_CLOSED_LEDGER.keySet().remove(autoCloseable);
159+
iter.remove();
160+
}
161+
}
162+
}
163+
164+
autoCloseables.add(closeable);
165+
CLOSEABLE_CLOSED_LEDGER.put(closeable, false);
166+
LOG.debug("autoCloseables count {}, LEDGER count {}", autoCloseables.size(), CLOSEABLE_CLOSED_LEDGER.size());
167+
}
168+
143169
@Override
144170
public void close() throws Exception {
145171
autoCloseables.add(client);
@@ -262,6 +288,7 @@ public boolean hasNext() {
262288
// Nothing left to read - close the stream
263289
try {
264290
flightStream.close();
291+
CLOSEABLE_CLOSED_LEDGER.replace(flightStream, true);
265292
} catch (Exception e) {
266293
LOG.error("Error while closing FlightStream: ", e);
267294
}

src/test/java/com/influxdb/v3/client/integration/E2ETest.java

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -435,6 +435,55 @@ public void testNoAllocatorMemoryLeak() {
435435

436436
}
437437

438+
@EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_URL", matches = ".*")
439+
@EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_TOKEN", matches = ".*")
440+
@EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_DATABASE", matches = ".*")
441+
@Test
442+
public void testMultipleQueries() throws Exception {
443+
444+
Instant now = Instant.now();
445+
String measurement = "test_" + now.toEpochMilli() % 1000;
446+
447+
try (InfluxDBClient client = InfluxDBClient.getInstance(
448+
System.getenv("TESTING_INFLUXDB_URL"),
449+
System.getenv("TESTING_INFLUXDB_TOKEN").toCharArray(),
450+
System.getenv("TESTING_INFLUXDB_DATABASE"),
451+
null)) {
452+
453+
List<Point> points = List.of(
454+
Point.measurement(measurement)
455+
.setTag("type", "test")
456+
.setFloatField("rads", 3.14)
457+
.setIntegerField("life", 42)
458+
.setTimestamp(now.minus(2, ChronoUnit.SECONDS)),
459+
Point.measurement(measurement)
460+
.setTag("type", "test")
461+
.setFloatField("rads", 3.14)
462+
.setIntegerField("life", 42)
463+
.setTimestamp(now.minus(1, ChronoUnit.SECONDS)),
464+
Point.measurement(measurement)
465+
.setTag("type", "test")
466+
.setFloatField("rads", 3.14)
467+
.setIntegerField("life", 42)
468+
.setTimestamp(now));
469+
470+
client.writePoints(points);
471+
String query = "SELECT * FROM " + measurement;
472+
473+
// TODO just checking FlightStream cleanup should be test in FlightSqlClient
474+
for(int i = 0; i < 20; i++) {
475+
try (Stream<PointValues> stream = client.queryPoints(query)) {
476+
stream.forEach(pointValues -> {
477+
Assertions.assertThat(pointValues.getField("life")).isEqualTo(42L);
478+
Assertions.assertThat(pointValues.getField("rads")).isEqualTo(3.14);
479+
});
480+
}
481+
}
482+
}
483+
484+
485+
}
486+
438487

439488
private void assertGetDataSuccess(@Nonnull final InfluxDBClient influxDBClient) {
440489
influxDBClient.writePoint(

0 commit comments

Comments
 (0)