Skip to content

Commit c0b92d8

Browse files
committed
fix: reserve memory for sorting indices during query execution
1 parent b30fd34 commit c0b92d8

File tree

6 files changed

+135
-0
lines changed

6 files changed

+135
-0
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/FakedMemoryReservationManager.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,4 +43,9 @@ public Pair<Long, Long> releaseMemoryVirtually(final long size) {
4343
@Override
4444
public void reserveMemoryVirtually(
4545
final long bytesToBeReserved, final long bytesAlreadyReserved) {}
46+
47+
@Override
48+
public long getReservedMemory() {
49+
return 0;
50+
}
4651
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/MemoryReservationManager.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919

2020
package org.apache.iotdb.db.queryengine.plan.planner.memory;
2121

22+
import org.apache.iotdb.commons.utils.TestOnly;
23+
2224
import org.apache.tsfile.utils.Pair;
2325

2426
public interface MemoryReservationManager {
@@ -72,4 +74,13 @@ public interface MemoryReservationManager {
7274
* @param bytesAlreadyReserved the amount of memory that has already been reserved
7375
*/
7476
void reserveMemoryVirtually(final long bytesToBeReserved, final long bytesAlreadyReserved);
77+
78+
/**
79+
* Get the total amount of memory currently reserved. This includes memory that has been reserved,
80+
* plus memory pending to be reserved, minus memory pending to be released.
81+
*
82+
* @return the total amount of memory in bytes that is currently reserved
83+
*/
84+
@TestOnly
85+
long getReservedMemory();
7586
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/NotThreadSafeMemoryReservationManager.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,4 +114,9 @@ public void reserveMemoryVirtually(
114114
reservedBytesInTotal += bytesAlreadyReserved;
115115
reserveMemoryCumulatively(bytesToBeReserved);
116116
}
117+
118+
@Override
119+
public long getReservedMemory() {
120+
return bytesToBeReserved - bytesToBeReleased + reservedBytesInTotal;
121+
}
117122
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/ThreadSafeMemoryReservationManager.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,4 +61,9 @@ public synchronized void reserveMemoryVirtually(
6161
final long bytesToBeReserved, final long bytesAlreadyReserved) {
6262
super.reserveMemoryVirtually(bytesToBeReserved, bytesAlreadyReserved);
6363
}
64+
65+
@Override
66+
public synchronized long getReservedMemory() {
67+
return super.getReservedMemory();
68+
}
6469
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@
2020
package org.apache.iotdb.db.utils.datastructure;
2121

2222
import org.apache.iotdb.commons.utils.TestOnly;
23+
import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
2324
import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext;
25+
import org.apache.iotdb.db.queryengine.plan.planner.memory.MemoryReservationManager;
2426
import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
2527
import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryValue;
2628
import org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager;
@@ -319,6 +321,13 @@ protected void set(int index, long timestamp, int valueIndex) {
319321
int offset = i * ARRAY_SIZE;
320322
Arrays.setAll(indices.get(i), j -> offset + j);
321323
}
324+
// Reserve memory for indices if the TVList is owned by a query
325+
if (ownerQuery != null) {
326+
long indicesBytes = indices.size() * PrimitiveArrayManager.ARRAY_SIZE * 4L;
327+
MemoryReservationManager memoryReservationManager =
328+
((FragmentInstanceContext) ownerQuery).getMemoryReservationContext();
329+
memoryReservationManager.reserveMemoryCumulatively(indicesBytes);
330+
}
322331
}
323332
indices.get(arrayIndex)[elementIndex] = valueIndex;
324333
}

iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecutionTest.java

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,12 @@
2020
package org.apache.iotdb.db.queryengine.execution.fragment;
2121

2222
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
23+
import org.apache.iotdb.commons.exception.IllegalPathException;
24+
import org.apache.iotdb.commons.exception.MetadataException;
25+
import org.apache.iotdb.commons.path.MeasurementPath;
26+
import org.apache.iotdb.commons.path.PartialPath;
2327
import org.apache.iotdb.db.conf.IoTDBDescriptor;
28+
import org.apache.iotdb.db.exception.query.QueryProcessException;
2429
import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
2530
import org.apache.iotdb.db.queryengine.common.PlanFragmentId;
2631
import org.apache.iotdb.db.queryengine.exception.CpuNotEnoughException;
@@ -30,15 +35,26 @@
3035
import org.apache.iotdb.db.queryengine.execution.exchange.sink.ISink;
3136
import org.apache.iotdb.db.queryengine.execution.schedule.IDriverScheduler;
3237
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
38+
import org.apache.iotdb.db.storageengine.dataregion.memtable.DeviceIDFactory;
39+
import org.apache.iotdb.db.storageengine.dataregion.memtable.IMemTable;
40+
import org.apache.iotdb.db.storageengine.dataregion.memtable.IWritableMemChunk;
41+
import org.apache.iotdb.db.storageengine.dataregion.memtable.IWritableMemChunkGroup;
42+
import org.apache.iotdb.db.storageengine.dataregion.memtable.PrimitiveMemTable;
43+
import org.apache.iotdb.db.storageengine.dataregion.memtable.ReadOnlyMemChunk;
3344
import org.apache.iotdb.db.utils.datastructure.AlignedTVList;
3445
import org.apache.iotdb.db.utils.datastructure.TVList;
3546

3647
import com.google.common.collect.ImmutableMap;
3748
import org.apache.tsfile.enums.TSDataType;
49+
import org.apache.tsfile.file.metadata.enums.CompressionType;
50+
import org.apache.tsfile.file.metadata.enums.TSEncoding;
51+
import org.apache.tsfile.read.reader.IPointReader;
52+
import org.apache.tsfile.write.schema.MeasurementSchema;
3853
import org.junit.Test;
3954
import org.mockito.Mockito;
4055

4156
import java.io.ByteArrayOutputStream;
57+
import java.io.IOException;
4258
import java.io.PrintStream;
4359
import java.util.ArrayList;
4460
import java.util.Collections;
@@ -49,6 +65,7 @@
4965
import static org.apache.iotdb.db.queryengine.common.QueryId.MOCK_QUERY_ID;
5066
import static org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
5167
import static org.junit.Assert.assertEquals;
68+
import static org.junit.Assert.assertFalse;
5269
import static org.junit.Assert.assertTrue;
5370
import static org.junit.Assert.fail;
5471

@@ -157,6 +174,73 @@ public void testTVListOwnerTransfer() throws InterruptedException {
157174
}
158175
}
159176

177+
@Test
178+
public void testTVListCloneForQuery() {
179+
IoTDBDescriptor.getInstance().getConfig().setDataNodeId(1);
180+
181+
ExecutorService instanceNotificationExecutor =
182+
IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
183+
184+
try {
185+
String deviceId = "d1";
186+
String measurementId = "s1";
187+
IMemTable memTable = createMemTable(deviceId, measurementId);
188+
assertEquals(1, memTable.getMemTableMap().size());
189+
IWritableMemChunkGroup memChunkGroup = memTable.getMemTableMap().values().iterator().next();
190+
assertEquals(1, memChunkGroup.getMemChunkMap().size());
191+
IWritableMemChunk memChunk = memChunkGroup.getMemChunkMap().values().iterator().next();
192+
TVList tvList = memChunk.getWorkingTVList();
193+
assertFalse(tvList.isSorted());
194+
long unsortedTvListRamSize = tvList.calculateRamSize();
195+
196+
// FragmentInstance Context
197+
FragmentInstanceId id1 = new FragmentInstanceId(new PlanFragmentId(MOCK_QUERY_ID, 1), "1");
198+
FragmentInstanceStateMachine stateMachine1 =
199+
new FragmentInstanceStateMachine(id1, instanceNotificationExecutor);
200+
FragmentInstanceContext fragmentInstanceContext1 =
201+
createFragmentInstanceContext(id1, stateMachine1);
202+
203+
FragmentInstanceId id2 = new FragmentInstanceId(new PlanFragmentId(MOCK_QUERY_ID, 2), "2");
204+
FragmentInstanceStateMachine stateMachine2 =
205+
new FragmentInstanceStateMachine(id2, instanceNotificationExecutor);
206+
FragmentInstanceContext fragmentInstanceContext2 =
207+
createFragmentInstanceContext(id2, stateMachine2);
208+
209+
// query on memtable
210+
MeasurementPath fullPath =
211+
new MeasurementPath(
212+
deviceId,
213+
measurementId,
214+
new MeasurementSchema(
215+
measurementId,
216+
TSDataType.INT32,
217+
TSEncoding.RLE,
218+
CompressionType.UNCOMPRESSED,
219+
Collections.emptyMap()));
220+
ReadOnlyMemChunk readOnlyMemChunk1 =
221+
memTable.query(fragmentInstanceContext1, fullPath, Long.MIN_VALUE, null, null);
222+
ReadOnlyMemChunk readOnlyMemChunk2 =
223+
memTable.query(fragmentInstanceContext2, fullPath, Long.MIN_VALUE, null, null);
224+
225+
IPointReader pointReader = readOnlyMemChunk1.getPointReader();
226+
while (pointReader.hasNextTimeValuePair()) {
227+
pointReader.nextTimeValuePair();
228+
}
229+
assertTrue(tvList.isSorted());
230+
assertEquals(
231+
tvList.calculateRamSize(),
232+
fragmentInstanceContext1.getMemoryReservationContext().getReservedMemory());
233+
} catch (QueryProcessException
234+
| IOException
235+
| MetadataException
236+
| MemoryNotEnoughException
237+
| IllegalArgumentException e) {
238+
fail(e.getMessage());
239+
} finally {
240+
instanceNotificationExecutor.shutdown();
241+
}
242+
}
243+
160244
private FragmentInstanceExecution createFragmentInstanceExecution(int id, Executor executor)
161245
throws CpuNotEnoughException {
162246
IDriverScheduler scheduler = Mockito.mock(IDriverScheduler.class);
@@ -201,4 +285,20 @@ private TVList buildTVList() {
201285
}
202286
return tvList;
203287
}
288+
289+
private IMemTable createMemTable(String deviceId, String measurementId)
290+
throws IllegalPathException {
291+
IMemTable memTable = new PrimitiveMemTable("root.test", "1");
292+
293+
int rows = 100;
294+
for (int i = 0; i < 100; i++) {
295+
memTable.write(
296+
DeviceIDFactory.getInstance().getDeviceID(new PartialPath(deviceId)),
297+
Collections.singletonList(
298+
new MeasurementSchema(measurementId, TSDataType.INT32, TSEncoding.PLAIN)),
299+
rows - i - 1,
300+
new Object[] {i + 10});
301+
}
302+
return memTable;
303+
}
204304
}

0 commit comments

Comments
 (0)