Skip to content

Commit b4a3e2e

Browse files
authored
fix: reserve memory for sorting indices during query execution (apache#16960)
1 parent 1e19eae commit b4a3e2e

File tree

8 files changed

+195
-10
lines changed

8 files changed

+195
-10
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -897,26 +897,33 @@ public void releaseResourceWhenAllDriversAreClosed() {
897897
*/
898898
private void releaseTVListOwnedByQuery() {
899899
for (TVList tvList : tvListSet) {
900+
long tvListRamSize = tvList.calculateRamSize();
900901
tvList.lockQueryList();
901902
Set<QueryContext> queryContextSet = tvList.getQueryContextSet();
902903
try {
903904
queryContextSet.remove(this);
904905
if (tvList.getOwnerQuery() == this) {
906+
if (tvList.getReservedMemoryBytes() != tvListRamSize) {
907+
LOGGER.warn(
908+
"Release TVList owned by query: allocate size {}, release size {}",
909+
tvList.getReservedMemoryBytes(),
910+
tvListRamSize);
911+
}
905912
if (queryContextSet.isEmpty()) {
906913
if (LOGGER.isDebugEnabled()) {
907914
LOGGER.debug(
908915
"TVList {} is released by the query, FragmentInstance Id is {}",
909916
tvList,
910917
this.getId());
911918
}
912-
memoryReservationManager.releaseMemoryCumulatively(tvList.calculateRamSize());
919+
memoryReservationManager.releaseMemoryCumulatively(tvList.getReservedMemoryBytes());
913920
tvList.clear();
914921
} else {
915922
// Transfer memory to next query. It must be exception-safe as this method is called
916923
// during FragmentInstanceExecution cleanup. Any exception during this process could
917924
// prevent proper resource cleanup and cause memory leaks.
918925
Pair<Long, Long> releasedBytes =
919-
memoryReservationManager.releaseMemoryVirtually(tvList.calculateRamSize());
926+
memoryReservationManager.releaseMemoryVirtually(tvList.getReservedMemoryBytes());
920927
FragmentInstanceContext queryContext =
921928
(FragmentInstanceContext) queryContextSet.iterator().next();
922929
queryContext

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/ResourceByPathUtils.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,7 @@ protected Map<TVList, Integer> prepareTvListMapForQuery(
153153
// mutable tvlist
154154
TVList list = memChunk.getWorkingTVList();
155155
TVList cloneList = null;
156+
long tvListRamSize = list.calculateRamSize();
156157
list.lockQueryList();
157158
try {
158159
if (copyTimeFilter != null
@@ -193,7 +194,8 @@ protected Map<TVList, Integer> prepareTvListMapForQuery(
193194
if (firstQuery instanceof FragmentInstanceContext) {
194195
MemoryReservationManager memoryReservationManager =
195196
((FragmentInstanceContext) firstQuery).getMemoryReservationContext();
196-
memoryReservationManager.reserveMemoryCumulatively(list.calculateRamSize());
197+
memoryReservationManager.reserveMemoryCumulatively(tvListRamSize);
198+
list.setReservedMemoryBytes(tvListRamSize);
197199
}
198200
list.setOwnerQuery(firstQuery);
199201

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ protected void maybeReleaseTvList(TVList tvList) {
101101
}
102102

103103
private void tryReleaseTvList(TVList tvList) {
104+
long tvListRamSize = tvList.calculateRamSize();
104105
tvList.lockQueryList();
105106
try {
106107
if (tvList.getQueryContextSet().isEmpty()) {
@@ -112,7 +113,8 @@ private void tryReleaseTvList(TVList tvList) {
112113
if (firstQuery instanceof FragmentInstanceContext) {
113114
MemoryReservationManager memoryReservationManager =
114115
((FragmentInstanceContext) firstQuery).getMemoryReservationContext();
115-
memoryReservationManager.reserveMemoryCumulatively(tvList.calculateRamSize());
116+
memoryReservationManager.reserveMemoryCumulatively(tvListRamSize);
117+
tvList.setReservedMemoryBytes(tvListRamSize);
116118
}
117119
// update current TVList owner to first query in the list
118120
tvList.setOwnerQuery(firstQuery);

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedReadOnlyMemChunk.java

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.apache.iotdb.db.storageengine.dataregion.memtable;
2121

22+
import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
2223
import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext;
2324
import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
2425
import org.apache.iotdb.db.storageengine.dataregion.read.reader.chunk.MemAlignedChunkLoader;
@@ -118,6 +119,21 @@ public void sortTvLists() {
118119
int queryRowCount = entry.getValue();
119120
if (!alignedTvList.isSorted() && queryRowCount > alignedTvList.seqRowCount()) {
120121
alignedTvList.sort();
122+
long alignedTvListRamSize = alignedTvList.calculateRamSize();
123+
alignedTvList.lockQueryList();
124+
try {
125+
FragmentInstanceContext ownerQuery =
126+
(FragmentInstanceContext) alignedTvList.getOwnerQuery();
127+
if (ownerQuery != null) {
128+
long deltaBytes = alignedTvListRamSize - alignedTvList.getReservedMemoryBytes();
129+
if (deltaBytes > 0) {
130+
ownerQuery.getMemoryReservationContext().reserveMemoryCumulatively(deltaBytes);
131+
alignedTvList.addReservedMemoryBytes(deltaBytes);
132+
}
133+
}
134+
} finally {
135+
alignedTvList.unlockQueryList();
136+
}
121137
}
122138
}
123139
}
@@ -356,10 +372,25 @@ public boolean isEmpty() {
356372
@Override
357373
public IPointReader getPointReader() {
358374
for (Map.Entry<TVList, Integer> entry : alignedTvListQueryMap.entrySet()) {
359-
AlignedTVList tvList = (AlignedTVList) entry.getKey();
375+
AlignedTVList alignedTvList = (AlignedTVList) entry.getKey();
360376
int queryLength = entry.getValue();
361-
if (!tvList.isSorted() && queryLength > tvList.seqRowCount()) {
362-
tvList.sort();
377+
if (!alignedTvList.isSorted() && queryLength > alignedTvList.seqRowCount()) {
378+
alignedTvList.sort();
379+
long alignedTvListRamSize = alignedTvList.calculateRamSize();
380+
alignedTvList.lockQueryList();
381+
try {
382+
FragmentInstanceContext ownerQuery =
383+
(FragmentInstanceContext) alignedTvList.getOwnerQuery();
384+
if (ownerQuery != null) {
385+
long deltaBytes = alignedTvListRamSize - alignedTvList.getReservedMemoryBytes();
386+
if (deltaBytes > 0) {
387+
ownerQuery.getMemoryReservationContext().reserveMemoryCumulatively(deltaBytes);
388+
alignedTvList.addReservedMemoryBytes(deltaBytes);
389+
}
390+
}
391+
} finally {
392+
alignedTvList.unlockQueryList();
393+
}
363394
}
364395
}
365396
TsBlock tsBlock = buildTsBlock();

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/ReadOnlyMemChunk.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import org.apache.iotdb.commons.utils.TestOnly;
2323
import org.apache.iotdb.db.exception.query.QueryProcessException;
24+
import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
2425
import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext;
2526
import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
2627
import org.apache.iotdb.db.storageengine.dataregion.read.reader.chunk.MemChunkLoader;
@@ -135,6 +136,20 @@ public void sortTvLists() {
135136
int queryRowCount = entry.getValue();
136137
if (!tvList.isSorted() && queryRowCount > tvList.seqRowCount()) {
137138
tvList.sort();
139+
long tvListRamSize = tvList.calculateRamSize();
140+
tvList.lockQueryList();
141+
try {
142+
FragmentInstanceContext ownerQuery = (FragmentInstanceContext) tvList.getOwnerQuery();
143+
if (ownerQuery != null) {
144+
long deltaBytes = tvListRamSize - tvList.getReservedMemoryBytes();
145+
if (deltaBytes > 0) {
146+
ownerQuery.getMemoryReservationContext().reserveMemoryCumulatively(deltaBytes);
147+
tvList.addReservedMemoryBytes(deltaBytes);
148+
}
149+
}
150+
} finally {
151+
tvList.unlockQueryList();
152+
}
138153
}
139154
}
140155
}
@@ -274,6 +289,20 @@ public IPointReader getPointReader() {
274289
int queryLength = entry.getValue();
275290
if (!tvList.isSorted() && queryLength > tvList.seqRowCount()) {
276291
tvList.sort();
292+
long tvListRamSize = tvList.calculateRamSize();
293+
tvList.lockQueryList();
294+
try {
295+
FragmentInstanceContext ownerQuery = (FragmentInstanceContext) tvList.getOwnerQuery();
296+
if (ownerQuery != null) {
297+
long deltaBytes = tvListRamSize - tvList.getReservedMemoryBytes();
298+
if (deltaBytes > 0) {
299+
ownerQuery.getMemoryReservationContext().reserveMemoryCumulatively(deltaBytes);
300+
tvList.addReservedMemoryBytes(deltaBytes);
301+
}
302+
}
303+
} finally {
304+
tvList.unlockQueryList();
305+
}
277306
}
278307
}
279308
TsBlock tsBlock = buildTsBlock();

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -961,7 +961,7 @@ public TSDataType getDataType() {
961961
}
962962

963963
@Override
964-
public long calculateRamSize() {
964+
public synchronized long calculateRamSize() {
965965
return timestamps.size() * alignedTvListArrayMemCost();
966966
}
967967

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,15 +77,20 @@ public abstract class TVList implements WALEntryValue {
7777
// Index relation: arrayIndex -> elementIndex
7878
protected List<BitMap> bitMap;
7979

80-
// lock to provide synchronization for query list
80+
// Guards queryContextSet, ownerQuery, and reservedMemoryBytes.
81+
// Always acquire this lock before accessing/modifying these fields.
8182
private final ReentrantLock queryListLock = new ReentrantLock();
83+
8284
// set of queries that are using this TVList
8385
protected final Set<QueryContext> queryContextSet;
8486

8587
// the owner query which is obligated to release the TVList.
8688
// When it is null, the TVList is owned by insert thread and released after flush.
8789
protected QueryContext ownerQuery;
8890

91+
// Memory reserved by the query, in bytes. Acquire queryListLock before updating it.
92+
protected long reservedMemoryBytes = 0L;
93+
8994
protected boolean sorted = true;
9095
protected long maxTime;
9196
protected long minTime;
@@ -157,14 +162,26 @@ public static long tvListArrayMemCost(TSDataType type) {
157162
return size;
158163
}
159164

160-
public long calculateRamSize() {
165+
public synchronized long calculateRamSize() {
161166
return timestamps.size() * tvListArrayMemCost();
162167
}
163168

164169
public synchronized boolean isSorted() {
165170
return sorted;
166171
}
167172

173+
public void setReservedMemoryBytes(long bytes) {
174+
this.reservedMemoryBytes = bytes;
175+
}
176+
177+
public void addReservedMemoryBytes(long bytes) {
178+
this.reservedMemoryBytes += bytes;
179+
}
180+
181+
public long getReservedMemoryBytes() {
182+
return reservedMemoryBytes;
183+
}
184+
168185
public abstract void sort();
169186

170187
public void increaseReferenceCount() {

iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecutionTest.java

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,12 @@
2020
package org.apache.iotdb.db.queryengine.execution.fragment;
2121

2222
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
23+
import org.apache.iotdb.commons.exception.IllegalPathException;
24+
import org.apache.iotdb.commons.exception.MetadataException;
25+
import org.apache.iotdb.commons.path.NonAlignedFullPath;
26+
import org.apache.iotdb.commons.path.PartialPath;
2327
import org.apache.iotdb.db.conf.IoTDBDescriptor;
28+
import org.apache.iotdb.db.exception.query.QueryProcessException;
2429
import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
2530
import org.apache.iotdb.db.queryengine.common.PlanFragmentId;
2631
import org.apache.iotdb.db.queryengine.exception.CpuNotEnoughException;
@@ -30,15 +35,27 @@
3035
import org.apache.iotdb.db.queryengine.execution.exchange.sink.ISink;
3136
import org.apache.iotdb.db.queryengine.execution.schedule.IDriverScheduler;
3237
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
38+
import org.apache.iotdb.db.storageengine.dataregion.memtable.DeviceIDFactory;
39+
import org.apache.iotdb.db.storageengine.dataregion.memtable.IMemTable;
40+
import org.apache.iotdb.db.storageengine.dataregion.memtable.IWritableMemChunk;
41+
import org.apache.iotdb.db.storageengine.dataregion.memtable.IWritableMemChunkGroup;
42+
import org.apache.iotdb.db.storageengine.dataregion.memtable.PrimitiveMemTable;
43+
import org.apache.iotdb.db.storageengine.dataregion.memtable.ReadOnlyMemChunk;
3344
import org.apache.iotdb.db.utils.datastructure.AlignedTVList;
3445
import org.apache.iotdb.db.utils.datastructure.TVList;
3546

3647
import com.google.common.collect.ImmutableMap;
3748
import org.apache.tsfile.enums.TSDataType;
49+
import org.apache.tsfile.file.metadata.IDeviceID;
50+
import org.apache.tsfile.file.metadata.enums.CompressionType;
51+
import org.apache.tsfile.file.metadata.enums.TSEncoding;
52+
import org.apache.tsfile.read.reader.IPointReader;
53+
import org.apache.tsfile.write.schema.MeasurementSchema;
3854
import org.junit.Test;
3955
import org.mockito.Mockito;
4056

4157
import java.io.ByteArrayOutputStream;
58+
import java.io.IOException;
4259
import java.io.PrintStream;
4360
import java.util.ArrayList;
4461
import java.util.Collections;
@@ -49,6 +66,7 @@
4966
import static org.apache.iotdb.db.queryengine.common.QueryId.MOCK_QUERY_ID;
5067
import static org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
5168
import static org.junit.Assert.assertEquals;
69+
import static org.junit.Assert.assertFalse;
5270
import static org.junit.Assert.assertTrue;
5371
import static org.junit.Assert.fail;
5472

@@ -157,6 +175,69 @@ public void testTVListOwnerTransfer() throws InterruptedException {
157175
}
158176
}
159177

178+
@Test
179+
public void testTVListCloneForQuery() {
180+
IoTDBDescriptor.getInstance().getConfig().setDataNodeId(1);
181+
182+
ExecutorService instanceNotificationExecutor =
183+
IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
184+
185+
try {
186+
String deviceId = "d1";
187+
String measurementId = "s1";
188+
IMemTable memTable = createMemTable(deviceId, measurementId);
189+
assertEquals(1, memTable.getMemTableMap().size());
190+
IWritableMemChunkGroup memChunkGroup = memTable.getMemTableMap().values().iterator().next();
191+
assertEquals(1, memChunkGroup.getMemChunkMap().size());
192+
IWritableMemChunk memChunk = memChunkGroup.getMemChunkMap().values().iterator().next();
193+
TVList tvList = memChunk.getWorkingTVList();
194+
assertFalse(tvList.isSorted());
195+
196+
// FragmentInstance Context
197+
FragmentInstanceId id1 = new FragmentInstanceId(new PlanFragmentId(MOCK_QUERY_ID, 1), "1");
198+
FragmentInstanceStateMachine stateMachine1 =
199+
new FragmentInstanceStateMachine(id1, instanceNotificationExecutor);
200+
FragmentInstanceContext fragmentInstanceContext1 =
201+
createFragmentInstanceContext(id1, stateMachine1);
202+
203+
FragmentInstanceId id2 = new FragmentInstanceId(new PlanFragmentId(MOCK_QUERY_ID, 2), "2");
204+
FragmentInstanceStateMachine stateMachine2 =
205+
new FragmentInstanceStateMachine(id2, instanceNotificationExecutor);
206+
FragmentInstanceContext fragmentInstanceContext2 =
207+
createFragmentInstanceContext(id2, stateMachine2);
208+
209+
// query on memtable
210+
NonAlignedFullPath fullPath =
211+
new NonAlignedFullPath(
212+
IDeviceID.Factory.DEFAULT_FACTORY.create(deviceId),
213+
new MeasurementSchema(
214+
measurementId,
215+
TSDataType.INT32,
216+
TSEncoding.RLE,
217+
CompressionType.UNCOMPRESSED,
218+
Collections.emptyMap()));
219+
ReadOnlyMemChunk readOnlyMemChunk1 =
220+
memTable.query(fragmentInstanceContext1, fullPath, Long.MIN_VALUE, null, null);
221+
ReadOnlyMemChunk readOnlyMemChunk2 =
222+
memTable.query(fragmentInstanceContext2, fullPath, Long.MIN_VALUE, null, null);
223+
224+
IPointReader pointReader = readOnlyMemChunk1.getPointReader();
225+
while (pointReader.hasNextTimeValuePair()) {
226+
pointReader.nextTimeValuePair();
227+
}
228+
assertTrue(tvList.isSorted());
229+
assertEquals(tvList.calculateRamSize(), tvList.getReservedMemoryBytes());
230+
} catch (QueryProcessException
231+
| IOException
232+
| MetadataException
233+
| MemoryNotEnoughException
234+
| IllegalArgumentException e) {
235+
fail(e.getMessage());
236+
} finally {
237+
instanceNotificationExecutor.shutdown();
238+
}
239+
}
240+
160241
private FragmentInstanceExecution createFragmentInstanceExecution(int id, Executor executor)
161242
throws CpuNotEnoughException {
162243
IDriverScheduler scheduler = Mockito.mock(IDriverScheduler.class);
@@ -201,4 +282,20 @@ private TVList buildTVList() {
201282
}
202283
return tvList;
203284
}
285+
286+
private IMemTable createMemTable(String deviceId, String measurementId)
287+
throws IllegalPathException {
288+
IMemTable memTable = new PrimitiveMemTable("root.test", "1");
289+
290+
int rows = 100;
291+
for (int i = 0; i < 100; i++) {
292+
memTable.write(
293+
DeviceIDFactory.getInstance().getDeviceID(new PartialPath(deviceId)),
294+
Collections.singletonList(
295+
new MeasurementSchema(measurementId, TSDataType.INT32, TSEncoding.PLAIN)),
296+
rows - i - 1,
297+
new Object[] {i + 10});
298+
}
299+
return memTable;
300+
}
204301
}

0 commit comments

Comments
 (0)