diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp index e0fdd3709d29c6..1df0f1ef70961e 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.cpp +++ b/be/src/pipeline/exec/exchange_sink_operator.cpp @@ -102,6 +102,7 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf for (int i = 0; i < channels.size(); ++i) { if (channels[i]->is_local()) { local_size++; + local_channel_ids.emplace_back(i); _last_local_channel_idx = i; } } @@ -287,7 +288,8 @@ ExchangeSinkOperatorX::ExchangeSinkOperatorX( sink.output_partition.type == TPartitionType::OLAP_TABLE_SINK_HASH_PARTITIONED || sink.output_partition.type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED || sink.output_partition.type == TPartitionType::HIVE_TABLE_SINK_HASH_PARTITIONED || - sink.output_partition.type == TPartitionType::HIVE_TABLE_SINK_UNPARTITIONED); + sink.output_partition.type == TPartitionType::HIVE_TABLE_SINK_UNPARTITIONED || + sink.output_partition.type == TPartitionType::RANDOM_LOCAL_SHUFFLE); #endif _name = "ExchangeSinkOperatorX"; _pool = std::make_shared(); @@ -494,6 +496,25 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block } } local_state.current_channel_idx = (local_state.current_channel_idx + 1) % _writer_count; + } else if (_part_type == TPartitionType::RANDOM_LOCAL_SHUFFLE) { + DCHECK_LT(local_state.current_channel_idx, local_state.local_channel_ids.size()) + << "local_state.current_channel_idx: " << local_state.current_channel_idx + << ", local_channel_ids: " << to_string(local_state.local_channel_ids); + + // 1. select channel + auto& current_channel = + local_state + .channels[local_state.local_channel_ids[local_state.current_channel_idx]]; + DCHECK(current_channel->is_local()) + << "Only local channel are supported, current_channel_idx: " + << local_state.local_channel_ids[local_state.current_channel_idx]; + if (!current_channel->is_receiver_eof()) { + // 2. serialize, send and rollover block + auto status = current_channel->send_local_block(block, eos, true); + HANDLE_CHANNEL_STATUS(state, current_channel, status); + } + local_state.current_channel_idx = + (local_state.current_channel_idx + 1) % local_state.local_channel_ids.size(); } else { // Range partition // 1. calculate range diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h index 418fa2f6c177c7..509fcce897b442 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.h +++ b/be/src/pipeline/exec/exchange_sink_operator.h @@ -106,6 +106,7 @@ class ExchangeSinkLocalState MOCK_REMOVE(final) : public PipelineXSinkLocalState std::vector> channels; int current_channel_idx {0}; // index of current channel to send to if _random == true bool _only_local_exchange {false}; + std::vector local_channel_ids; void on_channel_finished(InstanceLoId channel_id); vectorized::PartitionerBase* partitioner() const { return _partitioner.get(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java index c54f2c13a6ff28..6038c672790a1f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java @@ -702,10 +702,10 @@ protected void doDistribute(boolean canUseNereidsDistributePlanner, ExplainLevel } boolean notNeedBackend = false; - // if the query can compute without backend, we can skip check cluster privileges - if (Config.isCloudMode() - && cascadesContext.getConnectContext().supportHandleByFe() - && physicalPlan instanceof ComputeResultSet) { + // the internal query not support process Resultset, so must process by backend + if (cascadesContext.getConnectContext().supportHandleByFe() + && physicalPlan instanceof ComputeResultSet + && !cascadesContext.getConnectContext().getState().isInternal()) { Optional resultSet = ((ComputeResultSet) physicalPlan).computeResultInFe( cascadesContext, Optional.empty(), physicalPlan.getOutput()); if (resultSet.isPresent()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index e234c1d9174a09..b579936d47c425 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -180,6 +180,7 @@ import org.apache.doris.planner.BlackholeSink; import org.apache.doris.planner.CTEScanNode; import org.apache.doris.planner.DataPartition; +import org.apache.doris.planner.DataSink; import org.apache.doris.planner.DataStreamSink; import org.apache.doris.planner.DictionarySink; import org.apache.doris.planner.EmptySetNode; @@ -240,6 +241,7 @@ import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; +import java.util.IdentityHashMap; import java.util.Iterator; import java.util.LinkedHashSet; import java.util.List; @@ -2323,6 +2325,37 @@ && findOlapScanNodesByPassExchangeAndJoinNode(setOperationFragment.getPlanRoot() setOperationNode.setColocate(true); } + // whether accept LocalShuffleUnion. + // the backend need `enable_local_exchange=true` to compute whether a channel is `local`, + // and LocalShuffleUnion need `local` channels to do random local shuffle, so we need check + // `enable_local_exchange` + if (setOperation instanceof PhysicalUnion + && context.getConnectContext().getSessionVariable().getEnableLocalExchange() + && SessionVariable.canUseNereidsDistributePlanner()) { + boolean isLocalShuffleUnion = false; + if (setOperation.getPhysicalProperties().getDistributionSpec() instanceof DistributionSpecExecutionAny) { + Map exchangeIdToExchangeNode = new IdentityHashMap<>(); + for (PlanNode child : setOperationNode.getChildren()) { + if (child instanceof ExchangeNode) { + exchangeIdToExchangeNode.put(child.getId().asInt(), (ExchangeNode) child); + } + } + + for (PlanFragment childFragment : setOperationFragment.getChildren()) { + DataSink sink = childFragment.getSink(); + if (sink instanceof DataStreamSink) { + isLocalShuffleUnion |= setLocalRandomPartition(exchangeIdToExchangeNode, (DataStreamSink) sink); + } else if (sink instanceof MultiCastDataSink) { + MultiCastDataSink multiCastDataSink = (MultiCastDataSink) sink; + for (DataStreamSink dataStreamSink : multiCastDataSink.getDataStreamSinks()) { + isLocalShuffleUnion |= setLocalRandomPartition(exchangeIdToExchangeNode, dataStreamSink); + } + } + } + } + ((UnionNode) setOperationNode).setLocalShuffleUnion(isLocalShuffleUnion); + } + return setOperationFragment; } @@ -3215,4 +3248,18 @@ private boolean isSimpleQuery(PhysicalPlan root) { } return child instanceof PhysicalRelation; } + + private boolean setLocalRandomPartition( + Map exchangeIdToExchangeNode, DataStreamSink dataStreamSink) { + ExchangeNode exchangeNode = exchangeIdToExchangeNode.get( + dataStreamSink.getExchNodeId().asInt()); + if (exchangeNode == null) { + return false; + } + exchangeNode.setPartitionType(TPartitionType.RANDOM_LOCAL_SHUFFLE); + + DataPartition p2pPartition = new DataPartition(TPartitionType.RANDOM_LOCAL_SHUFFLE); + dataStreamSink.setOutputPartition(p2pPartition); + return true; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java index f045d79250fee5..cf7ea6aede9f03 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java @@ -499,6 +499,7 @@ import org.apache.doris.nereids.exceptions.AnalysisException; import org.apache.doris.nereids.exceptions.NotSupportedException; import org.apache.doris.nereids.exceptions.ParseException; +import org.apache.doris.nereids.glue.LogicalPlanAdapter; import org.apache.doris.nereids.hint.DistributeHint; import org.apache.doris.nereids.hint.JoinSkewInfo; import org.apache.doris.nereids.load.NereidsDataDescription; @@ -2069,8 +2070,10 @@ public List> visitMultiStatements(MultiState connectContext.setStatementContext(statementContext); statementContext.setConnectContext(connectContext); } - logicalPlans.add(Pair.of( - ParserUtils.withOrigin(ctx, () -> (LogicalPlan) visit(statement)), statementContext)); + Pair planAndContext = Pair.of( + ParserUtils.withOrigin(ctx, () -> (LogicalPlan) visit(statement)), statementContext); + statementContext.setParsedStatement(new LogicalPlanAdapter(planAndContext.first, statementContext)); + logicalPlans.add(planAndContext); List params = new ArrayList<>(tokenPosToParameters.values()); statementContext.setPlaceholders(params); tokenPosToParameters.clear(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/InitMaterializationContextHook.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/InitMaterializationContextHook.java index bc2c705a4f742b..7362646888aab4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/InitMaterializationContextHook.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/InitMaterializationContextHook.java @@ -200,8 +200,10 @@ private List createAsyncMaterializationContext(CascadesC return ImmutableList.of(); } if (CollectionUtils.isEmpty(availableMTMVs)) { - LOG.info("Enable materialized view rewrite but availableMTMVs is empty, query id " - + "is {}", cascadesContext.getConnectContext().getQueryIdentifier()); + if (LOG.isDebugEnabled()) { + LOG.debug("Enable materialized view rewrite but availableMTMVs is empty, query id " + + "is {}", cascadesContext.getConnectContext().getQueryIdentifier()); + } return ImmutableList.of(); } List asyncMaterializationContext = new ArrayList<>(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedJobBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedJobBuilder.java index bc20d3efa17590..ce75dd3dbfa98b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedJobBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedJobBuilder.java @@ -31,6 +31,7 @@ import org.apache.doris.planner.PlanNodeId; import org.apache.doris.planner.ScanNode; import org.apache.doris.planner.SchemaScanNode; +import org.apache.doris.planner.UnionNode; import org.apache.doris.thrift.TExplainLevel; import com.google.common.collect.ArrayListMultimap; @@ -215,7 +216,10 @@ private UnassignedJob buildScanRemoteTableJob( private UnassignedJob buildShuffleJob( StatementContext statementContext, PlanFragment planFragment, ListMultimap inputJobs) { - if (planFragment.isPartitioned()) { + if (planFragment.getPlanRoot().collectInCurrentFragment(UnionNode.class::isInstance) + .stream().map(UnionNode.class::cast).anyMatch(UnionNode::isLocalShuffleUnion)) { + return new UnassignedLocalShuffleUnionJob(statementContext, planFragment, inputJobs); + } else if (planFragment.isPartitioned()) { return new UnassignedShuffleJob(statementContext, planFragment, inputJobs); } else { return new UnassignedGatherJob(statementContext, planFragment, inputJobs); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedLocalShuffleUnionJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedLocalShuffleUnionJob.java new file mode 100644 index 00000000000000..5c21250727558c --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedLocalShuffleUnionJob.java @@ -0,0 +1,99 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans.distribute.worker.job; + +import org.apache.doris.nereids.StatementContext; +import org.apache.doris.nereids.trees.plans.distribute.DistributeContext; +import org.apache.doris.planner.ExchangeNode; +import org.apache.doris.planner.PlanFragment; +import org.apache.doris.planner.PlanNode; +import org.apache.doris.planner.UnionNode; +import org.apache.doris.qe.ConnectContext; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ListMultimap; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; + +import java.util.Collection; +import java.util.List; +import java.util.Map.Entry; +import java.util.Set; + +/** + * this class is used to local shuffle between the same backend, to save network io. + * + * for example: we have A/B/C three backend, and every backend process 3 instances before the Union, + * then the Union will generate same instances for the source backend, and every source + * instances will random local shuffle to the self backend's three target instances, like this: + * + * UnionNode(9 target instances, [A4, B4, C4, A5, B5, C5, A6, B6, C6]) -- say there has 3 backends: A/B/C + * | + * +- ExchangeNode(3 source instances, [A1, B1, C1]) -- A1 random local shuffle to A4/A5/A6, + * | B1 random local shuffle to B4/B5/B6, + * | C1 random local shuffle to C4/C5/C6 + * | + * +- ExchangeNode(3 source instances, [A2, B2, C2]) -- A2 random local shuffle to A4/A5/A6, + * | B2 random local shuffle to B4/B5/B6, + * | C2 random local shuffle to C4/C5/C6 + * | + * +- ExchangeNode(3 source instances, [A3, B3, C3]) -- A3 random local shuffle to A4/A5/A6, + * B3 random local shuffle to B4/B5/B6, + * C3 random local shuffle to C4/C5/C6 + */ +public class UnassignedLocalShuffleUnionJob extends AbstractUnassignedJob { + + public UnassignedLocalShuffleUnionJob(StatementContext statementContext, PlanFragment fragment, + ListMultimap exchangeToChildJob) { + super(statementContext, fragment, ImmutableList.of(), exchangeToChildJob); + } + + @Override + public List computeAssignedJobs( + DistributeContext context, ListMultimap inputJobs) { + ConnectContext connectContext = statementContext.getConnectContext(); + DefaultScanSource noScanSource = DefaultScanSource.empty(); + List unionInstances = Lists.newArrayListWithCapacity(inputJobs.size()); + + List unionNodes = fragment.getPlanRoot().collectInCurrentFragment(UnionNode.class::isInstance); + Set exchangeIdToUnion = Sets.newLinkedHashSet(); + for (UnionNode unionNode : unionNodes) { + for (PlanNode child : unionNode.getChildren()) { + if (child instanceof ExchangeNode) { + exchangeIdToUnion.add(child.getId().asInt()); + } + } + } + + int id = 0; + for (Entry> exchangeNodeToSources : inputJobs.asMap().entrySet()) { + ExchangeNode exchangeNode = exchangeNodeToSources.getKey(); + if (!exchangeIdToUnion.contains(exchangeNode.getId().asInt())) { + continue; + } + for (AssignedJob inputInstance : exchangeNodeToSources.getValue()) { + StaticAssignedJob unionInstance = new StaticAssignedJob( + id++, connectContext.nextInstanceId(), this, + inputInstance.getAssignedWorker(), noScanSource + ); + unionInstances.add(unionInstance); + } + } + return unionInstances; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java index 648fbace47c890..b78bd43677b68a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java @@ -65,7 +65,8 @@ public DataPartition(TPartitionType type) { Preconditions.checkState(type == TPartitionType.UNPARTITIONED || type == TPartitionType.RANDOM || type == TPartitionType.HIVE_TABLE_SINK_UNPARTITIONED - || type == TPartitionType.OLAP_TABLE_SINK_HASH_PARTITIONED); + || type == TPartitionType.OLAP_TABLE_SINK_HASH_PARTITIONED + || type == TPartitionType.RANDOM_LOCAL_SHUFFLE); this.type = type; this.partitionExprs = ImmutableList.of(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java index 9e9bd4e7fa92bc..ca6fd80ef75aae 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java @@ -64,7 +64,7 @@ public ExchangeNode(PlanNodeId id, PlanNode inputNode) { offset = 0; limit = -1; this.conjuncts = Collections.emptyList(); - children.add(inputNode); + this.children.add(inputNode); TupleDescriptor outputTupleDesc = inputNode.getOutputTupleDesc(); updateTupleIds(outputTupleDesc); } @@ -155,8 +155,15 @@ public void setRightChildOfBroadcastHashJoin(boolean value) { */ @Override public boolean isSerialOperator() { - return (ConnectContext.get() != null && ConnectContext.get().getSessionVariable().isUseSerialExchange() - || partitionType == TPartitionType.UNPARTITIONED) && mergeInfo == null; + return ( + ( + ConnectContext.get() != null + && ConnectContext.get().getSessionVariable().isUseSerialExchange() + && (partitionType != TPartitionType.RANDOM_LOCAL_SHUFFLE) + ) + || partitionType == TPartitionType.UNPARTITIONED + ) + && mergeInfo == null; } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/UnionNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/UnionNode.java index 36e50b8f5010ba..ca28e260604ed8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/UnionNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/UnionNode.java @@ -25,6 +25,8 @@ import org.apache.doris.thrift.TPlanNodeType; public class UnionNode extends SetOperationNode { + private boolean localShuffleUnion; + public UnionNode(PlanNodeId id, TupleId tupleId) { super(id, tupleId, "UNION"); } @@ -40,4 +42,12 @@ protected void toThrift(TPlanNode msg) { public boolean isSerialOperator() { return children.isEmpty(); } + + public boolean isLocalShuffleUnion() { + return localShuffleUnion; + } + + public void setLocalShuffleUnion(boolean localShuffleUnion) { + this.localShuffleUnion = localShuffleUnion; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java index 03dac4a899dd1c..bee00905fbd2ba 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java @@ -972,8 +972,11 @@ public TUniqueId getLastQueryId() { public TUniqueId nextInstanceId() { if (loadId != null) { return new TUniqueId(loadId.hi, loadId.lo + instanceIdGenerator.incrementAndGet()); - } else { + } else if (queryId != null) { return new TUniqueId(queryId.hi, queryId.lo + instanceIdGenerator.incrementAndGet()); + } else { + // for test + return new TUniqueId(0, instanceIdGenerator.incrementAndGet()); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 187e9f48407f4d..bdb3bff9650b72 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -18,7 +18,6 @@ package org.apache.doris.qe; import org.apache.doris.analysis.SetVar; -import org.apache.doris.analysis.StatementBase; import org.apache.doris.analysis.StringLiteral; import org.apache.doris.catalog.Env; import org.apache.doris.common.Config; @@ -29,7 +28,6 @@ import org.apache.doris.common.util.TimeUtils; import org.apache.doris.common.util.Util; import org.apache.doris.nereids.StatementContext; -import org.apache.doris.nereids.glue.LogicalPlanAdapter; import org.apache.doris.nereids.metrics.Event; import org.apache.doris.nereids.metrics.EventSwitchParser; import org.apache.doris.nereids.parser.Dialect; @@ -1942,7 +1940,7 @@ public boolean isEnableHboNonStrictMatchingMode() { public boolean enableCommonExprPushdown = true; @VariableMgr.VarAttr(name = ENABLE_LOCAL_EXCHANGE, fuzzy = false, flag = VariableMgr.INVISIBLE, - varType = VariableAnnotation.DEPRECATED) + varType = VariableAnnotation.DEPRECATED, needForward = true) public boolean enableLocalExchange = true; /** @@ -4453,15 +4451,7 @@ public static boolean canUseNereidsDistributePlanner() { if (connectContext == null) { return true; } - SessionVariable sessionVariable = connectContext.getSessionVariable(); - StatementContext statementContext = connectContext.getStatementContext(); - if (statementContext != null) { - StatementBase parsedStatement = statementContext.getParsedStatement(); - if (!(parsedStatement instanceof LogicalPlanAdapter)) { - return false; - } - } - return sessionVariable.enableNereidsDistributePlanner; + return connectContext.getSessionVariable().enableNereidsDistributePlanner; } public boolean isEnableNereidsDistributePlanner() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index d369afcdc6b28e..c234a2c38636da 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -862,6 +862,9 @@ private void parseByNereids() { } parsedStmt = statements.get(originStmt.idx); } + if (parsedStmt != null && statementContext.getParsedStatement() == null) { + statementContext.setParsedStatement(parsedStmt); + } } public void finalizeQuery() { diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/distribute/LocalShuffleUnionTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/distribute/LocalShuffleUnionTest.java new file mode 100644 index 00000000000000..66f5e55c539b39 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/distribute/LocalShuffleUnionTest.java @@ -0,0 +1,141 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.distribute; + +import org.apache.doris.nereids.NereidsPlanner; +import org.apache.doris.nereids.trees.plans.distribute.DistributedPlan; +import org.apache.doris.nereids.trees.plans.distribute.FragmentIdMapping; +import org.apache.doris.nereids.trees.plans.distribute.PipelineDistributedPlan; +import org.apache.doris.nereids.trees.plans.distribute.worker.job.AssignedJob; +import org.apache.doris.nereids.trees.plans.distribute.worker.job.UnassignedLocalShuffleUnionJob; +import org.apache.doris.planner.DataSink; +import org.apache.doris.planner.DataStreamSink; +import org.apache.doris.planner.ExchangeNode; +import org.apache.doris.planner.MultiCastDataSink; +import org.apache.doris.planner.PlanFragment; +import org.apache.doris.planner.PlanNode; +import org.apache.doris.planner.UnionNode; +import org.apache.doris.qe.StmtExecutor; +import org.apache.doris.thrift.TPartitionType; +import org.apache.doris.utframe.TestWithFeService; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.List; +import java.util.stream.Collectors; + +public class LocalShuffleUnionTest extends TestWithFeService { + @Override + protected void runBeforeAll() throws Exception { + createDatabase("test"); + connectContext.setDatabase("test"); + createTable("create table test.tbl(id int) properties('replication_num' = '1')"); + } + + @Test + public void testLocalShuffleUnion() throws Exception { + connectContext.getSessionVariable().setDisableNereidsRules("PRUNE_EMPTY_PARTITION"); + StmtExecutor stmtExecutor = executeNereidsSql( + "explain distributed plan select * from test.tbl union all select * from test.tbl"); + List fragments = stmtExecutor.planner().getFragments(); + assertHasLocalShuffleUnion(fragments); + } + + @Test + public void testLocalShuffleUnionWithCte() throws Exception { + connectContext.getSessionVariable().setDisableNereidsRules("PRUNE_EMPTY_PARTITION"); + StmtExecutor stmtExecutor = executeNereidsSql( + "explain distributed plan with a as (select * from test.tbl) select * from a union all select * from a"); + List fragments = stmtExecutor.planner().getFragments(); + assertHasLocalShuffleUnion(fragments); + } + + @Test + public void testLocalShuffleUnionWithJoin() throws Exception { + connectContext.getSessionVariable().setDisableNereidsRules("PRUNE_EMPTY_PARTITION"); + StmtExecutor stmtExecutor = executeNereidsSql( + "explain distributed plan select * from (select * from test.tbl union all select * from test.tbl)a left join[broadcast] (select * from test.tbl)b on a.id=b.id"); + List fragments = stmtExecutor.planner().getFragments(); + assertHasLocalShuffleUnion(fragments); + + FragmentIdMapping distributedPlans + = ((NereidsPlanner) stmtExecutor.planner()).getDistributedPlans(); + for (DistributedPlan plan : distributedPlans.values()) { + PipelineDistributedPlan pipelineDistributedPlan = (PipelineDistributedPlan) plan; + if (pipelineDistributedPlan.getFragmentJob() instanceof UnassignedLocalShuffleUnionJob) { + List sourcesInstances = pipelineDistributedPlan.getInputs() + .values() + .stream() + .flatMap(source -> ((PipelineDistributedPlan) source).getInstanceJobs().stream()) + .collect(Collectors.toList()); + + List broadSourceInstances = pipelineDistributedPlan.getInputs() + .entries() + .stream() + .filter(kv -> kv.getKey().getPartitionType() != TPartitionType.RANDOM_LOCAL_SHUFFLE) + .flatMap(kv -> ((PipelineDistributedPlan) kv.getValue()).getInstanceJobs().stream()) + .collect(Collectors.toList()); + + Assertions.assertTrue( + pipelineDistributedPlan.getInstanceJobs().size() < sourcesInstances.size() + ); + + Assertions.assertEquals( + (sourcesInstances.size() - broadSourceInstances.size()), + pipelineDistributedPlan.getInstanceJobs().size() + ); + } + } + } + + private void assertHasLocalShuffleUnion(List fragments) { + boolean hasLocalShuffleUnion = false; + for (PlanFragment fragment : fragments) { + List unions = fragment.getPlanRoot().collectInCurrentFragment(UnionNode.class::isInstance); + for (PlanNode planNode : unions) { + UnionNode union = (UnionNode) planNode; + assertUnionIsInplace(union, fragment); + hasLocalShuffleUnion = true; + } + } + Assertions.assertTrue(hasLocalShuffleUnion); + } + + private void assertUnionIsInplace(UnionNode unionNode, PlanFragment unionFragment) { + Assertions.assertTrue(unionNode.isLocalShuffleUnion()); + for (PlanNode child : unionNode.getChildren()) { + if (child instanceof ExchangeNode) { + ExchangeNode exchangeNode = (ExchangeNode) child; + Assertions.assertEquals(TPartitionType.RANDOM_LOCAL_SHUFFLE, exchangeNode.getPartitionType()); + for (PlanFragment childFragment : unionFragment.getChildren()) { + DataSink sink = childFragment.getSink(); + if (sink instanceof DataStreamSink && sink.getExchNodeId().asInt() == exchangeNode.getId().asInt()) { + Assertions.assertEquals(TPartitionType.RANDOM_LOCAL_SHUFFLE, sink.getOutputPartition().getType()); + } else if (sink instanceof MultiCastDataSink) { + for (DataStreamSink dataStreamSink : ((MultiCastDataSink) sink).getDataStreamSinks()) { + if (dataStreamSink.getExchNodeId().asInt() == exchangeNode.getId().asInt()) { + Assertions.assertEquals(TPartitionType.RANDOM_LOCAL_SHUFFLE, dataStreamSink.getOutputPartition().getType()); + } + } + } + } + } + } + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/util/PlanChecker.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/util/PlanChecker.java index c23ff24c868d89..8a1e3da7018cbe 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/nereids/util/PlanChecker.java +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/util/PlanChecker.java @@ -128,8 +128,7 @@ public PlanChecker checkParse(String sql, Consumer consumer) { public AbstractInsertExecutor getInsertExecutor(String sql) throws Exception { StatementContext statementContext = MemoTestUtils.createStatementContext(connectContext, sql); LogicalPlan parsedPlan = new NereidsParser().parseSingle(sql); - UUID uuid = UUID.randomUUID(); - connectContext.setQueryId(new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits())); + setQueryId(); InsertIntoTableCommand insertIntoTableCommand = (InsertIntoTableCommand) parsedPlan; LogicalPlanAdapter logicalPlanAdapter = new LogicalPlanAdapter(parsedPlan, statementContext); return insertIntoTableCommand.initPlan(connectContext, @@ -211,6 +210,7 @@ public List explainPlanProcess() { public List explainPlanProcess(String sql) { NereidsParser parser = new NereidsParser(); LogicalPlan command = parser.parseSingle(sql); + setQueryId(); NereidsPlanner planner = new NereidsPlanner( new StatementContext(connectContext, new OriginStatement(sql, 0))); planner.planWithLock(command, PhysicalProperties.ANY, ExplainLevel.ALL_PLAN, true); @@ -395,6 +395,7 @@ public NereidsPlanner plan(String sql) { connectContext.setStatementContext(statementContext); NereidsPlanner planner = new NereidsPlanner(statementContext); LogicalPlan parsedPlan = new NereidsParser().parseSingle(sql); + setQueryId(); LogicalPlanAdapter parsedPlanAdaptor = new LogicalPlanAdapter(parsedPlan, statementContext); statementContext.setParsedStatement(parsedPlanAdaptor); @@ -722,6 +723,7 @@ public PlanChecker checkExplain(String sql, Consumer consumer) { connectContext.setStatementContext(statementContext); LogicalPlan parsed = new NereidsParser().parseSingle(sql); + setQueryId(); NereidsPlanner nereidsPlanner = new NereidsPlanner(statementContext); LogicalPlanAdapter adapter = LogicalPlanAdapter.of(parsed); adapter.setIsExplain(new ExplainOptions(ExplainLevel.ALL_PLAN, false)); @@ -749,6 +751,7 @@ public PlanChecker checkPlannerResult(String sql, Consumer consu connectContext.setStatementContext(statementContext); LogicalPlan parsed = new NereidsParser().parseSingle(sql); + setQueryId(); NereidsPlanner nereidsPlanner = new NereidsPlanner(statementContext); SessionVariable sessionVariable = connectContext.getSessionVariable(); try { @@ -878,6 +881,12 @@ public PlanChecker printlnOrigin() { return this; } + private void setQueryId() { + UUID uuid = UUID.randomUUID(); + TUniqueId id = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits()); + connectContext.setQueryId(id); + } + public static boolean isPlanEqualWithoutID(Plan plan1, Plan plan2) { if (plan1.arity() != plan2.arity() || !plan1.getOutput().equals(plan2.getOutput()) || plan1.getClass() != plan2.getClass()) { diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/OlapQueryCacheTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/OlapQueryCacheTest.java index f499caa96f6aeb..54bdb3aebeb35a 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/qe/OlapQueryCacheTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/qe/OlapQueryCacheTest.java @@ -68,9 +68,11 @@ import org.apache.doris.qe.cache.SqlCache; import org.apache.doris.service.FrontendOptions; import org.apache.doris.system.Backend; +import org.apache.doris.system.SystemInfoService; import org.apache.doris.thrift.TStorageType; import org.apache.doris.thrift.TUniqueId; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Range; import mockit.Expectations; @@ -90,6 +92,7 @@ import java.util.Collection; import java.util.HashMap; import java.util.List; +import java.util.UUID; public class OlapQueryCacheTest { private static final Logger LOG = LogManager.getLogger(OlapQueryCacheTest.class); @@ -272,6 +275,24 @@ Env getCurrentEnv() { db.registerTable(view3); View view4 = createEventNestedView(); db.registerTable(view4); + + SystemInfoService clusterInfo = Env.getCurrentEnv().getClusterInfo(); + Backend be = new Backend(0, "127.0.0.1", 0); + be.setAlive(true); + ImmutableMap backends = ImmutableMap.of(0L, be); + new Expectations(clusterInfo) { + { + clusterInfo.getBackendsByCurrentCluster(); + minTimes = 0; + result = backends; + } + + { + clusterInfo.getAllBackendsByAllCluster(); + minTimes = 0; + result = backends; + } + }; } private OlapTable createOrderTable() { @@ -502,6 +523,8 @@ private StatementBase parseSqlByNereids(String sql) { LogicalPlan plan = new NereidsParser().parseSingle(sql); OriginStatement originStatement = new OriginStatement(sql, 0); StatementContext statementContext = new StatementContext(ctx, originStatement); + UUID uuid = UUID.randomUUID(); + ctx.setQueryId(new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits())); ctx.setStatementContext(statementContext); NereidsPlanner nereidsPlanner = new NereidsPlanner(statementContext); LogicalPlanAdapter adapter = new LogicalPlanAdapter(plan, statementContext); diff --git a/gensrc/thrift/Partitions.thrift b/gensrc/thrift/Partitions.thrift index 86a2d9be555f07..69100109a1ba32 100644 --- a/gensrc/thrift/Partitions.thrift +++ b/gensrc/thrift/Partitions.thrift @@ -50,6 +50,9 @@ enum TPartitionType { // used for hive unparititoned table HIVE_TABLE_SINK_UNPARTITIONED = 8 + + // used for random local shuffle union, one source instance random send data to target instances in the same backend + RANDOM_LOCAL_SHUFFLE = 9 } enum TDistributionType {