Skip to content

Commit 1ac4165

Browse files
committed
fix: memory leak in query stream close
1 parent ddb8948 commit 1ac4165

File tree

1 file changed

+158
-0
lines changed

1 file changed

+158
-0
lines changed
Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
/*
2+
* The MIT License
3+
*
4+
* Permission is hereby granted, free of charge, to any person obtaining a copy
5+
* of this software and associated documentation files (the "Software"), to deal
6+
* in the Software without restriction, including without limitation the rights
7+
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8+
* copies of the Software, and to permit persons to whom the Software is
9+
* furnished to do so, subject to the following conditions:
10+
*
11+
* The above copyright notice and this permission notice shall be included in
12+
* all copies or substantial portions of the Software.
13+
*
14+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15+
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16+
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17+
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18+
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19+
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
20+
* THE SOFTWARE.
21+
*/
22+
package com.influxdb.v3.client.issues;
23+
24+
import java.util.concurrent.CountDownLatch;
25+
import java.util.concurrent.TimeUnit;
26+
import java.util.concurrent.atomic.AtomicInteger;
27+
import java.util.logging.Logger;
28+
import java.util.stream.Stream;
29+
30+
import org.junit.jupiter.api.Assertions;
31+
import org.junit.jupiter.api.Test;
32+
import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable;
33+
34+
import com.influxdb.v3.client.InfluxDBClient;
35+
import com.influxdb.v3.client.PointValues;
36+
import com.influxdb.v3.client.config.ClientConfig;
37+
38+
public class MemoryLeakIssueTest {
39+
40+
private static final Logger LOG = Logger.getLogger(MemoryLeakIssueTest.class.getName());
41+
42+
/**
43+
* Tests that interrupting a thread during stream consumption does not cause Arrow memory leaks.
44+
* <p>
45+
* This test creates a query thread that slowly consumes results, then interrupts it mid-processing.
46+
* The interrupt causes FlightStream.close() to throw InterruptedException, which previously bypassed
47+
* cleanup code and left Apache Arrow buffers unreleased. With the fix, client.close() should complete
48+
* successfully without throwing "Memory was leaked" errors.
49+
*/
50+
@EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_URL", matches = ".*")
51+
@EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_TOKEN", matches = ".*")
52+
@EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_DATABASE", matches = ".*")
53+
@Test
54+
void testStreamCloseWithThreadInterrupt() throws Exception {
55+
String host = System.getenv("TESTING_INFLUXDB_URL");
56+
String token = System.getenv("TESTING_INFLUXDB_TOKEN");
57+
String database = System.getenv("TESTING_INFLUXDB_DATABASE");
58+
String measurement = "memory_leak_test_" + System.currentTimeMillis();
59+
60+
61+
// Prepare config
62+
ClientConfig config = new ClientConfig.Builder()
63+
.host(host)
64+
.token(token.toCharArray())
65+
.database(database)
66+
.writeNoSync(true)
67+
.build();
68+
69+
// Write test data
70+
try (InfluxDBClient client = InfluxDBClient.getInstance(config)) {
71+
LOG.info("Writing test data...");
72+
for (int i = 0; i < 100; i++) {
73+
client.writeRecord(String.format("%s,id=%04d temp=%f",
74+
measurement, i, 20.0 + Math.random() * 10));
75+
}
76+
}
77+
78+
TimeUnit.MILLISECONDS.sleep(500);
79+
80+
// Query data
81+
InfluxDBClient client = InfluxDBClient.getInstance(config);
82+
//noinspection TryFinallyCanBeTryWithResources
83+
try {
84+
String sql = String.format("SELECT * FROM %s", measurement);
85+
86+
// Synchronization to ensure we interrupt during consumption
87+
CountDownLatch consumingStarted = new CountDownLatch(1);
88+
AtomicInteger rowsProcessed = new AtomicInteger(0);
89+
AtomicInteger exceptionsThrown = new AtomicInteger(0);
90+
91+
Thread queryThread = new Thread(() -> {
92+
try (Stream<PointValues> stream = client.queryPoints(sql)) {
93+
LOG.info("queryPoints returned");
94+
stream.forEach(pv -> {
95+
int count = rowsProcessed.incrementAndGet();
96+
97+
// Signal that we've started consuming
98+
if (count == 1) {
99+
LOG.info("Started consuming - ready for interrupt");
100+
consumingStarted.countDown();
101+
}
102+
103+
try {
104+
// Slow consumption to ensure we're mid-stream when interrupted
105+
TimeUnit.MILLISECONDS.sleep(100);
106+
} catch (InterruptedException e) {
107+
LOG.info("INTERRUPTED during consume! (after " + count + " rows)");
108+
Thread.currentThread().interrupt();
109+
// Throw exception to stop stream consumption immediately; try-with-resources will then
110+
// close stream in interrupted state
111+
throw new RuntimeException("Interrupted", e);
112+
}
113+
});
114+
} catch (Exception e) {
115+
exceptionsThrown.incrementAndGet();
116+
LOG.info("Exception caught: " + e);
117+
}
118+
});
119+
120+
LOG.info("Starting consumer thread...");
121+
queryThread.start();
122+
123+
// Wait for thread to start consuming
124+
if (!consumingStarted.await(10, TimeUnit.SECONDS)) {
125+
Assertions.fail("Thread didn't start consuming in time!");
126+
}
127+
128+
// Give it a moment to be mid-processing
129+
TimeUnit.MILLISECONDS.sleep(50);
130+
131+
// Interrupt during processing
132+
LOG.info("Interrupting thread...");
133+
queryThread.interrupt();
134+
135+
// Wait for thread to finish
136+
queryThread.join(10000);
137+
138+
// Verify that thread started processing rows
139+
if (rowsProcessed.get() == 0) {
140+
Assertions.fail("Thread didn't process any rows");
141+
}
142+
143+
// Verify that exception was thrown due to interrupt
144+
if (exceptionsThrown.get() == 0) {
145+
Assertions.fail("No exception was thrown - interrupt might not have worked");
146+
}
147+
148+
} catch (Exception e) {
149+
LOG.severe("Test failed: " + e.getMessage());
150+
throw e;
151+
} finally {
152+
// Now close the client.
153+
// It should not throw `Memory was leaked by query. Memory leaked: (...)` error!
154+
LOG.info("Closing the client...");
155+
client.close();
156+
}
157+
}
158+
}

0 commit comments

Comments
 (0)