Skip to content

Commit 02febc0

Browse files
authored
Optimize group by query in ClientRPCServiceImpl to reduce cpu usage
1 parent 1b9a446 commit 02febc0

File tree

9 files changed

+195
-43
lines changed

9 files changed

+195
-43
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java

Lines changed: 31 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -56,23 +56,18 @@
5656
import org.apache.iotdb.db.protocol.session.IClientSession;
5757
import org.apache.iotdb.db.protocol.session.SessionManager;
5858
import org.apache.iotdb.db.protocol.thrift.OperationType;
59-
import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
60-
import org.apache.iotdb.db.queryengine.common.PlanFragmentId;
61-
import org.apache.iotdb.db.queryengine.common.QueryId;
6259
import org.apache.iotdb.db.queryengine.common.SessionInfo;
6360
import org.apache.iotdb.db.queryengine.common.header.DatasetHeader;
6461
import org.apache.iotdb.db.queryengine.common.header.DatasetHeaderFactory;
6562
import org.apache.iotdb.db.queryengine.execution.aggregation.AccumulatorFactory;
6663
import org.apache.iotdb.db.queryengine.execution.aggregation.TreeAggregator;
6764
import org.apache.iotdb.db.queryengine.execution.driver.DriverContext;
68-
import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
69-
import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceManager;
70-
import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceStateMachine;
65+
import org.apache.iotdb.db.queryengine.execution.fragment.FakedFragmentInstanceContext;
66+
import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
7167
import org.apache.iotdb.db.queryengine.execution.operator.process.last.LastQueryUtil;
7268
import org.apache.iotdb.db.queryengine.execution.operator.source.AbstractSeriesAggregationScanOperator;
7369
import org.apache.iotdb.db.queryengine.execution.operator.source.AlignedSeriesAggregationScanOperator;
7470
import org.apache.iotdb.db.queryengine.execution.operator.source.SeriesAggregationScanOperator;
75-
import org.apache.iotdb.db.queryengine.execution.operator.source.SeriesScanOperator;
7671
import org.apache.iotdb.db.queryengine.plan.Coordinator;
7772
import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher;
7873
import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
@@ -124,6 +119,7 @@
124119
import org.apache.iotdb.db.schemaengine.template.TemplateQueryType;
125120
import org.apache.iotdb.db.storageengine.StorageEngine;
126121
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
122+
import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource;
127123
import org.apache.iotdb.db.storageengine.rescon.quotas.DataNodeThrottleQuotaManager;
128124
import org.apache.iotdb.db.storageengine.rescon.quotas.OperationQuota;
129125
import org.apache.iotdb.db.subscription.agent.SubscriptionAgent;
@@ -226,7 +222,6 @@
226222

227223
import static org.apache.iotdb.commons.partition.DataPartition.NOT_ASSIGNED;
228224
import static org.apache.iotdb.db.queryengine.common.DataNodeEndPoints.isSameNode;
229-
import static org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
230225
import static org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil.initTimeRangeIterator;
231226
import static org.apache.iotdb.db.utils.CommonUtils.getContentOfRequest;
232227
import static org.apache.iotdb.db.utils.CommonUtils.getContentOfTSFastLastDataQueryForOneDeviceReq;
@@ -270,7 +265,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
270265
private final TreeDeviceSchemaCacheManager DATA_NODE_SCHEMA_CACHE =
271266
TreeDeviceSchemaCacheManager.getInstance();
272267

273-
public static Duration DEFAULT_TIME_SLICE = new Duration(60_000, TimeUnit.MILLISECONDS);
268+
public static final Duration DEFAULT_TIME_SLICE = new Duration(60_000, TimeUnit.MILLISECONDS);
274269

275270
private static final int DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES =
276271
TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes();
@@ -790,6 +785,9 @@ private TSExecuteStatementResp executeAggregationQueryInternal(
790785
}
791786
}
792787

788+
private final List<InputLocation[]> inputLocationList =
789+
Collections.singletonList(new InputLocation[] {new InputLocation(0, 0)});
790+
793791
@SuppressWarnings("java:S2095") // close() do nothing
794792
private List<TsBlock> executeGroupByQueryInternal(
795793
SessionInfo sessionInfo,
@@ -812,21 +810,14 @@ private List<TsBlock> executeGroupByQueryInternal(
812810

813811
Filter timeFilter = TimeFilterApi.between(startTime, endTime - 1);
814812

815-
QueryId queryId = new QueryId("stub_query");
816-
FragmentInstanceId instanceId =
817-
new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
818-
FragmentInstanceStateMachine stateMachine =
819-
new FragmentInstanceStateMachine(
820-
instanceId, FragmentInstanceManager.getInstance().instanceNotificationExecutor);
821-
FragmentInstanceContext fragmentInstanceContext =
822-
createFragmentInstanceContext(
823-
instanceId, stateMachine, sessionInfo, dataRegionList.get(0), timeFilter);
813+
FakedFragmentInstanceContext fragmentInstanceContext =
814+
new FakedFragmentInstanceContext(timeFilter, dataRegionList.get(0));
815+
824816
DriverContext driverContext = new DriverContext(fragmentInstanceContext, 0);
825817
PlanNodeId planNodeId = new PlanNodeId("1");
826-
driverContext.addOperatorContext(1, planNodeId, SeriesScanOperator.class.getSimpleName());
827-
driverContext
828-
.getOperatorContexts()
829-
.forEach(operatorContext -> operatorContext.setMaxRunTime(DEFAULT_TIME_SLICE));
818+
OperatorContext operatorContext =
819+
new OperatorContext(1, planNodeId, "SeriesAggregationScanOperator", driverContext);
820+
operatorContext.setMaxRunTime(DEFAULT_TIME_SLICE);
830821

831822
SeriesScanOptions.Builder scanOptionsBuilder = new SeriesScanOptions.Builder();
832823
scanOptionsBuilder.withAllSensors(Collections.singleton(measurement));
@@ -844,14 +835,18 @@ private List<TsBlock> executeGroupByQueryInternal(
844835
true,
845836
true),
846837
AggregationStep.SINGLE,
847-
Collections.singletonList(new InputLocation[] {new InputLocation(0, 0)}));
838+
inputLocationList);
848839

849840
GroupByTimeParameter groupByTimeParameter =
850841
new GroupByTimeParameter(
851842
startTime, endTime, new TimeDuration(0, interval), new TimeDuration(0, interval), true);
852843

853844
IMeasurementSchema measurementSchema = new MeasurementSchema(measurement, dataType);
854845
AbstractSeriesAggregationScanOperator operator;
846+
boolean canUseStatistics =
847+
!TSDataType.BLOB.equals(dataType)
848+
|| (!TAggregationType.LAST_VALUE.equals(aggregationType)
849+
&& !TAggregationType.FIRST_VALUE.equals(aggregationType));
855850
IFullPath path;
856851
if (isAligned) {
857852
path =
@@ -865,36 +860,37 @@ private List<TsBlock> executeGroupByQueryInternal(
865860
(AlignedFullPath) path,
866861
Ordering.ASC,
867862
scanOptionsBuilder.build(),
868-
driverContext.getOperatorContexts().get(0),
863+
operatorContext,
869864
Collections.singletonList(aggregator),
870865
initTimeRangeIterator(groupByTimeParameter, true, true, sessionInfo.getZoneId()),
871866
groupByTimeParameter,
872867
DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES,
873-
!TSDataType.BLOB.equals(dataType)
874-
|| (!TAggregationType.LAST_VALUE.equals(aggregationType)
875-
&& !TAggregationType.FIRST_VALUE.equals(aggregationType)));
868+
canUseStatistics);
876869
} else {
877870
path = new NonAlignedFullPath(deviceID, measurementSchema);
871+
// String[] splits = device.split("\\.");
872+
// String[] fullPaths = new String[splits.length + 1];
873+
// System.arraycopy(splits, 0, fullPaths, 0, splits.length);
874+
// fullPaths[splits.length] = measurement;
875+
// path = new MeasurementPath(fullPaths, measurementSchema);
878876
operator =
879877
new SeriesAggregationScanOperator(
880878
planNodeId,
881879
path,
882880
Ordering.ASC,
883881
scanOptionsBuilder.build(),
884-
driverContext.getOperatorContexts().get(0),
882+
operatorContext,
885883
Collections.singletonList(aggregator),
886884
initTimeRangeIterator(groupByTimeParameter, true, true, sessionInfo.getZoneId()),
887885
groupByTimeParameter,
888886
DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES,
889-
!TSDataType.BLOB.equals(dataType)
890-
|| (!TAggregationType.LAST_VALUE.equals(aggregationType)
891-
&& !TAggregationType.FIRST_VALUE.equals(aggregationType)));
887+
canUseStatistics);
892888
}
893889

894890
try {
895891
List<TsBlock> result = new ArrayList<>();
896-
fragmentInstanceContext.setSourcePaths(Collections.singletonList(path));
897-
operator.initQueryDataSource(fragmentInstanceContext.getSharedQueryDataSource());
892+
QueryDataSource dataSource = fragmentInstanceContext.getSharedQueryDataSource(path);
893+
operator.initQueryDataSource(dataSource);
898894

899895
while (operator.hasNext()) {
900896
result.add(operator.next());
@@ -904,7 +900,7 @@ private List<TsBlock> executeGroupByQueryInternal(
904900
} catch (Exception e) {
905901
throw new RuntimeException(e);
906902
} finally {
907-
fragmentInstanceContext.releaseResource();
903+
fragmentInstanceContext.releaseSharedQueryDataSource();
908904
}
909905
}
910906

@@ -1307,7 +1303,7 @@ public TSExecuteStatementResp executeGroupByQueryIntervalQuery(TSGroupByQueryInt
13071303
deviceId,
13081304
measurementId,
13091305
dataType,
1310-
true,
1306+
req.isAligned,
13111307
req.getStartTime(),
13121308
req.getEndTime(),
13131309
req.getInterval(),

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/DriverContext.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,12 @@ public DriverContext() {
5050
this.fragmentInstanceContext = null;
5151
}
5252

53+
@TestOnly
54+
// should only be used by executeGroupByQueryInternal
55+
public DriverContext(FragmentInstanceContext fragmentInstanceContext) {
56+
this.fragmentInstanceContext = fragmentInstanceContext;
57+
}
58+
5359
public DriverContext(FragmentInstanceContext fragmentInstanceContext, int pipelineId) {
5460
this.fragmentInstanceContext = fragmentInstanceContext;
5561
this.driverTaskID = new DriverTaskId(fragmentInstanceContext.getId(), pipelineId);
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.db.queryengine.execution.fragment;
21+
22+
import org.apache.iotdb.commons.path.IFullPath;
23+
import org.apache.iotdb.db.exception.query.QueryProcessException;
24+
import org.apache.iotdb.db.queryengine.plan.planner.memory.FakedMemoryReservationManager;
25+
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
26+
import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource;
27+
import org.apache.iotdb.db.storageengine.dataregion.read.control.FileReaderManager;
28+
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
29+
30+
import org.apache.tsfile.read.filter.basic.Filter;
31+
32+
import java.util.Collections;
33+
import java.util.List;
34+
35+
public class FakedFragmentInstanceContext extends FragmentInstanceContext {
36+
37+
public FakedFragmentInstanceContext(Filter timeFilter, DataRegion dataRegion) {
38+
super(0, new FakedMemoryReservationManager(), timeFilter, dataRegion);
39+
}
40+
41+
public QueryDataSource getSharedQueryDataSource(IFullPath sourcePath)
42+
throws QueryProcessException {
43+
if (sharedQueryDataSource == null) {
44+
initQueryDataSource(sourcePath);
45+
}
46+
return (QueryDataSource) sharedQueryDataSource;
47+
}
48+
49+
public void initQueryDataSource(IFullPath sourcePath) throws QueryProcessException {
50+
51+
dataRegion.readLock();
52+
try {
53+
this.sharedQueryDataSource =
54+
dataRegion.query(
55+
Collections.singletonList(sourcePath),
56+
sourcePath.getDeviceId(),
57+
this,
58+
getGlobalTimeFilter(),
59+
null);
60+
61+
// used files should be added before mergeLock is unlocked, or they may be deleted by
62+
// running merge
63+
if (sharedQueryDataSource != null) {
64+
((QueryDataSource) sharedQueryDataSource).setSingleDevice(true);
65+
List<TsFileResource> tsFileList =
66+
((QueryDataSource) sharedQueryDataSource).getSeqResources();
67+
if (tsFileList != null) {
68+
for (TsFileResource tsFile : tsFileList) {
69+
FileReaderManager.getInstance().increaseFileReaderReference(tsFile, tsFile.isClosed());
70+
}
71+
}
72+
tsFileList = ((QueryDataSource) sharedQueryDataSource).getUnseqResources();
73+
if (tsFileList != null) {
74+
for (TsFileResource tsFile : tsFileList) {
75+
FileReaderManager.getInstance().increaseFileReaderReference(tsFile, tsFile.isClosed());
76+
}
77+
}
78+
}
79+
} finally {
80+
dataRegion.readUnlock();
81+
}
82+
}
83+
84+
public void releaseSharedQueryDataSource() {
85+
if (sharedQueryDataSource != null) {
86+
List<TsFileResource> tsFileList = ((QueryDataSource) sharedQueryDataSource).getSeqResources();
87+
if (tsFileList != null) {
88+
for (TsFileResource tsFile : tsFileList) {
89+
FileReaderManager.getInstance().decreaseFileReaderReference(tsFile, tsFile.isClosed());
90+
}
91+
}
92+
tsFileList = ((QueryDataSource) sharedQueryDataSource).getUnseqResources();
93+
if (tsFileList != null) {
94+
for (TsFileResource tsFile : tsFileList) {
95+
FileReaderManager.getInstance().decreaseFileReaderReference(tsFile, tsFile.isClosed());
96+
}
97+
}
98+
sharedQueryDataSource = null;
99+
}
100+
}
101+
102+
@Override
103+
protected boolean checkIfModificationExists(TsFileResource tsFileResource) {
104+
return false;
105+
}
106+
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.apache.iotdb.db.queryengine.plan.planner.memory.ThreadSafeMemoryReservationManager;
3838
import org.apache.iotdb.db.queryengine.plan.planner.plan.TimePredicate;
3939
import org.apache.iotdb.db.storageengine.StorageEngine;
40+
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
4041
import org.apache.iotdb.db.storageengine.dataregion.IDataRegionForQuery;
4142
import org.apache.iotdb.db.storageengine.dataregion.read.IQueryDataSource;
4243
import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource;
@@ -83,7 +84,7 @@ public class FragmentInstanceContext extends QueryContext {
8384

8485
private final MemoryReservationManager memoryReservationManager;
8586

86-
private IDataRegionForQuery dataRegion;
87+
protected IDataRegionForQuery dataRegion;
8788
private Filter globalTimeFilter;
8889

8990
// it will only be used once, after sharedQueryDataSource being inited, it will be set to null
@@ -93,7 +94,7 @@ public class FragmentInstanceContext extends QueryContext {
9394
private Map<IDeviceID, DeviceContext> devicePathsToContext;
9495

9596
// Shared by all scan operators in this fragment instance to avoid memory problem
96-
private IQueryDataSource sharedQueryDataSource;
97+
protected IQueryDataSource sharedQueryDataSource;
9798

9899
/** closed tsfile used in this fragment instance. */
99100
private Set<TsFileResource> closedFilePaths;
@@ -185,7 +186,7 @@ public static FragmentInstanceContext createFragmentInstanceContext(
185186
}
186187

187188
public static FragmentInstanceContext createFragmentInstanceContextForCompaction(long queryId) {
188-
return new FragmentInstanceContext(queryId);
189+
return new FragmentInstanceContext(queryId, null, null, null);
189190
}
190191

191192
public void setQueryDataSourceType(QueryDataSourceType queryDataSourceType) {
@@ -288,13 +289,19 @@ public void setDataRegion(IDataRegionForQuery dataRegion) {
288289
}
289290

290291
// used for compaction
291-
private FragmentInstanceContext(long queryId) {
292+
protected FragmentInstanceContext(
293+
long queryId,
294+
MemoryReservationManager memoryReservationManager,
295+
Filter timeFilter,
296+
DataRegion dataRegion) {
292297
this.queryId = queryId;
293298
this.id = null;
294299
this.stateMachine = null;
295300
this.dataNodeQueryContextMap = null;
296301
this.dataNodeQueryContext = null;
297-
this.memoryReservationManager = null;
302+
this.dataRegion = dataRegion;
303+
this.globalTimeFilter = timeFilter;
304+
this.memoryReservationManager = memoryReservationManager;
298305
}
299306

300307
public void start() {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryContext.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ public QueryContext(long queryId, boolean debug, long startTime, long timeout) {
9191
}
9292

9393
// if the mods file does not exist, do not add it to the cache
94-
private boolean checkIfModificationExists(TsFileResource tsFileResource) {
94+
protected boolean checkIfModificationExists(TsFileResource tsFileResource) {
9595
if (nonExistentModFiles.contains(tsFileResource.getTsFileID())) {
9696
return false;
9797
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/task/DriverTaskId.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,12 @@ public class DriverTaskId implements ID, Comparable<DriverTaskId> {
3636
// Currently, we just save pipelineId in driverTask since it's one-to-one relation.
3737
private final int pipelineId;
3838
private final String fullId;
39+
private static final String EMPTY_FULL_ID = "EmptyFullId";
3940

4041
public DriverTaskId(FragmentInstanceId id, int pipelineId) {
4142
this.fragmentInstanceId = id;
4243
this.pipelineId = pipelineId;
43-
this.fullId = String.format("%s.%d", id.getFullId(), pipelineId);
44+
this.fullId = String.format("%s.%d", id == null ? EMPTY_FULL_ID : id.getFullId(), pipelineId);
4445
}
4546

4647
@Override

0 commit comments

Comments
 (0)