Skip to content

Commit 817ced0

Browse files
authored
Merge branch 'master' into fixMemTableQueryBug
2 parents a898a1c + 9d9902f commit 817ced0

File tree

14 files changed

+279
-23
lines changed

14 files changed

+279
-23
lines changed

iotdb-core/ainode/pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ scikit-learn = "^1.7.1"
9595
statsmodels = "^0.14.5"
9696
sktime = "0.40.1"
9797
pmdarima = "2.1.1"
98-
hmmlearn = "0.3.2"
98+
hmmlearn = "0.3.3"
9999
accelerate = "^1.10.1"
100100

101101
# ---- Optimizers / utils ----

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -897,26 +897,33 @@ public void releaseResourceWhenAllDriversAreClosed() {
897897
*/
898898
private void releaseTVListOwnedByQuery() {
899899
for (TVList tvList : tvListSet) {
900+
long tvListRamSize = tvList.calculateRamSize();
900901
tvList.lockQueryList();
901902
Set<QueryContext> queryContextSet = tvList.getQueryContextSet();
902903
try {
903904
queryContextSet.remove(this);
904905
if (tvList.getOwnerQuery() == this) {
906+
if (tvList.getReservedMemoryBytes() != tvListRamSize) {
907+
LOGGER.warn(
908+
"Release TVList owned by query: allocate size {}, release size {}",
909+
tvList.getReservedMemoryBytes(),
910+
tvListRamSize);
911+
}
905912
if (queryContextSet.isEmpty()) {
906913
if (LOGGER.isDebugEnabled()) {
907914
LOGGER.debug(
908915
"TVList {} is released by the query, FragmentInstance Id is {}",
909916
tvList,
910917
this.getId());
911918
}
912-
memoryReservationManager.releaseMemoryCumulatively(tvList.calculateRamSize());
919+
memoryReservationManager.releaseMemoryCumulatively(tvList.getReservedMemoryBytes());
913920
tvList.clear();
914921
} else {
915922
// Transfer memory to next query. It must be exception-safe as this method is called
916923
// during FragmentInstanceExecution cleanup. Any exception during this process could
917924
// prevent proper resource cleanup and cause memory leaks.
918925
Pair<Long, Long> releasedBytes =
919-
memoryReservationManager.releaseMemoryVirtually(tvList.calculateRamSize());
926+
memoryReservationManager.releaseMemoryVirtually(tvList.getReservedMemoryBytes());
920927
FragmentInstanceContext queryContext =
921928
(FragmentInstanceContext) queryContextSet.iterator().next();
922929
queryContext

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1086,13 +1086,14 @@ private void addSortForEachLastQueryNode(PlanNode root, Ordering timeseriesOrder
10861086
if (child instanceof LastQueryScanNode) {
10871087
// sort the measurements for LastQueryMergeOperator
10881088
LastQueryScanNode node = (LastQueryScanNode) child;
1089-
((LastQueryScanNode) child)
1090-
.getIdxOfMeasurementSchemas()
1089+
node.getIdxOfMeasurementSchemas()
10911090
.sort(
10921091
Comparator.comparing(
10931092
idx ->
10941093
new Binary(
1095-
node.getMeasurementSchema(idx).getMeasurementName(),
1094+
node.getGlobalMeasurementSchemaList()
1095+
.get(idx)
1096+
.getMeasurementName(),
10961097
TSFileConfig.STRING_CHARSET),
10971098
Comparator.naturalOrder()));
10981099
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/ThreadSafeMemoryReservationManager.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,12 +52,12 @@ public synchronized void releaseAllReservedMemory() {
5252
}
5353

5454
@Override
55-
public Pair<Long, Long> releaseMemoryVirtually(final long size) {
55+
public synchronized Pair<Long, Long> releaseMemoryVirtually(final long size) {
5656
return super.releaseMemoryVirtually(size);
5757
}
5858

5959
@Override
60-
public void reserveMemoryVirtually(
60+
public synchronized void reserveMemoryVirtually(
6161
final long bytesToBeReserved, final long bytesAlreadyReserved) {
6262
super.reserveMemoryVirtually(bytesToBeReserved, bytesAlreadyReserved);
6363
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/LastQueryScanNode.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -346,6 +346,10 @@ public void setGlobalMeasurementSchemaList(List<IMeasurementSchema> globalMeasur
346346
this.globalMeasurementSchemaList = globalMeasurementSchemaList;
347347
}
348348

349+
public List<IMeasurementSchema> getGlobalMeasurementSchemaList() {
350+
return globalMeasurementSchemaList;
351+
}
352+
349353
public IMeasurementSchema getMeasurementSchema(int idx) {
350354
int globalIdx = indexOfMeasurementSchemas.get(idx);
351355
return globalMeasurementSchemaList.get(globalIdx);

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/ResourceByPathUtils.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,7 @@ protected Map<TVList, Integer> prepareTvListMapForQuery(
153153
// mutable tvlist
154154
TVList list = memChunk.getWorkingTVList();
155155
TVList cloneList = null;
156+
long tvListRamSize = list.calculateRamSize();
156157
list.lockQueryList();
157158
try {
158159
if (copyTimeFilter != null
@@ -193,7 +194,8 @@ protected Map<TVList, Integer> prepareTvListMapForQuery(
193194
if (firstQuery instanceof FragmentInstanceContext) {
194195
MemoryReservationManager memoryReservationManager =
195196
((FragmentInstanceContext) firstQuery).getMemoryReservationContext();
196-
memoryReservationManager.reserveMemoryCumulatively(list.calculateRamSize());
197+
memoryReservationManager.reserveMemoryCumulatively(tvListRamSize);
198+
list.setReservedMemoryBytes(tvListRamSize);
197199
}
198200
list.setOwnerQuery(firstQuery);
199201

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3600,9 +3600,21 @@ public void writeObject(ObjectNode objectNode) throws Exception {
36003600
writeLock("writeObject");
36013601
try {
36023602
String relativeTmpPathString = objectNode.getFilePathString() + ".tmp";
3603-
String objectFileDir = TierManager.getInstance().getNextFolderForObjectFile();
3604-
File objectTmpFile =
3605-
FSFactoryProducer.getFSFactory().getFile(objectFileDir, relativeTmpPathString);
3603+
String objectFileDir = null;
3604+
File objectTmpFile = null;
3605+
for (String objectDir : TierManager.getInstance().getAllObjectFileFolders()) {
3606+
File tmpFile = FSFactoryProducer.getFSFactory().getFile(objectDir, relativeTmpPathString);
3607+
if (tmpFile.exists()) {
3608+
objectFileDir = objectDir;
3609+
objectTmpFile = tmpFile;
3610+
break;
3611+
}
3612+
}
3613+
if (objectTmpFile == null) {
3614+
objectFileDir = TierManager.getInstance().getNextFolderForObjectFile();
3615+
objectTmpFile =
3616+
FSFactoryProducer.getFSFactory().getFile(objectFileDir, relativeTmpPathString);
3617+
}
36063618
try (ObjectWriter writer = new ObjectWriter(objectTmpFile)) {
36073619
writer.write(
36083620
objectNode.isGeneratedByRemoteConsensusLeader(),

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ protected void maybeReleaseTvList(TVList tvList) {
101101
}
102102

103103
private void tryReleaseTvList(TVList tvList) {
104+
long tvListRamSize = tvList.calculateRamSize();
104105
tvList.lockQueryList();
105106
try {
106107
if (tvList.getQueryContextSet().isEmpty()) {
@@ -112,7 +113,8 @@ private void tryReleaseTvList(TVList tvList) {
112113
if (firstQuery instanceof FragmentInstanceContext) {
113114
MemoryReservationManager memoryReservationManager =
114115
((FragmentInstanceContext) firstQuery).getMemoryReservationContext();
115-
memoryReservationManager.reserveMemoryCumulatively(tvList.calculateRamSize());
116+
memoryReservationManager.reserveMemoryCumulatively(tvListRamSize);
117+
tvList.setReservedMemoryBytes(tvListRamSize);
116118
}
117119
// update current TVList owner to first query in the list
118120
tvList.setOwnerQuery(firstQuery);

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedReadOnlyMemChunk.java

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.apache.iotdb.db.storageengine.dataregion.memtable;
2121

22+
import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
2223
import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext;
2324
import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
2425
import org.apache.iotdb.db.storageengine.dataregion.read.reader.chunk.MemAlignedChunkLoader;
@@ -118,6 +119,21 @@ public void sortTvLists() {
118119
int queryRowCount = entry.getValue();
119120
if (!alignedTvList.isSorted() && queryRowCount > alignedTvList.seqRowCount()) {
120121
alignedTvList.sort();
122+
long alignedTvListRamSize = alignedTvList.calculateRamSize();
123+
alignedTvList.lockQueryList();
124+
try {
125+
FragmentInstanceContext ownerQuery =
126+
(FragmentInstanceContext) alignedTvList.getOwnerQuery();
127+
if (ownerQuery != null) {
128+
long deltaBytes = alignedTvListRamSize - alignedTvList.getReservedMemoryBytes();
129+
if (deltaBytes > 0) {
130+
ownerQuery.getMemoryReservationContext().reserveMemoryCumulatively(deltaBytes);
131+
alignedTvList.addReservedMemoryBytes(deltaBytes);
132+
}
133+
}
134+
} finally {
135+
alignedTvList.unlockQueryList();
136+
}
121137
}
122138
}
123139
}
@@ -356,10 +372,25 @@ public boolean isEmpty() {
356372
@Override
357373
public IPointReader getPointReader() {
358374
for (Map.Entry<TVList, Integer> entry : alignedTvListQueryMap.entrySet()) {
359-
AlignedTVList tvList = (AlignedTVList) entry.getKey();
375+
AlignedTVList alignedTvList = (AlignedTVList) entry.getKey();
360376
int queryLength = entry.getValue();
361-
if (!tvList.isSorted() && queryLength > tvList.seqRowCount()) {
362-
tvList.sort();
377+
if (!alignedTvList.isSorted() && queryLength > alignedTvList.seqRowCount()) {
378+
alignedTvList.sort();
379+
long alignedTvListRamSize = alignedTvList.calculateRamSize();
380+
alignedTvList.lockQueryList();
381+
try {
382+
FragmentInstanceContext ownerQuery =
383+
(FragmentInstanceContext) alignedTvList.getOwnerQuery();
384+
if (ownerQuery != null) {
385+
long deltaBytes = alignedTvListRamSize - alignedTvList.getReservedMemoryBytes();
386+
if (deltaBytes > 0) {
387+
ownerQuery.getMemoryReservationContext().reserveMemoryCumulatively(deltaBytes);
388+
alignedTvList.addReservedMemoryBytes(deltaBytes);
389+
}
390+
}
391+
} finally {
392+
alignedTvList.unlockQueryList();
393+
}
363394
}
364395
}
365396
TsBlock tsBlock = buildTsBlock();

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/ReadOnlyMemChunk.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import org.apache.iotdb.commons.utils.TestOnly;
2323
import org.apache.iotdb.db.exception.query.QueryProcessException;
24+
import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
2425
import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext;
2526
import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
2627
import org.apache.iotdb.db.storageengine.dataregion.read.reader.chunk.MemChunkLoader;
@@ -135,6 +136,20 @@ public void sortTvLists() {
135136
int queryRowCount = entry.getValue();
136137
if (!tvList.isSorted() && queryRowCount > tvList.seqRowCount()) {
137138
tvList.sort();
139+
long tvListRamSize = tvList.calculateRamSize();
140+
tvList.lockQueryList();
141+
try {
142+
FragmentInstanceContext ownerQuery = (FragmentInstanceContext) tvList.getOwnerQuery();
143+
if (ownerQuery != null) {
144+
long deltaBytes = tvListRamSize - tvList.getReservedMemoryBytes();
145+
if (deltaBytes > 0) {
146+
ownerQuery.getMemoryReservationContext().reserveMemoryCumulatively(deltaBytes);
147+
tvList.addReservedMemoryBytes(deltaBytes);
148+
}
149+
}
150+
} finally {
151+
tvList.unlockQueryList();
152+
}
138153
}
139154
}
140155
}
@@ -274,6 +289,20 @@ public IPointReader getPointReader() {
274289
int queryLength = entry.getValue();
275290
if (!tvList.isSorted() && queryLength > tvList.seqRowCount()) {
276291
tvList.sort();
292+
long tvListRamSize = tvList.calculateRamSize();
293+
tvList.lockQueryList();
294+
try {
295+
FragmentInstanceContext ownerQuery = (FragmentInstanceContext) tvList.getOwnerQuery();
296+
if (ownerQuery != null) {
297+
long deltaBytes = tvListRamSize - tvList.getReservedMemoryBytes();
298+
if (deltaBytes > 0) {
299+
ownerQuery.getMemoryReservationContext().reserveMemoryCumulatively(deltaBytes);
300+
tvList.addReservedMemoryBytes(deltaBytes);
301+
}
302+
}
303+
} finally {
304+
tvList.unlockQueryList();
305+
}
277306
}
278307
}
279308
TsBlock tsBlock = buildTsBlock();

0 commit comments

Comments
 (0)