Skip to content

Commit 743e2e7

Browse files
authored
Optimize group by query in ClientRPCServiceImpl to reduce cpu usage (#15178)
1 parent 5ae3728 commit 743e2e7

File tree

10 files changed

+212
-68
lines changed

10 files changed

+212
-68
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java

Lines changed: 32 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -53,24 +53,19 @@
5353
import org.apache.iotdb.db.protocol.session.IClientSession;
5454
import org.apache.iotdb.db.protocol.session.SessionManager;
5555
import org.apache.iotdb.db.protocol.thrift.OperationType;
56-
import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
57-
import org.apache.iotdb.db.queryengine.common.PlanFragmentId;
58-
import org.apache.iotdb.db.queryengine.common.QueryId;
5956
import org.apache.iotdb.db.queryengine.common.SessionInfo;
6057
import org.apache.iotdb.db.queryengine.common.header.ColumnHeader;
6158
import org.apache.iotdb.db.queryengine.common.header.DatasetHeader;
6259
import org.apache.iotdb.db.queryengine.common.header.DatasetHeaderFactory;
6360
import org.apache.iotdb.db.queryengine.execution.aggregation.AccumulatorFactory;
6461
import org.apache.iotdb.db.queryengine.execution.aggregation.Aggregator;
6562
import org.apache.iotdb.db.queryengine.execution.driver.DriverContext;
66-
import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
67-
import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceManager;
68-
import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceStateMachine;
63+
import org.apache.iotdb.db.queryengine.execution.fragment.FakedFragmentInstanceContext;
64+
import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
6965
import org.apache.iotdb.db.queryengine.execution.operator.process.last.LastQueryUtil;
7066
import org.apache.iotdb.db.queryengine.execution.operator.source.AbstractSeriesAggregationScanOperator;
7167
import org.apache.iotdb.db.queryengine.execution.operator.source.AlignedSeriesAggregationScanOperator;
7268
import org.apache.iotdb.db.queryengine.execution.operator.source.SeriesAggregationScanOperator;
73-
import org.apache.iotdb.db.queryengine.execution.operator.source.SeriesScanOperator;
7469
import org.apache.iotdb.db.queryengine.plan.Coordinator;
7570
import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher;
7671
import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
@@ -109,6 +104,7 @@
109104
import org.apache.iotdb.db.schemaengine.template.TemplateQueryType;
110105
import org.apache.iotdb.db.storageengine.StorageEngine;
111106
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
107+
import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource;
112108
import org.apache.iotdb.db.storageengine.rescon.quotas.DataNodeThrottleQuotaManager;
113109
import org.apache.iotdb.db.storageengine.rescon.quotas.OperationQuota;
114110
import org.apache.iotdb.db.subscription.agent.SubscriptionAgent;
@@ -205,7 +201,6 @@
205201

206202
import static org.apache.iotdb.commons.partition.DataPartition.NOT_ASSIGNED;
207203
import static org.apache.iotdb.db.queryengine.common.DataNodeEndPoints.isSameNode;
208-
import static org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
209204
import static org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil.initTimeRangeIterator;
210205
import static org.apache.iotdb.db.utils.CommonUtils.getContentOfRequest;
211206
import static org.apache.iotdb.db.utils.CommonUtils.getContentOfTSFastLastDataQueryForOneDeviceReq;
@@ -244,7 +239,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
244239

245240
private final DataNodeSchemaCache DATA_NODE_SCHEMA_CACHE = DataNodeSchemaCache.getInstance();
246241

247-
public static Duration DEFAULT_TIME_SLICE = new Duration(60_000, TimeUnit.MILLISECONDS);
242+
public static final Duration DEFAULT_TIME_SLICE = new Duration(60_000, TimeUnit.MILLISECONDS);
248243

249244
private static final int DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES =
250245
TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes();
@@ -655,6 +650,9 @@ private TSExecuteStatementResp executeAggregationQueryInternal(
655650
}
656651
}
657652

653+
private final List<InputLocation[]> inputLocationList =
654+
Collections.singletonList(new InputLocation[] {new InputLocation(0, 0)});
655+
658656
@SuppressWarnings("java:S2095") // close() do nothing
659657
private List<TsBlock> executeGroupByQueryInternal(
660658
SessionInfo sessionInfo,
@@ -677,21 +675,14 @@ private List<TsBlock> executeGroupByQueryInternal(
677675

678676
Filter timeFilter = TimeFilterApi.between(startTime, endTime - 1);
679677

680-
QueryId queryId = new QueryId("stub_query");
681-
FragmentInstanceId instanceId =
682-
new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
683-
FragmentInstanceStateMachine stateMachine =
684-
new FragmentInstanceStateMachine(
685-
instanceId, FragmentInstanceManager.getInstance().instanceNotificationExecutor);
686-
FragmentInstanceContext fragmentInstanceContext =
687-
createFragmentInstanceContext(
688-
instanceId, stateMachine, sessionInfo, dataRegionList.get(0), timeFilter);
678+
FakedFragmentInstanceContext fragmentInstanceContext =
679+
new FakedFragmentInstanceContext(timeFilter, dataRegionList.get(0));
680+
689681
DriverContext driverContext = new DriverContext(fragmentInstanceContext, 0);
690682
PlanNodeId planNodeId = new PlanNodeId("1");
691-
driverContext.addOperatorContext(1, planNodeId, SeriesScanOperator.class.getSimpleName());
692-
driverContext
693-
.getOperatorContexts()
694-
.forEach(operatorContext -> operatorContext.setMaxRunTime(DEFAULT_TIME_SLICE));
683+
OperatorContext operatorContext =
684+
new OperatorContext(1, planNodeId, "SeriesAggregationScanOperator", driverContext);
685+
operatorContext.setMaxRunTime(DEFAULT_TIME_SLICE);
695686

696687
SeriesScanOptions.Builder scanOptionsBuilder = new SeriesScanOptions.Builder();
697688
scanOptionsBuilder.withAllSensors(Collections.singleton(measurement));
@@ -709,19 +700,23 @@ private List<TsBlock> executeGroupByQueryInternal(
709700
true,
710701
true),
711702
AggregationStep.SINGLE,
712-
Collections.singletonList(new InputLocation[] {new InputLocation(0, 0)}));
703+
inputLocationList);
713704

714705
GroupByTimeParameter groupByTimeParameter =
715706
new GroupByTimeParameter(
716707
startTime, endTime, new TimeDuration(0, interval), new TimeDuration(0, interval), true);
717708

718709
IMeasurementSchema measurementSchema = new MeasurementSchema(measurement, dataType);
719710
AbstractSeriesAggregationScanOperator operator;
711+
boolean canUseStatistics =
712+
!TSDataType.BLOB.equals(dataType)
713+
|| (!TAggregationType.LAST_VALUE.equals(aggregationType)
714+
&& !TAggregationType.FIRST_VALUE.equals(aggregationType));
720715
PartialPath path;
721716
if (isAligned) {
722717
path =
723718
new AlignedPath(
724-
device,
719+
device.split("\\."),
725720
Collections.singletonList(measurement),
726721
Collections.singletonList(measurementSchema));
727722
operator =
@@ -730,36 +725,36 @@ private List<TsBlock> executeGroupByQueryInternal(
730725
(AlignedPath) path,
731726
Ordering.ASC,
732727
scanOptionsBuilder.build(),
733-
driverContext.getOperatorContexts().get(0),
728+
operatorContext,
734729
Collections.singletonList(aggregator),
735730
initTimeRangeIterator(groupByTimeParameter, true, true, sessionInfo.getZoneId()),
736731
groupByTimeParameter,
737732
DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES,
738-
!TSDataType.BLOB.equals(dataType)
739-
|| (!TAggregationType.LAST_VALUE.equals(aggregationType)
740-
&& !TAggregationType.FIRST_VALUE.equals(aggregationType)));
733+
canUseStatistics);
741734
} else {
742-
path = new MeasurementPath(device, measurement, measurementSchema);
735+
String[] splits = device.split("\\.");
736+
String[] fullPaths = new String[splits.length + 1];
737+
System.arraycopy(splits, 0, fullPaths, 0, splits.length);
738+
fullPaths[splits.length] = measurement;
739+
path = new MeasurementPath(fullPaths, measurementSchema);
743740
operator =
744741
new SeriesAggregationScanOperator(
745742
planNodeId,
746743
path,
747744
Ordering.ASC,
748745
scanOptionsBuilder.build(),
749-
driverContext.getOperatorContexts().get(0),
746+
operatorContext,
750747
Collections.singletonList(aggregator),
751748
initTimeRangeIterator(groupByTimeParameter, true, true, sessionInfo.getZoneId()),
752749
groupByTimeParameter,
753750
DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES,
754-
!TSDataType.BLOB.equals(dataType)
755-
|| (!TAggregationType.LAST_VALUE.equals(aggregationType)
756-
&& !TAggregationType.FIRST_VALUE.equals(aggregationType)));
751+
canUseStatistics);
757752
}
758753

759754
try {
760755
List<TsBlock> result = new ArrayList<>();
761-
fragmentInstanceContext.setSourcePaths(Collections.singletonList(path));
762-
operator.initQueryDataSource(fragmentInstanceContext.getSharedQueryDataSource());
756+
QueryDataSource dataSource = fragmentInstanceContext.getSharedQueryDataSource(path);
757+
operator.initQueryDataSource(dataSource);
763758

764759
while (operator.hasNext()) {
765760
result.add(operator.next());
@@ -769,7 +764,7 @@ private List<TsBlock> executeGroupByQueryInternal(
769764
} catch (Exception e) {
770765
throw new RuntimeException(e);
771766
} finally {
772-
fragmentInstanceContext.releaseResource();
767+
fragmentInstanceContext.releaseSharedQueryDataSource();
773768
}
774769
}
775770

@@ -1075,7 +1070,7 @@ public TSExecuteStatementResp executeGroupByQueryIntervalQuery(TSGroupByQueryInt
10751070
deviceId,
10761071
measurementId,
10771072
dataType,
1078-
true,
1073+
req.isAligned,
10791074
req.getStartTime(),
10801075
req.getEndTime(),
10811076
req.getInterval(),

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/Aggregator.java

Lines changed: 15 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -64,29 +64,22 @@ public Aggregator(
6464

6565
// Used for SeriesAggregateScanOperator and RawDataAggregateOperator
6666
public void processTsBlock(TsBlock tsBlock, BitMap bitMap) {
67-
long startTime = System.nanoTime();
68-
try {
69-
checkArgument(
70-
step.isInputRaw(),
71-
"Step in SeriesAggregateScanOperator and RawDataAggregateOperator can only process raw input");
72-
for (InputLocation[] inputLocations : inputLocationList) {
73-
Column[] timeAndValueColumn = new Column[1 + inputLocations.length];
74-
timeAndValueColumn[0] = tsBlock.getTimeColumn();
75-
for (int i = 0; i < inputLocations.length; i++) {
76-
checkArgument(
77-
inputLocations[i].getTsBlockIndex() == 0,
78-
"RawDataAggregateOperator can only process one tsBlock input.");
79-
int index = inputLocations[i].getValueColumnIndex();
80-
// for count_time, time column is also its value column
81-
// for max_by, the input column can also be time column.
82-
timeAndValueColumn[1 + i] =
83-
index == -1 ? timeAndValueColumn[0] : tsBlock.getColumn(index);
84-
}
85-
accumulator.addInput(timeAndValueColumn, bitMap);
67+
checkArgument(
68+
step.isInputRaw(),
69+
"Step in SeriesAggregateScanOperator and RawDataAggregateOperator can only process raw input");
70+
for (InputLocation[] inputLocations : inputLocationList) {
71+
Column[] timeAndValueColumn = new Column[1 + inputLocations.length];
72+
timeAndValueColumn[0] = tsBlock.getTimeColumn();
73+
for (int i = 0; i < inputLocations.length; i++) {
74+
checkArgument(
75+
inputLocations[i].getTsBlockIndex() == 0,
76+
"RawDataAggregateOperator can only process one tsBlock input.");
77+
int index = inputLocations[i].getValueColumnIndex();
78+
// for count_time, time column is also its value column
79+
// for max_by, the input column can also be time column.
80+
timeAndValueColumn[1 + i] = index == -1 ? timeAndValueColumn[0] : tsBlock.getColumn(index);
8681
}
87-
} finally {
88-
QUERY_EXECUTION_METRICS.recordExecutionCost(
89-
AGGREGATION_FROM_RAW_DATA, System.nanoTime() - startTime);
82+
accumulator.addInput(timeAndValueColumn, bitMap);
9083
}
9184
}
9285

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/DriverContext.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,12 @@ public DriverContext() {
5050
this.fragmentInstanceContext = null;
5151
}
5252

53+
@TestOnly
54+
// should only be used by executeGroupByQueryInternal
55+
public DriverContext(FragmentInstanceContext fragmentInstanceContext) {
56+
this.fragmentInstanceContext = fragmentInstanceContext;
57+
}
58+
5359
public DriverContext(FragmentInstanceContext fragmentInstanceContext, int pipelineId) {
5460
this.fragmentInstanceContext = fragmentInstanceContext;
5561
this.driverTaskID = new DriverTaskId(fragmentInstanceContext.getId(), pipelineId);
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.db.queryengine.execution.fragment;
21+
22+
import org.apache.iotdb.commons.path.PartialPath;
23+
import org.apache.iotdb.db.exception.query.QueryProcessException;
24+
import org.apache.iotdb.db.queryengine.plan.planner.memory.FakedMemoryReservationManager;
25+
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
26+
import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource;
27+
import org.apache.iotdb.db.storageengine.dataregion.read.control.FileReaderManager;
28+
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
29+
30+
import org.apache.tsfile.read.filter.basic.Filter;
31+
32+
import java.util.Collections;
33+
import java.util.List;
34+
35+
public class FakedFragmentInstanceContext extends FragmentInstanceContext {
36+
37+
public FakedFragmentInstanceContext(Filter timeFilter, DataRegion dataRegion) {
38+
super(0, new FakedMemoryReservationManager(), timeFilter, dataRegion);
39+
}
40+
41+
public QueryDataSource getSharedQueryDataSource(PartialPath sourcePath)
42+
throws QueryProcessException {
43+
if (sharedQueryDataSource == null) {
44+
initQueryDataSource(sourcePath);
45+
}
46+
return (QueryDataSource) sharedQueryDataSource;
47+
}
48+
49+
public void initQueryDataSource(PartialPath sourcePath) throws QueryProcessException {
50+
51+
dataRegion.readLock();
52+
try {
53+
this.sharedQueryDataSource =
54+
dataRegion.query(
55+
Collections.singletonList(sourcePath),
56+
sourcePath.getDevice(),
57+
this,
58+
getGlobalTimeFilter(),
59+
null);
60+
61+
// used files should be added before mergeLock is unlocked, or they may be deleted by
62+
// running merge
63+
if (sharedQueryDataSource != null) {
64+
((QueryDataSource) sharedQueryDataSource).setSingleDevice(true);
65+
List<TsFileResource> tsFileList =
66+
((QueryDataSource) sharedQueryDataSource).getSeqResources();
67+
if (tsFileList != null) {
68+
for (TsFileResource tsFile : tsFileList) {
69+
FileReaderManager.getInstance().increaseFileReaderReference(tsFile, tsFile.isClosed());
70+
}
71+
}
72+
tsFileList = ((QueryDataSource) sharedQueryDataSource).getUnseqResources();
73+
if (tsFileList != null) {
74+
for (TsFileResource tsFile : tsFileList) {
75+
FileReaderManager.getInstance().increaseFileReaderReference(tsFile, tsFile.isClosed());
76+
}
77+
}
78+
}
79+
} finally {
80+
dataRegion.readUnlock();
81+
}
82+
}
83+
84+
public void releaseSharedQueryDataSource() {
85+
if (sharedQueryDataSource != null) {
86+
List<TsFileResource> tsFileList = ((QueryDataSource) sharedQueryDataSource).getSeqResources();
87+
if (tsFileList != null) {
88+
for (TsFileResource tsFile : tsFileList) {
89+
FileReaderManager.getInstance().decreaseFileReaderReference(tsFile, tsFile.isClosed());
90+
}
91+
}
92+
tsFileList = ((QueryDataSource) sharedQueryDataSource).getUnseqResources();
93+
if (tsFileList != null) {
94+
for (TsFileResource tsFile : tsFileList) {
95+
FileReaderManager.getInstance().decreaseFileReaderReference(tsFile, tsFile.isClosed());
96+
}
97+
}
98+
sharedQueryDataSource = null;
99+
}
100+
}
101+
102+
@Override
103+
protected boolean checkIfModificationExists(TsFileResource tsFileResource) {
104+
return false;
105+
}
106+
}

0 commit comments

Comments
 (0)