Skip to content

Commit 4a151d7

Browse files
committed
test: add test of FlightStream overhead.
1 parent 5142e6c commit 4a151d7

File tree

3 files changed

+36
-6
lines changed

3 files changed

+36
-6
lines changed

src/main/java/com/influxdb/v3/client/internal/FlightSqlClient.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -75,13 +75,13 @@ final class FlightSqlClient implements AutoCloseable {
7575

7676
private static final Logger LOG = LoggerFactory.getLogger(FlightSqlClient.class);
7777
private static final int AUTOCLOSEABLE_CHECK_LIMIT = 10;
78-
private static final Map<AutoCloseable, Boolean> CLOSEABLE_CLOSED_LEDGER = new ConcurrentHashMap<>();
78+
static final Map<AutoCloseable, Boolean> CLOSEABLE_CLOSED_LEDGER = new ConcurrentHashMap<>();
7979

8080
private final FlightClient client;
8181

8282
private final Map<String, String> defaultHeaders = new HashMap<>();
8383
private final ObjectMapper objectMapper = new ObjectMapper();
84-
private final List<AutoCloseable> autoCloseables = new ArrayList<>();
84+
final List<AutoCloseable> autoCloseables = new ArrayList<>();
8585

8686
FlightSqlClient(@Nonnull final ClientConfig config) {
8787
this(config, null);
@@ -144,17 +144,17 @@ Stream<VectorSchemaRoot> execute(@Nonnull final String query,
144144
return StreamSupport.stream(spliterator, false).onClose(iterator::close);
145145
}
146146

147-
private void addToAutoCloseable(@Nonnull final AutoCloseable closeable) {
147+
private synchronized void addToAutoCloseable(@Nonnull final AutoCloseable closeable) {
148148
// need to occasionally clean up references to closed streams
149149
// in order to ensure memory can get freed.
150150
if (autoCloseables.size() > AUTOCLOSEABLE_CHECK_LIMIT) {
151-
LOG.info("checking to cleanup stale flight streams from {} known streams", autoCloseables.size());
151+
LOG.debug("checking to cleanup stale flight streams from {} known streams", autoCloseables.size());
152152

153153
ListIterator<AutoCloseable> iter = autoCloseables.listIterator();
154154
while (iter.hasNext()) {
155155
AutoCloseable autoCloseable = iter.next();
156156
if (CLOSEABLE_CLOSED_LEDGER.get(autoCloseable)) {
157-
LOG.info("removing closed stream {}", autoCloseable);
157+
LOG.debug("removing closed stream {}", autoCloseable);
158158
CLOSEABLE_CLOSED_LEDGER.keySet().remove(autoCloseable);
159159
iter.remove();
160160
}

src/test/java/com/influxdb/v3/client/integration/E2ETest.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -470,7 +470,6 @@ public void testMultipleQueries() throws Exception {
470470
client.writePoints(points);
471471
String query = "SELECT * FROM " + measurement;
472472

473-
// TODO just checking FlightStream cleanup should be test in FlightSqlClient
474473
for (int i = 0; i < 20; i++) {
475474
try (Stream<PointValues> stream = client.queryPoints(query)) {
476475
stream.forEach(pointValues -> {

src/test/java/com/influxdb/v3/client/internal/FlightSqlClientTest.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -370,6 +370,36 @@ void createProxyDetector() {
370370
}
371371
}
372372

373+
374+
@Test
375+
public void multipleFlightStreamsFreed() throws Exception {
376+
ClientConfig clientConfig = new ClientConfig.Builder()
377+
.host(server.getLocation().getUri().toString())
378+
.token("my-token".toCharArray())
379+
.build();
380+
381+
try (FlightSqlClient flightSqlClient = new FlightSqlClient(clientConfig)) {
382+
383+
for (int i = 0; i < 20; i++) {
384+
Stream<VectorSchemaRoot> stream = flightSqlClient.execute(
385+
"select * from cpu",
386+
"mydb",
387+
QueryType.SQL,
388+
Map.of(),
389+
Map.of());
390+
391+
stream.forEach(VectorSchemaRoot::contentToTSVString);
392+
}
393+
Assertions.assertThat(flightSqlClient.autoCloseables.size()).isEqualTo(9);
394+
// N.B. can pick up references from other tests...
395+
Assertions.assertThat(FlightSqlClient.CLOSEABLE_CLOSED_LEDGER.size()).isLessThan(20);
396+
for (AutoCloseable closeable : FlightSqlClient.CLOSEABLE_CLOSED_LEDGER.keySet()) {
397+
Assertions.assertThat(FlightSqlClient.CLOSEABLE_CLOSED_LEDGER.get(closeable)).isTrue(); // Is closed
398+
}
399+
400+
}
401+
}
402+
373403
static class HeaderCaptureMiddleware implements FlightServerMiddleware {
374404

375405
private final Map<String, String> headers = new HashMap<>();
@@ -418,4 +448,5 @@ public HeaderCaptureMiddleware onCallStarted(final CallInfo callInfo,
418448
return lastInstance;
419449
}
420450
}
451+
421452
}

0 commit comments

Comments
 (0)