Skip to content

Commit 2e59f9d

Browse files
committed
fix: reserve memory for sorting indices during query execution
1 parent 1e19eae commit 2e59f9d

File tree

6 files changed

+134
-0
lines changed

6 files changed

+134
-0
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/FakedMemoryReservationManager.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,4 +43,9 @@ public Pair<Long, Long> releaseMemoryVirtually(final long size) {
4343
@Override
4444
public void reserveMemoryVirtually(
4545
final long bytesToBeReserved, final long bytesAlreadyReserved) {}
46+
47+
@Override
48+
public long getReservedMemory() {
49+
return 0;
50+
}
4651
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/MemoryReservationManager.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919

2020
package org.apache.iotdb.db.queryengine.plan.planner.memory;
2121

22+
import org.apache.iotdb.commons.utils.TestOnly;
23+
2224
import org.apache.tsfile.utils.Pair;
2325

2426
public interface MemoryReservationManager {
@@ -72,4 +74,13 @@ public interface MemoryReservationManager {
7274
* @param bytesAlreadyReserved the amount of memory that has already been reserved
7375
*/
7476
void reserveMemoryVirtually(final long bytesToBeReserved, final long bytesAlreadyReserved);
77+
78+
/**
79+
* Get the total amount of memory currently reserved. This includes memory that has been reserved,
80+
* plus memory pending to be reserved, minus memory pending to be released.
81+
*
82+
* @return the total amount of memory in bytes that is currently reserved
83+
*/
84+
@TestOnly
85+
long getReservedMemory();
7586
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/NotThreadSafeMemoryReservationManager.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,4 +114,9 @@ public void reserveMemoryVirtually(
114114
reservedBytesInTotal += bytesAlreadyReserved;
115115
reserveMemoryCumulatively(bytesToBeReserved);
116116
}
117+
118+
@Override
119+
public long getReservedMemory() {
120+
return bytesToBeReserved - bytesToBeReleased + reservedBytesInTotal;
121+
}
117122
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/ThreadSafeMemoryReservationManager.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,4 +61,9 @@ public synchronized void reserveMemoryVirtually(
6161
final long bytesToBeReserved, final long bytesAlreadyReserved) {
6262
super.reserveMemoryVirtually(bytesToBeReserved, bytesAlreadyReserved);
6363
}
64+
65+
@Override
66+
public synchronized long getReservedMemory() {
67+
return super.getReservedMemory();
68+
}
6469
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,9 @@
2121

2222
import org.apache.iotdb.common.rpc.thrift.TSStatus;
2323
import org.apache.iotdb.commons.utils.TestOnly;
24+
import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
2425
import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext;
26+
import org.apache.iotdb.db.queryengine.plan.planner.memory.MemoryReservationManager;
2527
import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
2628
import org.apache.iotdb.db.service.metrics.WritingMetrics;
2729
import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryValue;
@@ -325,6 +327,13 @@ protected void set(int index, long timestamp, int valueIndex) {
325327
int offset = i * ARRAY_SIZE;
326328
Arrays.setAll(indices.get(i), j -> offset + j);
327329
}
330+
// Reserve memory for indices if the TVList is owned by a query
331+
if (ownerQuery != null) {
332+
long indicesBytes = indices.size() * PrimitiveArrayManager.ARRAY_SIZE * 4L;
333+
MemoryReservationManager memoryReservationManager =
334+
((FragmentInstanceContext) ownerQuery).getMemoryReservationContext();
335+
memoryReservationManager.reserveMemoryCumulatively(indicesBytes);
336+
}
328337
}
329338
indices.get(arrayIndex)[elementIndex] = valueIndex;
330339
}

iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecutionTest.java

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,12 @@
2020
package org.apache.iotdb.db.queryengine.execution.fragment;
2121

2222
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
23+
import org.apache.iotdb.commons.exception.IllegalPathException;
24+
import org.apache.iotdb.commons.exception.MetadataException;
25+
import org.apache.iotdb.commons.path.NonAlignedFullPath;
26+
import org.apache.iotdb.commons.path.PartialPath;
2327
import org.apache.iotdb.db.conf.IoTDBDescriptor;
28+
import org.apache.iotdb.db.exception.query.QueryProcessException;
2429
import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
2530
import org.apache.iotdb.db.queryengine.common.PlanFragmentId;
2631
import org.apache.iotdb.db.queryengine.exception.CpuNotEnoughException;
@@ -30,15 +35,27 @@
3035
import org.apache.iotdb.db.queryengine.execution.exchange.sink.ISink;
3136
import org.apache.iotdb.db.queryengine.execution.schedule.IDriverScheduler;
3237
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
38+
import org.apache.iotdb.db.storageengine.dataregion.memtable.DeviceIDFactory;
39+
import org.apache.iotdb.db.storageengine.dataregion.memtable.IMemTable;
40+
import org.apache.iotdb.db.storageengine.dataregion.memtable.IWritableMemChunk;
41+
import org.apache.iotdb.db.storageengine.dataregion.memtable.IWritableMemChunkGroup;
42+
import org.apache.iotdb.db.storageengine.dataregion.memtable.PrimitiveMemTable;
43+
import org.apache.iotdb.db.storageengine.dataregion.memtable.ReadOnlyMemChunk;
3344
import org.apache.iotdb.db.utils.datastructure.AlignedTVList;
3445
import org.apache.iotdb.db.utils.datastructure.TVList;
3546

3647
import com.google.common.collect.ImmutableMap;
3748
import org.apache.tsfile.enums.TSDataType;
49+
import org.apache.tsfile.file.metadata.IDeviceID;
50+
import org.apache.tsfile.file.metadata.enums.CompressionType;
51+
import org.apache.tsfile.file.metadata.enums.TSEncoding;
52+
import org.apache.tsfile.read.reader.IPointReader;
53+
import org.apache.tsfile.write.schema.MeasurementSchema;
3854
import org.junit.Test;
3955
import org.mockito.Mockito;
4056

4157
import java.io.ByteArrayOutputStream;
58+
import java.io.IOException;
4259
import java.io.PrintStream;
4360
import java.util.ArrayList;
4461
import java.util.Collections;
@@ -49,6 +66,7 @@
4966
import static org.apache.iotdb.db.queryengine.common.QueryId.MOCK_QUERY_ID;
5067
import static org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
5168
import static org.junit.Assert.assertEquals;
69+
import static org.junit.Assert.assertFalse;
5270
import static org.junit.Assert.assertTrue;
5371
import static org.junit.Assert.fail;
5472

@@ -157,6 +175,71 @@ public void testTVListOwnerTransfer() throws InterruptedException {
157175
}
158176
}
159177

178+
@Test
179+
public void testTVListCloneForQuery() {
180+
IoTDBDescriptor.getInstance().getConfig().setDataNodeId(1);
181+
182+
ExecutorService instanceNotificationExecutor =
183+
IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
184+
185+
try {
186+
String deviceId = "d1";
187+
String measurementId = "s1";
188+
IMemTable memTable = createMemTable(deviceId, measurementId);
189+
assertEquals(1, memTable.getMemTableMap().size());
190+
IWritableMemChunkGroup memChunkGroup = memTable.getMemTableMap().values().iterator().next();
191+
assertEquals(1, memChunkGroup.getMemChunkMap().size());
192+
IWritableMemChunk memChunk = memChunkGroup.getMemChunkMap().values().iterator().next();
193+
TVList tvList = memChunk.getWorkingTVList();
194+
assertFalse(tvList.isSorted());
195+
196+
// FragmentInstance Context
197+
FragmentInstanceId id1 = new FragmentInstanceId(new PlanFragmentId(MOCK_QUERY_ID, 1), "1");
198+
FragmentInstanceStateMachine stateMachine1 =
199+
new FragmentInstanceStateMachine(id1, instanceNotificationExecutor);
200+
FragmentInstanceContext fragmentInstanceContext1 =
201+
createFragmentInstanceContext(id1, stateMachine1);
202+
203+
FragmentInstanceId id2 = new FragmentInstanceId(new PlanFragmentId(MOCK_QUERY_ID, 2), "2");
204+
FragmentInstanceStateMachine stateMachine2 =
205+
new FragmentInstanceStateMachine(id2, instanceNotificationExecutor);
206+
FragmentInstanceContext fragmentInstanceContext2 =
207+
createFragmentInstanceContext(id2, stateMachine2);
208+
209+
// query on memtable
210+
NonAlignedFullPath fullPath =
211+
new NonAlignedFullPath(
212+
IDeviceID.Factory.DEFAULT_FACTORY.create(deviceId),
213+
new MeasurementSchema(
214+
measurementId,
215+
TSDataType.INT32,
216+
TSEncoding.RLE,
217+
CompressionType.UNCOMPRESSED,
218+
Collections.emptyMap()));
219+
ReadOnlyMemChunk readOnlyMemChunk1 =
220+
memTable.query(fragmentInstanceContext1, fullPath, Long.MIN_VALUE, null, null);
221+
ReadOnlyMemChunk readOnlyMemChunk2 =
222+
memTable.query(fragmentInstanceContext2, fullPath, Long.MIN_VALUE, null, null);
223+
224+
IPointReader pointReader = readOnlyMemChunk1.getPointReader();
225+
while (pointReader.hasNextTimeValuePair()) {
226+
pointReader.nextTimeValuePair();
227+
}
228+
assertTrue(tvList.isSorted());
229+
assertEquals(
230+
tvList.calculateRamSize(),
231+
fragmentInstanceContext1.getMemoryReservationContext().getReservedMemory());
232+
} catch (QueryProcessException
233+
| IOException
234+
| MetadataException
235+
| MemoryNotEnoughException
236+
| IllegalArgumentException e) {
237+
fail(e.getMessage());
238+
} finally {
239+
instanceNotificationExecutor.shutdown();
240+
}
241+
}
242+
160243
private FragmentInstanceExecution createFragmentInstanceExecution(int id, Executor executor)
161244
throws CpuNotEnoughException {
162245
IDriverScheduler scheduler = Mockito.mock(IDriverScheduler.class);
@@ -201,4 +284,20 @@ private TVList buildTVList() {
201284
}
202285
return tvList;
203286
}
287+
288+
private IMemTable createMemTable(String deviceId, String measurementId)
289+
throws IllegalPathException {
290+
IMemTable memTable = new PrimitiveMemTable("root.test", "1");
291+
292+
int rows = 100;
293+
for (int i = 0; i < 100; i++) {
294+
memTable.write(
295+
DeviceIDFactory.getInstance().getDeviceID(new PartialPath(deviceId)),
296+
Collections.singletonList(
297+
new MeasurementSchema(measurementId, TSDataType.INT32, TSEncoding.PLAIN)),
298+
rows - i - 1,
299+
new Object[] {i + 10});
300+
}
301+
return memTable;
302+
}
204303
}

0 commit comments

Comments
 (0)