Skip to content

Commit 7692c3b

Browse files
authored
fix: possible memory leak in query stream close (#317)
1 parent ddb8948 commit 7692c3b

File tree

3 files changed

+211
-1
lines changed

3 files changed

+211
-1
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
## 1.7.0 [unreleased]
22

3+
### Bug Fixes
4+
5+
1. [#317](https://github.com/InfluxCommunity/influxdb3-java/pull/317): Fix Arrow memory leak when stream close fails due to thread interrupts.
6+
37
## 1.6.0 [2025-11-14]
48

59
### Features

src/main/java/com/influxdb/v3/client/internal/FlightSqlClient.java

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -270,11 +270,43 @@ public VectorSchemaRoot next() {
270270

271271
@Override
272272
public void close() {
273+
Exception pendingException = null;
274+
275+
// Try to close FlightStream
273276
try {
274277
flightStream.close();
278+
} catch (Exception e) {
279+
LOG.warn("FlightStream close failed: {}", e.toString());
280+
pendingException = e;
281+
282+
// Retry close - first attempt drained stream but threw exception before cleanup,
283+
// retry finds stream already drained and completes cleanup successfully
284+
try {
285+
flightStream.close();
286+
// Retry succeeded - clear the exception
287+
pendingException = null;
288+
} catch (Exception retryException) {
289+
// Retry also failed - keep original exception
290+
// but continue to close collected Arrow resources anyway
291+
LOG.error("FlightStream close failed even after retry attempt", retryException);
292+
}
293+
}
294+
295+
// ALWAYS try to close collected Arrow resources
296+
try {
275297
AutoCloseables.close(autoCloseable);
276298
} catch (Exception e) {
277-
throw new RuntimeException(e);
299+
LOG.error("AutoCloseable close failed", e);
300+
if (pendingException != null) {
301+
pendingException.addSuppressed(e);
302+
} else {
303+
pendingException = e;
304+
}
305+
}
306+
307+
// Throw pending exceptions
308+
if (pendingException != null) {
309+
throw new RuntimeException(pendingException);
278310
}
279311
}
280312
}
Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
/*
2+
* The MIT License
3+
*
4+
* Permission is hereby granted, free of charge, to any person obtaining a copy
5+
* of this software and associated documentation files (the "Software"), to deal
6+
* in the Software without restriction, including without limitation the rights
7+
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8+
* copies of the Software, and to permit persons to whom the Software is
9+
* furnished to do so, subject to the following conditions:
10+
*
11+
* The above copyright notice and this permission notice shall be included in
12+
* all copies or substantial portions of the Software.
13+
*
14+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15+
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16+
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17+
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18+
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19+
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
20+
* THE SOFTWARE.
21+
*/
22+
package com.influxdb.v3.client.issues;
23+
24+
import java.util.concurrent.CountDownLatch;
25+
import java.util.concurrent.TimeUnit;
26+
import java.util.concurrent.atomic.AtomicInteger;
27+
import java.util.logging.Logger;
28+
import java.util.stream.Stream;
29+
30+
import org.junit.jupiter.api.Assertions;
31+
import org.junit.jupiter.api.Test;
32+
import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable;
33+
34+
import com.influxdb.v3.client.InfluxDBClient;
35+
import com.influxdb.v3.client.PointValues;
36+
import com.influxdb.v3.client.config.ClientConfig;
37+
38+
public class MemoryLeakIssueTest {
39+
40+
private static final Logger LOG = Logger.getLogger(MemoryLeakIssueTest.class.getName());
41+
42+
/**
43+
* Tests that interrupting a thread during stream consumption does not cause Arrow memory leaks.
44+
* <p>
45+
* This test creates a query thread that slowly consumes results, then interrupts it mid-processing.
46+
* The interrupt causes FlightStream.close() to throw InterruptedException, which previously bypassed
47+
* cleanup code and left Apache Arrow buffers unreleased. With the fix, client.close() should complete
48+
* successfully without throwing "Memory was leaked" errors.
49+
*/
50+
@EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_URL", matches = ".*")
51+
@EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_TOKEN", matches = ".*")
52+
@EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_DATABASE", matches = ".*")
53+
@Test
54+
void testStreamCloseWithThreadInterrupt() throws Exception {
55+
String host = System.getenv("TESTING_INFLUXDB_URL");
56+
String token = System.getenv("TESTING_INFLUXDB_TOKEN");
57+
String database = System.getenv("TESTING_INFLUXDB_DATABASE");
58+
String measurement = "memory_leak_test_" + System.currentTimeMillis();
59+
String sql = String.format("SELECT * FROM %s", measurement);
60+
61+
// Prepare config
62+
ClientConfig config = new ClientConfig.Builder()
63+
.host(host)
64+
.token(token.toCharArray())
65+
.database(database)
66+
.writeNoSync(true)
67+
.build();
68+
69+
try (InfluxDBClient client = InfluxDBClient.getInstance(config)) {
70+
// Write test data
71+
LOG.info("Writing test data...");
72+
for (int i = 0; i < 3; i++) {
73+
client.writeRecord(String.format("%s,id=%04d temp=%f",
74+
measurement, i, 20.0 + Math.random() * 10));
75+
}
76+
77+
// Wait for data to be queryable (CI environments can be slower)
78+
LOG.info("Waiting for data to be available...");
79+
int attempts = 0;
80+
boolean hasData = false;
81+
while (attempts < 10 && !hasData) {
82+
try (Stream<PointValues> testStream = client.queryPoints(sql)) {
83+
hasData = testStream.findFirst().isPresent();
84+
}
85+
if (!hasData) {
86+
LOG.info("Data not yet available, waiting... (attempt " + (attempts + 1) + "/10)");
87+
TimeUnit.MILLISECONDS.sleep(500);
88+
attempts++;
89+
}
90+
}
91+
92+
if (!hasData) {
93+
Assertions.fail("No data available after writing and waiting " + (attempts * 500) + "ms");
94+
}
95+
LOG.info("Data is available, starting test...");
96+
97+
}
98+
99+
// Query data
100+
InfluxDBClient client = InfluxDBClient.getInstance(config);
101+
//noinspection TryFinallyCanBeTryWithResources
102+
try {
103+
// Synchronization to ensure we interrupt during consumption
104+
CountDownLatch consumingStarted = new CountDownLatch(1);
105+
AtomicInteger rowsProcessed = new AtomicInteger(0);
106+
AtomicInteger exceptionsThrown = new AtomicInteger(0);
107+
108+
Thread queryThread = new Thread(() -> {
109+
try (Stream<PointValues> stream = client.queryPoints(sql)) {
110+
LOG.info("queryPoints returned");
111+
stream.forEach(pv -> {
112+
int count = rowsProcessed.incrementAndGet();
113+
114+
// Signal that we've started consuming
115+
if (count == 1) {
116+
LOG.info("Started consuming - ready for interrupt");
117+
consumingStarted.countDown();
118+
}
119+
120+
try {
121+
// Slow consumption to ensure we're mid-stream when interrupted
122+
TimeUnit.MILLISECONDS.sleep(100);
123+
} catch (InterruptedException e) {
124+
LOG.info("INTERRUPTED during consume! (after " + count + " rows)");
125+
Thread.currentThread().interrupt();
126+
// Throw exception to stop stream consumption immediately; try-with-resources will then
127+
// close stream in interrupted state
128+
throw new RuntimeException("Interrupted", e);
129+
}
130+
});
131+
} catch (Exception e) {
132+
exceptionsThrown.incrementAndGet();
133+
LOG.info("Exception caught: " + e);
134+
}
135+
});
136+
137+
queryThread.start();
138+
139+
// Wait for thread to start consuming
140+
if (!consumingStarted.await(10, TimeUnit.SECONDS)) {
141+
Assertions.fail("Thread didn't start consuming in time!");
142+
}
143+
144+
// Give it a moment to be mid-processing
145+
TimeUnit.MILLISECONDS.sleep(50);
146+
147+
// Interrupt during processing
148+
LOG.info("Interrupting thread...");
149+
queryThread.interrupt();
150+
151+
// Wait for thread to finish
152+
queryThread.join(10000);
153+
154+
// Verify that thread started processing rows
155+
if (rowsProcessed.get() == 0) {
156+
Assertions.fail("Thread didn't process any rows");
157+
}
158+
159+
// Verify that exception was thrown due to interrupt
160+
if (exceptionsThrown.get() == 0) {
161+
Assertions.fail("No exception was thrown - interrupt might not have worked");
162+
}
163+
164+
} catch (Exception e) {
165+
LOG.severe("Test failed: " + e.getMessage());
166+
throw e;
167+
} finally {
168+
// Now close the client.
169+
// It should not throw `Memory was leaked by query. Memory leaked: (...)` error!
170+
LOG.info("Closing the client...");
171+
client.close();
172+
}
173+
}
174+
}

0 commit comments

Comments
 (0)