Skip to content

Commit 864b94e

Browse files
authored
Add extra info for TableScanOperator and AggTableScanOperator in Explain Analyze
1 parent 5b7f1c6 commit 864b94e

File tree

4 files changed

+41
-21
lines changed

4 files changed

+41
-21
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorContext.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import java.util.HashMap;
3535
import java.util.Map;
3636
import java.util.Objects;
37+
import java.util.concurrent.ConcurrentHashMap;
3738
import java.util.concurrent.TimeUnit;
3839

3940
/**
@@ -165,7 +166,8 @@ public long getOutputRows() {
165166

166167
public void recordSpecifiedInfo(String key, String value) {
167168
if (specifiedInfo == null) {
168-
specifiedInfo = new HashMap<>();
169+
// explain analyze operator fetching and current operator updating may be concurrently
170+
specifiedInfo = new ConcurrentHashMap<>();
169171
}
170172
specifiedInfo.put(key, value);
171173
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java

Lines changed: 27 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,9 @@
6767

6868
import static java.lang.String.format;
6969
import static org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil.satisfiedTimeRange;
70+
import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator.CURRENT_DEVICE_INDEX_STRING;
7071
import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator.constructAlignedPath;
72+
import static org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanGraphPrinter.DEVICE_NUMBER;
7173
import static org.apache.tsfile.read.common.block.TsBlockUtil.skipPointsOutOfTimeRange;
7274

7375
public class TableAggregationTableScanOperator extends AbstractSeriesAggregationScanOperator {
@@ -161,6 +163,7 @@ public TableAggregationTableScanOperator(
161163
this.columnsIndexArray = columnsIndexArray;
162164
this.deviceEntries = deviceEntries;
163165
this.deviceCount = deviceEntries.size();
166+
this.operatorContext.recordSpecifiedInfo(DEVICE_NUMBER, Integer.toString(this.deviceCount));
164167
this.scanOrder = scanOrder;
165168
this.seriesScanOptions = seriesScanOptions;
166169
this.measurementColumnNames = measurementColumnNames;
@@ -169,6 +172,7 @@ public TableAggregationTableScanOperator(
169172
this.measurementColumnTSDataTypes =
170173
measurementSchemas.stream().map(IMeasurementSchema::getType).collect(Collectors.toList());
171174
this.currentDeviceIndex = 0;
175+
this.operatorContext.recordSpecifiedInfo(CURRENT_DEVICE_INDEX_STRING, Integer.toString(0));
172176
this.aggArguments = aggArguments;
173177
this.timeIterator = tableTimeRangeIterator;
174178
if (tableTimeRangeIterator.getType()
@@ -327,7 +331,7 @@ && readAndCalcFromFile()) {
327331
// all data of current device has been consumed
328332
updateResultTsBlock();
329333
timeIterator.resetCurTimeRange();
330-
currentDeviceIndex++;
334+
nextDevice();
331335
}
332336

333337
if (currentDeviceIndex < deviceCount) {
@@ -800,29 +804,34 @@ public boolean isAllAggregatorsHasFinalResult(List<TableAggregator> aggregators)
800804
}
801805

802806
private void checkIfAllAggregatorHasFinalResult() {
803-
if (allAggregatorsHasFinalResult) {
804-
// for SINGLE_TIME_ITERATOR, if allAggregatorsHasFinalResult, just consume next device
805-
if (timeIterator.getType() == ITableTimeRangeIterator.TimeIteratorType.SINGLE_TIME_ITERATOR) {
806-
currentDeviceIndex++;
807-
inputTsBlock = null;
808-
809-
if (currentDeviceIndex < deviceCount) {
810-
// construct AlignedSeriesScanUtil for next device
811-
constructAlignedSeriesScanUtil();
812-
queryDataSource.reset();
813-
this.seriesScanUtil.initQueryDataSource(queryDataSource);
814-
}
807+
if (allAggregatorsHasFinalResult
808+
&& timeIterator.getType()
809+
== ITableTimeRangeIterator.TimeIteratorType.SINGLE_TIME_ITERATOR) {
810+
nextDevice();
811+
inputTsBlock = null;
815812

816-
if (currentDeviceIndex >= deviceCount) {
817-
// all devices have been consumed
818-
timeIterator.setFinished();
819-
}
813+
if (currentDeviceIndex < deviceCount) {
814+
// construct AlignedSeriesScanUtil for next device
815+
constructAlignedSeriesScanUtil();
816+
queryDataSource.reset();
817+
this.seriesScanUtil.initQueryDataSource(queryDataSource);
818+
}
820819

821-
allAggregatorsHasFinalResult = false;
820+
if (currentDeviceIndex >= deviceCount) {
821+
// all devices have been consumed
822+
timeIterator.setFinished();
822823
}
824+
825+
allAggregatorsHasFinalResult = false;
823826
}
824827
}
825828

829+
private void nextDevice() {
830+
currentDeviceIndex++;
831+
this.operatorContext.recordSpecifiedInfo(
832+
CURRENT_DEVICE_INDEX_STRING, Integer.toString(currentDeviceIndex));
833+
}
834+
826835
private void resetTableAggregators() {
827836
tableAggregators.forEach(TableAggregator::reset);
828837
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableScanOperator.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,12 +54,15 @@
5454
import java.util.stream.Collectors;
5555

5656
import static org.apache.iotdb.db.queryengine.execution.operator.source.AlignedSeriesScanOperator.appendDataIntoBuilder;
57+
import static org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanGraphPrinter.DEVICE_NUMBER;
5758
import static org.apache.iotdb.db.queryengine.plan.relational.type.InternalTypeManager.getTSDataType;
5859

5960
public class TableScanOperator extends AbstractSeriesScanOperator {
6061
private static final long INSTANCE_SIZE =
6162
RamUsageEstimator.shallowSizeOfInstance(TableScanOperator.class);
6263

64+
public static final String CURRENT_DEVICE_INDEX_STRING = "CurrentDeviceIndex";
65+
6366
public static final LongColumn TIME_COLUMN_TEMPLATE =
6467
new LongColumn(1, Optional.empty(), new long[] {0});
6568

@@ -106,6 +109,7 @@ public TableScanOperator(
106109
int maxTsBlockLineNum) {
107110
this.sourceId = sourceId;
108111
this.operatorContext = context;
112+
this.operatorContext.recordSpecifiedInfo(DEVICE_NUMBER, Integer.toString(deviceEntries.size()));
109113
this.columnSchemas = columnSchemas;
110114
this.columnsIndexArray = columnsIndexArray;
111115
this.deviceEntries = deviceEntries;
@@ -118,6 +122,7 @@ public TableScanOperator(
118122
this.measurementColumnTSDataTypes =
119123
measurementSchemas.stream().map(IMeasurementSchema::getType).collect(Collectors.toList());
120124
this.currentDeviceIndex = 0;
125+
this.operatorContext.recordSpecifiedInfo(CURRENT_DEVICE_INDEX_STRING, Integer.toString(0));
121126

122127
this.maxReturnSize =
123128
Math.min(
@@ -297,6 +302,8 @@ private void prepareForNextDevice() {
297302
// reset QueryDataSource
298303
queryDataSource.reset();
299304
this.seriesScanUtil.initQueryDataSource(queryDataSource);
305+
this.operatorContext.recordSpecifiedInfo(
306+
CURRENT_DEVICE_INDEX_STRING, Integer.toString(currentDeviceIndex));
300307
}
301308
}
302309

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,8 @@ public class PlanGraphPrinter extends PlanVisitor<List<String>, PlanGraphPrinter
102102
private static final int BOX_MARGIN = 1;
103103
private static final int CONNECTION_LINE_HEIGHT = 2;
104104

105+
public static final String DEVICE_NUMBER = "DeviceNumber";
106+
105107
@Override
106108
public List<String> visitPlan(PlanNode node, GraphContext context) {
107109
List<String> boxValue = new ArrayList<>();
@@ -616,7 +618,7 @@ public List<String> visitTableScan(TableScanNode node, GraphContext context) {
616618
boxValue.add(String.format("TableScan-%s", node.getPlanNodeId().getId()));
617619
boxValue.add(String.format("QualifiedTableName: %s", node.getQualifiedObjectName().toString()));
618620
boxValue.add(String.format("OutputSymbols: %s", node.getOutputSymbols()));
619-
boxValue.add(String.format("DeviceEntriesSize: %s", node.getDeviceEntries().size()));
621+
boxValue.add(String.format("DeviceNumber: %s", node.getDeviceEntries().size()));
620622
boxValue.add(String.format("ScanOrder: %s", node.getScanOrder()));
621623
if (node.getTimePredicate().isPresent()) {
622624
boxValue.add(String.format("TimePredicate: %s", node.getTimePredicate().get()));
@@ -684,7 +686,7 @@ public List<String> visitAggregationTableScan(
684686
String.format("Project-Expressions: %s", node.getProjection().getMap().values()));
685687
}
686688

687-
boxValue.add(String.format("DeviceEntriesSize: %s", node.getDeviceEntries().size()));
689+
boxValue.add(String.format("DeviceNumber: %s", node.getDeviceEntries().size()));
688690
boxValue.add(String.format("ScanOrder: %s", node.getScanOrder()));
689691
if (node.getPushDownPredicate() != null) {
690692
boxValue.add(String.format("PushDownPredicate: %s", node.getPushDownPredicate()));

0 commit comments

Comments
 (0)