Skip to content

Commit cfd772a

Browse files
committed
fix: remove List<autocloseable> from FlightSqlClient and simplify closing FlightStream.
1 parent cda5fb7 commit cfd772a

File tree

2 files changed

+10
-99
lines changed

2 files changed

+10
-99
lines changed

src/main/java/com/influxdb/v3/client/internal/FlightSqlClient.java

Lines changed: 10 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -73,13 +73,11 @@
7373
final class FlightSqlClient implements AutoCloseable {
7474

7575
private static final Logger LOG = LoggerFactory.getLogger(FlightSqlClient.class);
76-
static final int AUTOCLOSEABLE_CHECK_LIMIT = 10;
7776

7877
private final FlightClient client;
7978

8079
private final Map<String, String> defaultHeaders = new HashMap<>();
8180
private final ObjectMapper objectMapper = new ObjectMapper();
82-
final List<AutoCloseable> autoCloseables = new ArrayList<>();
8381

8482
FlightSqlClient(@Nonnull final ClientConfig config) {
8583
this(config, null);
@@ -134,44 +132,16 @@ Stream<VectorSchemaRoot> execute(@Nonnull final String query,
134132
CallOption[] callOptionArray = GrpcCallOptions.mergeCallOptions(callOptions, headerCallOption);
135133

136134
Ticket ticket = new Ticket(json.getBytes(StandardCharsets.UTF_8));
137-
StatefulFlightStream stream = new StatefulFlightStream(client.getStream(ticket, callOptionArray));
135+
FlightStream stream = client.getStream(ticket, callOptionArray);
138136
FlightSqlIterator iterator = new FlightSqlIterator(stream);
139-
addToAutoCloseable(stream);
140137

141138
Spliterator<VectorSchemaRoot> spliterator = Spliterators.spliteratorUnknownSize(iterator, Spliterator.NONNULL);
142139
return StreamSupport.stream(spliterator, false).onClose(iterator::close);
143140
}
144141

145-
private synchronized void addToAutoCloseable(@Nonnull final AutoCloseable closeable) {
146-
// need to occasionally clean up references to closed streams
147-
// in order to ensure memory can get freed.
148-
if (autoCloseables.size() > AUTOCLOSEABLE_CHECK_LIMIT) {
149-
LOG.debug("checking to cleanup stale flight streams from {} known streams", autoCloseables.size());
150-
151-
cleanAutoCloseables();
152-
}
153-
154-
autoCloseables.add(closeable);
155-
LOG.debug("autoCloseables count {}", autoCloseables.size());
156-
}
157-
158-
public void cleanAutoCloseables() {
159-
ListIterator<AutoCloseable> iter = autoCloseables.listIterator();
160-
while (iter.hasNext()) {
161-
AutoCloseable autoCloseable = iter.next();
162-
if (autoCloseable.getClass() == FlightSqlClient.StatefulFlightStream.class) {
163-
if (((FlightSqlClient.StatefulFlightStream) autoCloseable).closed) {
164-
iter.remove();
165-
}
166-
}
167-
}
168-
}
169-
170142
@Override
171143
public void close() throws Exception {
172-
autoCloseables.add(client);
173-
AutoCloseables.close(autoCloseables);
174-
cleanAutoCloseables();
144+
client.close();
175145
}
176146

177147
@Nonnull
@@ -273,60 +243,36 @@ ProxyDetector createProxyDetector(@Nonnull final String targetUrl, @Nonnull fina
273243
};
274244
}
275245

276-
private static final class StatefulFlightStream implements AutoCloseable {
277-
FlightStream flightStream;
278-
Boolean closed;
279-
280-
public StatefulFlightStream(@Nonnull final FlightStream flightStream) {
281-
this.flightStream = flightStream;
282-
this.closed = false;
283-
}
284-
285-
@Override
286-
public void close() throws Exception {
287-
this.flightStream.close();
288-
this.closed = true;
289-
}
290-
}
291-
292246
private static final class FlightSqlIterator implements Iterator<VectorSchemaRoot>, AutoCloseable {
293247

294248
private final List<AutoCloseable> autoCloseable = new ArrayList<>();
295249

296-
private final StatefulFlightStream sFlightStream;
250+
private final FlightStream flightStream;
297251

298-
private FlightSqlIterator(@Nonnull final StatefulFlightStream sFlightStream) {
299-
this.sFlightStream = sFlightStream;
252+
private FlightSqlIterator(@Nonnull final FlightStream flightStream) {
253+
this.flightStream = flightStream;
300254
}
301255

302256
@Override
303257
public boolean hasNext() {
304-
boolean nextable = sFlightStream.flightStream.next();
305-
if (!nextable) {
306-
// Nothing left to read - close the stream
307-
try {
308-
sFlightStream.close();
309-
} catch (Exception e) {
310-
LOG.error("Error while closing FlightStream: ", e);
311-
}
312-
}
313-
return nextable;
258+
return flightStream.next();
314259
}
315260

316261
@Override
317262
public VectorSchemaRoot next() {
318-
if (sFlightStream.flightStream.getRoot() == null) {
263+
if (flightStream.getRoot() == null) {
319264
throw new NoSuchElementException();
320265
}
321266

322-
autoCloseable.add(sFlightStream.flightStream.getRoot());
267+
autoCloseable.add(flightStream.getRoot());
323268

324-
return sFlightStream.flightStream.getRoot();
269+
return flightStream.getRoot();
325270
}
326271

327272
@Override
328273
public void close() {
329274
try {
275+
flightStream.close();
330276
AutoCloseables.close(autoCloseable);
331277
} catch (Exception e) {
332278
throw new RuntimeException(e);

src/test/java/com/influxdb/v3/client/internal/FlightSqlClientTest.java

Lines changed: 0 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -372,41 +372,6 @@ void createProxyDetector() {
372372
}
373373
}
374374

375-
376-
@Test
377-
public void multipleFlightStreamsFreed() throws Exception {
378-
ClientConfig clientConfig = new ClientConfig.Builder()
379-
.host(server.getLocation().getUri().toString())
380-
.token("my-token".toCharArray())
381-
.build();
382-
383-
try (FlightSqlClient flightSqlClient = new FlightSqlClient(clientConfig)) {
384-
385-
List<AutoCloseable> autoCloseables = new ArrayList<>();
386-
for (int i = 0; i < 20; i++) {
387-
Stream<VectorSchemaRoot> stream = flightSqlClient.execute(
388-
"select * from cpu",
389-
"mydb",
390-
QueryType.SQL,
391-
Map.of(),
392-
Map.of());
393-
394-
stream.forEach(VectorSchemaRoot::contentToTSVString);
395-
autoCloseables.add(flightSqlClient.autoCloseables.get(flightSqlClient.autoCloseables.size() - 1));
396-
397-
}
398-
Assertions.assertThat(flightSqlClient.autoCloseables.size()).isEqualTo(9);
399-
for (int i = 0; i < autoCloseables.size(); i++) {
400-
if (i < FlightSqlClient.AUTOCLOSEABLE_CHECK_LIMIT + 1) {
401-
Assertions.assertThat(flightSqlClient.autoCloseables.contains(autoCloseables.get(i))).isFalse();
402-
} else {
403-
Assertions.assertThat(flightSqlClient.autoCloseables.contains(autoCloseables.get(i))).isTrue();
404-
Assertions.assertThat(flightSqlClient.autoCloseables.contains(autoCloseables.get(i))).isTrue();
405-
}
406-
}
407-
}
408-
}
409-
410375
static class HeaderCaptureMiddleware implements FlightServerMiddleware {
411376

412377
private final Map<String, String> headers = new HashMap<>();

0 commit comments

Comments
 (0)