Skip to content

Commit dcb6235

Browse files
committed
fix: reserve memory for sorting indices during query execution
1 parent b30fd34 commit dcb6235

File tree

6 files changed

+134
-0
lines changed

6 files changed

+134
-0
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/FakedMemoryReservationManager.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,4 +43,9 @@ public Pair<Long, Long> releaseMemoryVirtually(final long size) {
4343
@Override
4444
public void reserveMemoryVirtually(
4545
final long bytesToBeReserved, final long bytesAlreadyReserved) {}
46+
47+
@Override
48+
public long getReservedMemory() {
49+
return 0;
50+
}
4651
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/MemoryReservationManager.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919

2020
package org.apache.iotdb.db.queryengine.plan.planner.memory;
2121

22+
import org.apache.iotdb.commons.utils.TestOnly;
23+
2224
import org.apache.tsfile.utils.Pair;
2325

2426
public interface MemoryReservationManager {
@@ -72,4 +74,13 @@ public interface MemoryReservationManager {
7274
* @param bytesAlreadyReserved the amount of memory that has already been reserved
7375
*/
7476
void reserveMemoryVirtually(final long bytesToBeReserved, final long bytesAlreadyReserved);
77+
78+
/**
79+
* Get the total amount of memory currently reserved. This includes memory that has been reserved,
80+
* plus memory pending to be reserved, minus memory pending to be released.
81+
*
82+
* @return the total amount of memory in bytes that is currently reserved
83+
*/
84+
@TestOnly
85+
long getReservedMemory();
7586
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/NotThreadSafeMemoryReservationManager.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,4 +114,9 @@ public void reserveMemoryVirtually(
114114
reservedBytesInTotal += bytesAlreadyReserved;
115115
reserveMemoryCumulatively(bytesToBeReserved);
116116
}
117+
118+
@Override
119+
public long getReservedMemory() {
120+
return bytesToBeReserved - bytesToBeReleased + reservedBytesInTotal;
121+
}
117122
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/ThreadSafeMemoryReservationManager.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,4 +61,9 @@ public synchronized void reserveMemoryVirtually(
6161
final long bytesToBeReserved, final long bytesAlreadyReserved) {
6262
super.reserveMemoryVirtually(bytesToBeReserved, bytesAlreadyReserved);
6363
}
64+
65+
@Override
66+
public synchronized long getReservedMemory() {
67+
return super.getReservedMemory();
68+
}
6469
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@
2020
package org.apache.iotdb.db.utils.datastructure;
2121

2222
import org.apache.iotdb.commons.utils.TestOnly;
23+
import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
2324
import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext;
25+
import org.apache.iotdb.db.queryengine.plan.planner.memory.MemoryReservationManager;
2426
import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
2527
import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryValue;
2628
import org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager;
@@ -319,6 +321,13 @@ protected void set(int index, long timestamp, int valueIndex) {
319321
int offset = i * ARRAY_SIZE;
320322
Arrays.setAll(indices.get(i), j -> offset + j);
321323
}
324+
// Reserve memory for indices if the TVList is owned by a query
325+
if (ownerQuery != null) {
326+
long indicesBytes = indices.size() * PrimitiveArrayManager.ARRAY_SIZE * 4L;
327+
MemoryReservationManager memoryReservationManager =
328+
((FragmentInstanceContext) ownerQuery).getMemoryReservationContext();
329+
memoryReservationManager.reserveMemoryCumulatively(indicesBytes);
330+
}
322331
}
323332
indices.get(arrayIndex)[elementIndex] = valueIndex;
324333
}

iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecutionTest.java

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,12 @@
2020
package org.apache.iotdb.db.queryengine.execution.fragment;
2121

2222
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
23+
import org.apache.iotdb.commons.exception.IllegalPathException;
24+
import org.apache.iotdb.commons.exception.MetadataException;
25+
import org.apache.iotdb.commons.path.MeasurementPath;
26+
import org.apache.iotdb.commons.path.PartialPath;
2327
import org.apache.iotdb.db.conf.IoTDBDescriptor;
28+
import org.apache.iotdb.db.exception.query.QueryProcessException;
2429
import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
2530
import org.apache.iotdb.db.queryengine.common.PlanFragmentId;
2631
import org.apache.iotdb.db.queryengine.exception.CpuNotEnoughException;
@@ -30,15 +35,26 @@
3035
import org.apache.iotdb.db.queryengine.execution.exchange.sink.ISink;
3136
import org.apache.iotdb.db.queryengine.execution.schedule.IDriverScheduler;
3237
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
38+
import org.apache.iotdb.db.storageengine.dataregion.memtable.DeviceIDFactory;
39+
import org.apache.iotdb.db.storageengine.dataregion.memtable.IMemTable;
40+
import org.apache.iotdb.db.storageengine.dataregion.memtable.IWritableMemChunk;
41+
import org.apache.iotdb.db.storageengine.dataregion.memtable.IWritableMemChunkGroup;
42+
import org.apache.iotdb.db.storageengine.dataregion.memtable.PrimitiveMemTable;
43+
import org.apache.iotdb.db.storageengine.dataregion.memtable.ReadOnlyMemChunk;
3344
import org.apache.iotdb.db.utils.datastructure.AlignedTVList;
3445
import org.apache.iotdb.db.utils.datastructure.TVList;
3546

3647
import com.google.common.collect.ImmutableMap;
3748
import org.apache.tsfile.enums.TSDataType;
49+
import org.apache.tsfile.file.metadata.enums.CompressionType;
50+
import org.apache.tsfile.file.metadata.enums.TSEncoding;
51+
import org.apache.tsfile.read.reader.IPointReader;
52+
import org.apache.tsfile.write.schema.MeasurementSchema;
3853
import org.junit.Test;
3954
import org.mockito.Mockito;
4055

4156
import java.io.ByteArrayOutputStream;
57+
import java.io.IOException;
4258
import java.io.PrintStream;
4359
import java.util.ArrayList;
4460
import java.util.Collections;
@@ -49,6 +65,7 @@
4965
import static org.apache.iotdb.db.queryengine.common.QueryId.MOCK_QUERY_ID;
5066
import static org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
5167
import static org.junit.Assert.assertEquals;
68+
import static org.junit.Assert.assertFalse;
5269
import static org.junit.Assert.assertTrue;
5370
import static org.junit.Assert.fail;
5471

@@ -157,6 +174,72 @@ public void testTVListOwnerTransfer() throws InterruptedException {
157174
}
158175
}
159176

177+
@Test
178+
public void testTVListCloneForQuery() {
179+
IoTDBDescriptor.getInstance().getConfig().setDataNodeId(1);
180+
181+
ExecutorService instanceNotificationExecutor =
182+
IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
183+
184+
try {
185+
String deviceId = "d1";
186+
String measurementId = "s1";
187+
IMemTable memTable = createMemTable(deviceId, measurementId);
188+
assertEquals(1, memTable.getMemTableMap().size());
189+
IWritableMemChunkGroup memChunkGroup = memTable.getMemTableMap().values().iterator().next();
190+
assertEquals(1, memChunkGroup.getMemChunkMap().size());
191+
IWritableMemChunk memChunk = memChunkGroup.getMemChunkMap().values().iterator().next();
192+
TVList tvList = memChunk.getWorkingTVList();
193+
assertFalse(tvList.isSorted());
194+
195+
// FragmentInstance Context
196+
FragmentInstanceId id1 = new FragmentInstanceId(new PlanFragmentId(MOCK_QUERY_ID, 1), "1");
197+
FragmentInstanceStateMachine stateMachine1 =
198+
new FragmentInstanceStateMachine(id1, instanceNotificationExecutor);
199+
FragmentInstanceContext fragmentInstanceContext1 =
200+
createFragmentInstanceContext(id1, stateMachine1);
201+
202+
FragmentInstanceId id2 = new FragmentInstanceId(new PlanFragmentId(MOCK_QUERY_ID, 2), "2");
203+
FragmentInstanceStateMachine stateMachine2 =
204+
new FragmentInstanceStateMachine(id2, instanceNotificationExecutor);
205+
FragmentInstanceContext fragmentInstanceContext2 =
206+
createFragmentInstanceContext(id2, stateMachine2);
207+
208+
// query on memtable
209+
MeasurementPath fullPath =
210+
new MeasurementPath(
211+
deviceId,
212+
measurementId,
213+
new MeasurementSchema(
214+
measurementId,
215+
TSDataType.INT32,
216+
TSEncoding.RLE,
217+
CompressionType.UNCOMPRESSED,
218+
Collections.emptyMap()));
219+
ReadOnlyMemChunk readOnlyMemChunk1 =
220+
memTable.query(fragmentInstanceContext1, fullPath, Long.MIN_VALUE, null, null);
221+
ReadOnlyMemChunk readOnlyMemChunk2 =
222+
memTable.query(fragmentInstanceContext2, fullPath, Long.MIN_VALUE, null, null);
223+
224+
IPointReader pointReader = readOnlyMemChunk1.getPointReader();
225+
while (pointReader.hasNextTimeValuePair()) {
226+
pointReader.nextTimeValuePair();
227+
}
228+
assertTrue(tvList.isSorted());
229+
assertEquals(
230+
tvList.calculateRamSize(),
231+
fragmentInstanceContext1.getMemoryReservationContext().getReservedMemory());
232+
} catch (QueryProcessException
233+
| IOException
234+
| MetadataException
235+
| MemoryNotEnoughException
236+
| IllegalArgumentException e) {
237+
fail(e.getMessage());
238+
} finally {
239+
instanceNotificationExecutor.shutdown();
240+
}
241+
}
242+
160243
private FragmentInstanceExecution createFragmentInstanceExecution(int id, Executor executor)
161244
throws CpuNotEnoughException {
162245
IDriverScheduler scheduler = Mockito.mock(IDriverScheduler.class);
@@ -201,4 +284,20 @@ private TVList buildTVList() {
201284
}
202285
return tvList;
203286
}
287+
288+
private IMemTable createMemTable(String deviceId, String measurementId)
289+
throws IllegalPathException {
290+
IMemTable memTable = new PrimitiveMemTable("root.test", "1");
291+
292+
int rows = 100;
293+
for (int i = 0; i < 100; i++) {
294+
memTable.write(
295+
DeviceIDFactory.getInstance().getDeviceID(new PartialPath(deviceId)),
296+
Collections.singletonList(
297+
new MeasurementSchema(measurementId, TSDataType.INT32, TSEncoding.PLAIN)),
298+
rows - i - 1,
299+
new Object[] {i + 10});
300+
}
301+
return memTable;
302+
}
204303
}

0 commit comments

Comments
 (0)