Skip to content

Commit 5598d2e

Browse files
committed
memory for CteScanOperator / CteScanReader
1 parent 03d5c52 commit 5598d2e

File tree

7 files changed

+95
-82
lines changed

7 files changed

+95
-82
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/CteScanOperator.java

Lines changed: 13 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,10 @@
2121

2222
package org.apache.iotdb.db.queryengine.execution.operator.source.relational;
2323

24-
import org.apache.iotdb.commons.utils.TestOnly;
2524
import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
2625
import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
2726
import org.apache.iotdb.db.queryengine.execution.operator.source.SourceOperator;
27+
import org.apache.iotdb.db.queryengine.plan.planner.memory.MemoryReservationManager;
2828
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
2929
import org.apache.iotdb.db.utils.cte.CteDataReader;
3030
import org.apache.iotdb.db.utils.cte.CteDataStore;
@@ -36,6 +36,8 @@
3636
import org.slf4j.Logger;
3737
import org.slf4j.LoggerFactory;
3838

39+
import static java.util.Objects.requireNonNull;
40+
3941
public class CteScanOperator implements SourceOperator {
4042
private static final Logger LOGGER = LoggerFactory.getLogger(CteScanOperator.class);
4143
private static final long INSTANCE_SIZE =
@@ -46,42 +48,33 @@ public class CteScanOperator implements SourceOperator {
4648

4749
private final OperatorContext operatorContext;
4850
private final PlanNodeId sourceId;
49-
50-
private final CteDataStore dataStore;
5151
private final CteDataReader dataReader;
52-
private final int dataStoreRefCount;
5352

5453
public CteScanOperator(
55-
OperatorContext operatorContext, PlanNodeId sourceId, CteDataStore dataStore) {
54+
OperatorContext operatorContext,
55+
PlanNodeId sourceId,
56+
CteDataStore dataStore,
57+
MemoryReservationManager memoryReservationManager) {
58+
requireNonNull(dataStore, "dataStore is null");
5659
this.operatorContext = operatorContext;
5760
this.sourceId = sourceId;
58-
this.dataStore = dataStore;
59-
this.dataReader = new MemoryReader(dataStore.getCachedData());
60-
this.dataStoreRefCount = dataStore.increaseRefCount();
61+
this.dataReader = new MemoryReader(dataStore, memoryReservationManager);
6162
}
6263

6364
@Override
6465
public TsBlock next() throws Exception {
65-
if (dataReader == null) {
66-
return null;
67-
}
6866
return dataReader.next();
6967
}
7068

7169
@Override
7270
public boolean hasNext() throws Exception {
73-
if (dataReader == null) {
74-
return false;
75-
}
7671
return dataReader.hasNext();
7772
}
7873

7974
@Override
8075
public void close() throws Exception {
8176
try {
82-
if (dataReader != null) {
83-
dataReader.close();
84-
}
77+
dataReader.close();
8578
} catch (Exception e) {
8679
LOGGER.error("Fail to close CteDataReader", e);
8780
}
@@ -109,15 +102,9 @@ public long calculateRetainedSizeAfterCallingNext() {
109102

110103
@Override
111104
public long ramBytesUsed() {
112-
long bytes =
113-
INSTANCE_SIZE
114-
+ MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(operatorContext)
115-
+ dataReader.bytesUsed();
116-
if (dataStoreRefCount == 1) {
117-
bytes += dataStore.getCachedBytes();
118-
}
119-
120-
return bytes;
105+
return INSTANCE_SIZE
106+
+ MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(operatorContext)
107+
+ dataReader.ramBytesUsed();
121108
}
122109

123110
@Override
@@ -129,9 +116,4 @@ public OperatorContext getOperatorContext() {
129116
public PlanNodeId getSourceId() {
130117
return sourceId;
131118
}
132-
133-
@TestOnly
134-
public int getDataStoreRefCount() {
135-
return dataStoreRefCount;
136-
}
137119
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1202,7 +1202,11 @@ public Operator visitCteScan(CteScanNode node, LocalExecutionPlanContext context
12021202
context.getNextOperatorId(),
12031203
node.getPlanNodeId(),
12041204
CteScanOperator.class.getSimpleName());
1205-
return new CteScanOperator(operatorContext, node.getPlanNodeId(), node.getDataStore());
1205+
return new CteScanOperator(
1206+
operatorContext,
1207+
node.getPlanNodeId(),
1208+
node.getDataStore(),
1209+
context.getInstanceContext().getMemoryReservationContext());
12061210
}
12071211

12081212
@Override

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/CteMaterializer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ public CteDataStore fetchCteQueryResult(
177177
table,
178178
"!!! Failed to materialize CTE. The main query falls back to INLINE mode !!!");
179179
}
180-
context.releaseMemoryReservedForFrontEnd(cteDataStore.getCachedBytes());
180+
context.releaseMemoryReservedForFrontEnd(cteDataStore.ramBytesUsed());
181181
cteDataStore.clear();
182182
return null;
183183
}
@@ -192,7 +192,7 @@ public CteDataStore fetchCteQueryResult(
192192
return cteDataStore;
193193
} catch (final Throwable throwable) {
194194
if (cteDataStore != null) {
195-
context.releaseMemoryReservedForFrontEnd(cteDataStore.getCachedBytes());
195+
context.releaseMemoryReservedForFrontEnd(cteDataStore.ramBytesUsed());
196196
cteDataStore.clear();
197197
}
198198
t = throwable;

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/cte/CteDataReader.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,9 @@
2424
import org.apache.iotdb.commons.exception.IoTDBException;
2525

2626
import org.apache.tsfile.read.common.block.TsBlock;
27+
import org.apache.tsfile.utils.Accountable;
2728

28-
public interface CteDataReader {
29+
public interface CteDataReader extends Accountable {
2930
/**
3031
* Check if there is more data in CteDataReader. DiskSpillerReader may run out of current TsBlocks
3132
* , then it needs to read from file and cache more data. This method should be called before
@@ -48,11 +49,4 @@ public interface CteDataReader {
4849
* @throws IoTDBException the error occurs when closing fileChannel
4950
*/
5051
void close() throws IoTDBException;
51-
52-
/**
53-
* Get the bytes used by this CteDataReader.
54-
*
55-
* @return the bytes used by this CteDataReader
56-
*/
57-
long bytesUsed();
5852
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/cte/CteDataStore.java

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,25 +21,31 @@
2121

2222
package org.apache.iotdb.db.utils.cte;
2323

24+
import org.apache.iotdb.commons.utils.TestOnly;
2425
import org.apache.iotdb.db.conf.IoTDBConfig;
2526
import org.apache.iotdb.db.conf.IoTDBDescriptor;
2627
import org.apache.iotdb.db.queryengine.plan.relational.metadata.TableSchema;
2728

2829
import org.apache.tsfile.read.common.block.TsBlock;
30+
import org.apache.tsfile.utils.Accountable;
31+
import org.apache.tsfile.utils.RamUsageEstimator;
2932

3033
import java.util.ArrayList;
3134
import java.util.List;
3235
import java.util.concurrent.atomic.AtomicInteger;
3336

34-
public class CteDataStore {
37+
public class CteDataStore implements Accountable {
38+
private static final long INSTANCE_SIZE =
39+
RamUsageEstimator.shallowSizeOfInstance(CteDataStore.class);
40+
3541
private final TableSchema tableSchema;
3642
private final List<Integer> columnIndex2TsBlockColumnIndexList;
3743

3844
private final List<TsBlock> cachedData;
3945
private long cachedBytes;
4046
private int cachedRows;
4147

42-
// reference count by CteScanOperator
48+
// reference count by CteScanReader
4349
private final AtomicInteger count;
4450

4551
public CteDataStore(TableSchema tableSchema, List<Integer> columnIndex2TsBlockColumnIndexList) {
@@ -75,10 +81,6 @@ public List<TsBlock> getCachedData() {
7581
return cachedData;
7682
}
7783

78-
public long getCachedBytes() {
79-
return cachedBytes;
80-
}
81-
8284
public TableSchema getTableSchema() {
8385
return tableSchema;
8486
}
@@ -87,7 +89,21 @@ public List<Integer> getColumnIndex2TsBlockColumnIndexList() {
8789
return columnIndex2TsBlockColumnIndexList;
8890
}
8991

90-
public int increaseRefCount() {
92+
public int incrementAndGetCount() {
9193
return count.incrementAndGet();
9294
}
95+
96+
public int decrementAndGetCount() {
97+
return count.decrementAndGet();
98+
}
99+
100+
@Override
101+
public long ramBytesUsed() {
102+
return INSTANCE_SIZE + cachedBytes;
103+
}
104+
105+
@TestOnly
106+
public int getCount() {
107+
return count.get();
108+
}
93109
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/cte/MemoryReader.java

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,43 +22,54 @@
2222
package org.apache.iotdb.db.utils.cte;
2323

2424
import org.apache.iotdb.commons.exception.IoTDBException;
25+
import org.apache.iotdb.db.queryengine.plan.planner.memory.MemoryReservationManager;
2526

2627
import org.apache.tsfile.read.common.block.TsBlock;
2728
import org.apache.tsfile.utils.RamUsageEstimator;
2829

29-
import java.util.List;
30-
3130
public class MemoryReader implements CteDataReader {
3231
private static final long INSTANCE_SIZE =
3332
RamUsageEstimator.shallowSizeOfInstance(MemoryReader.class);
3433

34+
// thread-safe memory manager
35+
private final MemoryReservationManager memoryReservationManager;
3536
// all the data in MemoryReader lies in memory
36-
private final List<TsBlock> cachedData;
37+
private final CteDataStore dataStore;
3738
private int tsBlockIndex;
3839

39-
public MemoryReader(List<TsBlock> cachedTsBlock) {
40-
this.cachedData = cachedTsBlock;
40+
public MemoryReader(CteDataStore dataStore, MemoryReservationManager memoryReservationManager) {
41+
this.dataStore = dataStore;
4142
this.tsBlockIndex = 0;
43+
this.memoryReservationManager = memoryReservationManager;
44+
if (dataStore.incrementAndGetCount() == 1) {
45+
memoryReservationManager.reserveMemoryCumulatively(dataStore.ramBytesUsed());
46+
}
4247
}
4348

4449
@Override
4550
public boolean hasNext() throws IoTDBException {
46-
return cachedData != null && tsBlockIndex < cachedData.size();
51+
return dataStore.getCachedData() != null && tsBlockIndex < dataStore.getCachedData().size();
4752
}
4853

4954
@Override
5055
public TsBlock next() throws IoTDBException {
51-
if (cachedData == null || tsBlockIndex >= cachedData.size()) {
56+
if (dataStore.getCachedData() == null || tsBlockIndex >= dataStore.getCachedData().size()) {
5257
return null;
5358
}
54-
return cachedData.get(tsBlockIndex++);
59+
return dataStore.getCachedData().get(tsBlockIndex++);
5560
}
5661

5762
@Override
58-
public void close() throws IoTDBException {}
63+
public void close() throws IoTDBException {
64+
if (dataStore.decrementAndGetCount() == 0) {
65+
memoryReservationManager.releaseMemoryCumulatively(dataStore.ramBytesUsed());
66+
}
67+
}
5968

6069
@Override
61-
public long bytesUsed() {
70+
public long ramBytesUsed() {
71+
// The calculation excludes the memory occupied by the CteDataStore.
72+
// memory allocate/release for CteDataStore is handled during constructor and close
6273
return INSTANCE_SIZE;
6374
}
6475
}

0 commit comments

Comments
 (0)