|
24 | 24 | import org.apache.iotdb.commons.utils.TestOnly; |
25 | 25 | import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper; |
26 | 26 | import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext; |
27 | | -import org.apache.iotdb.db.queryengine.execution.operator.source.AbstractSourceOperator; |
| 27 | +import org.apache.iotdb.db.queryengine.execution.operator.source.SourceOperator; |
28 | 28 | import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; |
29 | 29 | import org.apache.iotdb.db.utils.cte.CteDataReader; |
30 | 30 | import org.apache.iotdb.db.utils.cte.CteDataStore; |
|
35 | 35 | import org.slf4j.Logger; |
36 | 36 | import org.slf4j.LoggerFactory; |
37 | 37 |
|
38 | | -public class CteScanOperator extends AbstractSourceOperator { |
| 38 | +public class CteScanOperator implements SourceOperator { |
39 | 39 | private static final Logger LOGGER = LoggerFactory.getLogger(CteScanOperator.class); |
40 | 40 | private static final long INSTANCE_SIZE = |
41 | 41 | RamUsageEstimator.shallowSizeOfInstance(CteScanOperator.class); |
42 | 42 |
|
| 43 | + private final OperatorContext operatorContext; |
| 44 | + private final PlanNodeId sourceId; |
| 45 | + |
43 | 46 | private final CteDataStore dataStore; |
| 47 | + private final CteDataReader dataReader; |
44 | 48 | private final int dataStoreRefCount; |
45 | | - private CteDataReader dataReader; |
46 | 49 |
|
47 | 50 | public CteScanOperator( |
48 | 51 | OperatorContext operatorContext, PlanNodeId sourceId, CteDataStore dataStore) { |
49 | 52 | this.operatorContext = operatorContext; |
50 | 53 | this.sourceId = sourceId; |
51 | 54 | this.dataStore = dataStore; |
| 55 | + this.dataReader = new MemoryReader(dataStore.getCachedData()); |
52 | 56 | this.dataStoreRefCount = dataStore.increaseRefCount(); |
53 | | - prepareReader(); |
54 | 57 | } |
55 | 58 |
|
56 | 59 | @Override |
@@ -104,21 +107,24 @@ public long calculateRetainedSizeAfterCallingNext() { |
104 | 107 | @Override |
105 | 108 | public long ramBytesUsed() { |
106 | 109 | long bytes = |
107 | | - INSTANCE_SIZE + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(operatorContext); |
108 | | - if (dataReader != null) { |
109 | | - bytes += dataReader.bytesUsed(); |
110 | | - } |
| 110 | + INSTANCE_SIZE |
| 111 | + + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(operatorContext) |
| 112 | + + dataReader.bytesUsed(); |
111 | 113 | if (dataStoreRefCount == 1) { |
112 | 114 | bytes += dataStore.getCachedBytes(); |
113 | 115 | } |
114 | 116 |
|
115 | 117 | return bytes; |
116 | 118 | } |
117 | 119 |
|
118 | | - private void prepareReader() { |
119 | | - if (dataStore.getCachedBytes() != 0) { |
120 | | - dataReader = new MemoryReader(dataStore.getCachedData()); |
121 | | - } |
| 120 | + @Override |
| 121 | + public OperatorContext getOperatorContext() { |
| 122 | + return operatorContext; |
| 123 | + } |
| 124 | + |
| 125 | + @Override |
| 126 | + public PlanNodeId getSourceId() { |
| 127 | + return sourceId; |
122 | 128 | } |
123 | 129 |
|
124 | 130 | @TestOnly |
|
0 commit comments