23 changes: 22 additions & 1 deletion be/src/pipeline/exec/exchange_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf
for (int i = 0; i < channels.size(); ++i) {
if (channels[i]->is_local()) {
local_size++;
local_channel_ids.emplace_back(i);
_last_local_channel_idx = i;
}
}
Expand Down Expand Up @@ -287,7 +288,8 @@ ExchangeSinkOperatorX::ExchangeSinkOperatorX(
sink.output_partition.type == TPartitionType::OLAP_TABLE_SINK_HASH_PARTITIONED ||
sink.output_partition.type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED ||
sink.output_partition.type == TPartitionType::HIVE_TABLE_SINK_HASH_PARTITIONED ||
sink.output_partition.type == TPartitionType::HIVE_TABLE_SINK_UNPARTITIONED);
sink.output_partition.type == TPartitionType::HIVE_TABLE_SINK_UNPARTITIONED ||
sink.output_partition.type == TPartitionType::RANDOM_LOCAL_SHUFFLE);
#endif
_name = "ExchangeSinkOperatorX";
_pool = std::make_shared<ObjectPool>();
Expand Down Expand Up @@ -494,6 +496,25 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
}
}
local_state.current_channel_idx = (local_state.current_channel_idx + 1) % _writer_count;
} else if (_part_type == TPartitionType::RANDOM_LOCAL_SHUFFLE) {
DCHECK_LT(local_state.current_channel_idx, local_state.local_channel_ids.size())
<< "local_state.current_channel_idx: " << local_state.current_channel_idx
<< ", local_channel_ids: " << to_string(local_state.local_channel_ids);

// 1. select channel
auto& current_channel =
local_state
.channels[local_state.local_channel_ids[local_state.current_channel_idx]];
DCHECK(current_channel->is_local())
<< "Only local channel are supported, current_channel_idx: "
<< local_state.local_channel_ids[local_state.current_channel_idx];
if (!current_channel->is_receiver_eof()) {
// 2. serialize, send and rollover block
auto status = current_channel->send_local_block(block, eos, true);
HANDLE_CHANNEL_STATUS(state, current_channel, status);
}
local_state.current_channel_idx =
(local_state.current_channel_idx + 1) % local_state.local_channel_ids.size();
} else {
// Range partition
// 1. calculate range
Expand Down
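
The RANDOM_LOCAL_SHUFFLE branch above simply round-robins over the channel indices collected into local_channel_ids during init, so every block stays on the sender's own backend. A minimal Java sketch of that selection loop, for illustration only (the real implementation is the C++ hunk above; the channel layout below is invented):

import java.util.List;

public class RandomLocalShuffleSketch {
    public static void main(String[] args) {
        // suppose this sink has 6 channels, of which channels 1, 3 and 4 are local to this backend
        List<Integer> localChannelIds = List.of(1, 3, 4);
        int currentChannelIdx = 0;
        for (int block = 0; block < 6; block++) {
            // pick the next local channel and send the block to it
            int target = localChannelIds.get(currentChannelIdx);
            System.out.println("block " + block + " -> local channel " + target);
            // advance the cursor, wrapping around the local channels only
            currentChannelIdx = (currentChannelIdx + 1) % localChannelIds.size();
        }
        // output: blocks go to channels 1, 3, 4, 1, 3, 4 -- never to a remote channel
    }
}
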
1 change: 1 addition & 0 deletions be/src/pipeline/exec/exchange_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ class ExchangeSinkLocalState MOCK_REMOVE(final) : public PipelineXSinkLocalState
std::vector<std::shared_ptr<vectorized::Channel>> channels;
int current_channel_idx {0}; // index of current channel to send to if _random == true
bool _only_local_exchange {false};
std::vector<uint32_t> local_channel_ids;

void on_channel_finished(InstanceLoId channel_id);
vectorized::PartitionerBase* partitioner() const { return _partitioner.get(); }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -702,10 +702,10 @@ protected void doDistribute(boolean canUseNereidsDistributePlanner, ExplainLevel
}

boolean notNeedBackend = false;
// if the query can compute without backend, we can skip check cluster privileges
if (Config.isCloudMode()
&& cascadesContext.getConnectContext().supportHandleByFe()
&& physicalPlan instanceof ComputeResultSet) {
// internal queries cannot consume a ResultSet computed in the FE, so they must be processed by the backend
if (cascadesContext.getConnectContext().supportHandleByFe()
&& physicalPlan instanceof ComputeResultSet
&& !cascadesContext.getConnectContext().getState().isInternal()) {
Optional<ResultSet> resultSet = ((ComputeResultSet) physicalPlan).computeResultInFe(
cascadesContext, Optional.empty(), physicalPlan.getOutput());
if (resultSet.isPresent()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@
import org.apache.doris.planner.BlackholeSink;
import org.apache.doris.planner.CTEScanNode;
import org.apache.doris.planner.DataPartition;
import org.apache.doris.planner.DataSink;
import org.apache.doris.planner.DataStreamSink;
import org.apache.doris.planner.DictionarySink;
import org.apache.doris.planner.EmptySetNode;
Expand Down Expand Up @@ -240,6 +241,7 @@
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
Expand Down Expand Up @@ -2323,6 +2325,37 @@ && findOlapScanNodesByPassExchangeAndJoinNode(setOperationFragment.getPlanRoot()
setOperationNode.setColocate(true);
}

// Whether to accept a LocalShuffleUnion.
// The backend needs `enable_local_exchange=true` to determine whether a channel is `local`,
// and LocalShuffleUnion needs `local` channels to do the random local shuffle, so we must
// check `enable_local_exchange` here.
if (setOperation instanceof PhysicalUnion
&& context.getConnectContext().getSessionVariable().getEnableLocalExchange()
&& SessionVariable.canUseNereidsDistributePlanner()) {
boolean isLocalShuffleUnion = false;
if (setOperation.getPhysicalProperties().getDistributionSpec() instanceof DistributionSpecExecutionAny) {
Map<Integer, ExchangeNode> exchangeIdToExchangeNode = new IdentityHashMap<>();
for (PlanNode child : setOperationNode.getChildren()) {
if (child instanceof ExchangeNode) {
exchangeIdToExchangeNode.put(child.getId().asInt(), (ExchangeNode) child);
}
}

for (PlanFragment childFragment : setOperationFragment.getChildren()) {
DataSink sink = childFragment.getSink();
if (sink instanceof DataStreamSink) {
isLocalShuffleUnion |= setLocalRandomPartition(exchangeIdToExchangeNode, (DataStreamSink) sink);
} else if (sink instanceof MultiCastDataSink) {
MultiCastDataSink multiCastDataSink = (MultiCastDataSink) sink;
for (DataStreamSink dataStreamSink : multiCastDataSink.getDataStreamSinks()) {
isLocalShuffleUnion |= setLocalRandomPartition(exchangeIdToExchangeNode, dataStreamSink);
}
}
}
}
((UnionNode) setOperationNode).setLocalShuffleUnion(isLocalShuffleUnion);
}

return setOperationFragment;
}

Expand Down Expand Up @@ -3215,4 +3248,18 @@ private boolean isSimpleQuery(PhysicalPlan root) {
}
return child instanceof PhysicalRelation;
}

private boolean setLocalRandomPartition(
Map<Integer, ExchangeNode> exchangeIdToExchangeNode, DataStreamSink dataStreamSink) {
ExchangeNode exchangeNode = exchangeIdToExchangeNode.get(
dataStreamSink.getExchNodeId().asInt());
if (exchangeNode == null) {
return false;
}
exchangeNode.setPartitionType(TPartitionType.RANDOM_LOCAL_SHUFFLE);

DataPartition p2pPartition = new DataPartition(TPartitionType.RANDOM_LOCAL_SHUFFLE);
dataStreamSink.setOutputPartition(p2pPartition);
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,7 @@
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.exceptions.NotSupportedException;
import org.apache.doris.nereids.exceptions.ParseException;
import org.apache.doris.nereids.glue.LogicalPlanAdapter;
import org.apache.doris.nereids.hint.DistributeHint;
import org.apache.doris.nereids.hint.JoinSkewInfo;
import org.apache.doris.nereids.load.NereidsDataDescription;
Expand Down Expand Up @@ -2069,8 +2070,10 @@ public List<Pair<LogicalPlan, StatementContext>> visitMultiStatements(MultiState
connectContext.setStatementContext(statementContext);
statementContext.setConnectContext(connectContext);
}
logicalPlans.add(Pair.of(
ParserUtils.withOrigin(ctx, () -> (LogicalPlan) visit(statement)), statementContext));
Pair<LogicalPlan, StatementContext> planAndContext = Pair.of(
ParserUtils.withOrigin(ctx, () -> (LogicalPlan) visit(statement)), statementContext);
statementContext.setParsedStatement(new LogicalPlanAdapter(planAndContext.first, statementContext));
logicalPlans.add(planAndContext);
List<Placeholder> params = new ArrayList<>(tokenPosToParameters.values());
statementContext.setPlaceholders(params);
tokenPosToParameters.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,8 +200,10 @@ private List<MaterializationContext> createAsyncMaterializationContext(CascadesC
return ImmutableList.of();
}
if (CollectionUtils.isEmpty(availableMTMVs)) {
LOG.info("Enable materialized view rewrite but availableMTMVs is empty, query id "
+ "is {}", cascadesContext.getConnectContext().getQueryIdentifier());
if (LOG.isDebugEnabled()) {
LOG.debug("Enable materialized view rewrite but availableMTMVs is empty, query id "
+ "is {}", cascadesContext.getConnectContext().getQueryIdentifier());
}
return ImmutableList.of();
}
List<MaterializationContext> asyncMaterializationContext = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.planner.SchemaScanNode;
import org.apache.doris.planner.UnionNode;
import org.apache.doris.thrift.TExplainLevel;

import com.google.common.collect.ArrayListMultimap;
Expand Down Expand Up @@ -215,7 +216,10 @@ private UnassignedJob buildScanRemoteTableJob(
private UnassignedJob buildShuffleJob(
StatementContext statementContext, PlanFragment planFragment,
ListMultimap<ExchangeNode, UnassignedJob> inputJobs) {
if (planFragment.isPartitioned()) {
if (planFragment.getPlanRoot().collectInCurrentFragment(UnionNode.class::isInstance)
.stream().map(UnionNode.class::cast).anyMatch(UnionNode::isLocalShuffleUnion)) {
return new UnassignedLocalShuffleUnionJob(statementContext, planFragment, inputJobs);
} else if (planFragment.isPartitioned()) {
return new UnassignedShuffleJob(statementContext, planFragment, inputJobs);
} else {
return new UnassignedGatherJob(statementContext, planFragment, inputJobs);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.nereids.trees.plans.distribute.worker.job;

import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.trees.plans.distribute.DistributeContext;
import org.apache.doris.planner.ExchangeNode;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.planner.PlanNode;
import org.apache.doris.planner.UnionNode;
import org.apache.doris.qe.ConnectContext;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;

import java.util.Collection;
import java.util.List;
import java.util.Map.Entry;
import java.util.Set;

/**
* This class performs a local shuffle within the same backend, to save network IO.
*
* For example: suppose there are three backends A/B/C, and every backend runs 3 instances below the Union.
* The Union then generates one instance per source instance on the same backend, and every source
* instance random-local-shuffles to the three target instances on its own backend, like this:
*
* UnionNode(9 target instances, [A4, B4, C4, A5, B5, C5, A6, B6, C6]) -- say there are 3 backends: A/B/C
* |
* +- ExchangeNode(3 source instances, [A1, B1, C1]) -- A1 random local shuffle to A4/A5/A6,
* | B1 random local shuffle to B4/B5/B6,
* | C1 random local shuffle to C4/C5/C6
* |
* +- ExchangeNode(3 source instances, [A2, B2, C2]) -- A2 random local shuffle to A4/A5/A6,
* | B2 random local shuffle to B4/B5/B6,
* | C2 random local shuffle to C4/C5/C6
* |
* +- ExchangeNode(3 source instances, [A3, B3, C3]) -- A3 random local shuffle to A4/A5/A6,
* B3 random local shuffle to B4/B5/B6,
* C3 random local shuffle to C4/C5/C6
*/
public class UnassignedLocalShuffleUnionJob extends AbstractUnassignedJob {

public UnassignedLocalShuffleUnionJob(StatementContext statementContext, PlanFragment fragment,
ListMultimap<ExchangeNode, UnassignedJob> exchangeToChildJob) {
super(statementContext, fragment, ImmutableList.of(), exchangeToChildJob);
}

@Override
public List<AssignedJob> computeAssignedJobs(
DistributeContext context, ListMultimap<ExchangeNode, AssignedJob> inputJobs) {
ConnectContext connectContext = statementContext.getConnectContext();
DefaultScanSource noScanSource = DefaultScanSource.empty();
List<AssignedJob> unionInstances = Lists.newArrayListWithCapacity(inputJobs.size());

List<UnionNode> unionNodes = fragment.getPlanRoot().collectInCurrentFragment(UnionNode.class::isInstance);
Set<Integer> exchangeIdToUnion = Sets.newLinkedHashSet();
for (UnionNode unionNode : unionNodes) {
for (PlanNode child : unionNode.getChildren()) {
if (child instanceof ExchangeNode) {
exchangeIdToUnion.add(child.getId().asInt());
}
}
}

int id = 0;
for (Entry<ExchangeNode, Collection<AssignedJob>> exchangeNodeToSources : inputJobs.asMap().entrySet()) {
ExchangeNode exchangeNode = exchangeNodeToSources.getKey();
if (!exchangeIdToUnion.contains(exchangeNode.getId().asInt())) {
continue;
}
for (AssignedJob inputInstance : exchangeNodeToSources.getValue()) {
StaticAssignedJob unionInstance = new StaticAssignedJob(
id++, connectContext.nextInstanceId(), this,
inputInstance.getAssignedWorker(), noScanSource
);
unionInstances.add(unionInstance);
}
}
return unionInstances;
}
}
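
To make the Javadoc example concrete, here is a small self-contained Java sketch (an illustration, not part of the PR) of the placement rule computeAssignedJobs implements: one union instance per source instance, pinned to the same worker, so each source instance only ever shuffles to union instances on its own backend.

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class LocalShuffleUnionPlacementSketch {
    public static void main(String[] args) {
        // the 9 source instances from the Javadoc example, as (backend, instance) pairs
        List<String[]> sources = List.of(
                new String[] {"A", "A1"}, new String[] {"B", "B1"}, new String[] {"C", "C1"},
                new String[] {"A", "A2"}, new String[] {"B", "B2"}, new String[] {"C", "C2"},
                new String[] {"A", "A3"}, new String[] {"B", "B3"}, new String[] {"C", "C3"});
        Map<String, Integer> unionInstancesPerBackend = new LinkedHashMap<>();
        for (String[] source : sources) {
            // one union instance is created on the same backend as each source instance
            unionInstancesPerBackend.merge(source[0], 1, Integer::sum);
        }
        // prints {A=3, B=3, C=3}: 9 union instances in total, colocated with their sources
        System.out.println(unionInstancesPerBackend);
    }
}
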
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ public DataPartition(TPartitionType type) {
Preconditions.checkState(type == TPartitionType.UNPARTITIONED
|| type == TPartitionType.RANDOM
|| type == TPartitionType.HIVE_TABLE_SINK_UNPARTITIONED
|| type == TPartitionType.OLAP_TABLE_SINK_HASH_PARTITIONED);
|| type == TPartitionType.OLAP_TABLE_SINK_HASH_PARTITIONED
|| type == TPartitionType.RANDOM_LOCAL_SHUFFLE);
this.type = type;
this.partitionExprs = ImmutableList.of();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public ExchangeNode(PlanNodeId id, PlanNode inputNode) {
offset = 0;
limit = -1;
this.conjuncts = Collections.emptyList();
children.add(inputNode);
this.children.add(inputNode);
TupleDescriptor outputTupleDesc = inputNode.getOutputTupleDesc();
updateTupleIds(outputTupleDesc);
}
Expand Down Expand Up @@ -155,8 +155,15 @@ public void setRightChildOfBroadcastHashJoin(boolean value) {
*/
@Override
public boolean isSerialOperator() {
return (ConnectContext.get() != null && ConnectContext.get().getSessionVariable().isUseSerialExchange()
|| partitionType == TPartitionType.UNPARTITIONED) && mergeInfo == null;
return (
(
ConnectContext.get() != null
&& ConnectContext.get().getSessionVariable().isUseSerialExchange()
&& (partitionType != TPartitionType.RANDOM_LOCAL_SHUFFLE)
)
|| partitionType == TPartitionType.UNPARTITIONED
)
&& mergeInfo == null;
}

@Override
Expand Down
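
The reformatted predicate above is hard to read through the nested parentheses. As a reading aid only (this helper is not part of the PR, and it assumes the new RANDOM_LOCAL_SHUFFLE enum value from this change), the same decision expressed as a standalone method, with the null ConnectContext check folded into the first argument:

import org.apache.doris.thrift.TPartitionType;

public class SerialExchangeSketch {
    // useSerialExchange stands for "a ConnectContext is present and use_serial_exchange is on"
    static boolean isSerialOperator(boolean useSerialExchange, TPartitionType partitionType, boolean hasMergeInfo) {
        // per the change above, the session hint no longer applies to RANDOM_LOCAL_SHUFFLE exchanges
        boolean serialByHint = useSerialExchange && partitionType != TPartitionType.RANDOM_LOCAL_SHUFFLE;
        boolean serialByPartition = partitionType == TPartitionType.UNPARTITIONED;
        // a merging exchange is never treated as a serial operator
        return (serialByHint || serialByPartition) && !hasMergeInfo;
    }
}
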
10 changes: 10 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/planner/UnionNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.apache.doris.thrift.TPlanNodeType;

public class UnionNode extends SetOperationNode {
private boolean localShuffleUnion;

public UnionNode(PlanNodeId id, TupleId tupleId) {
super(id, tupleId, "UNION");
}
Expand All @@ -40,4 +42,12 @@ protected void toThrift(TPlanNode msg) {
public boolean isSerialOperator() {
return children.isEmpty();
}

public boolean isLocalShuffleUnion() {
return localShuffleUnion;
}

public void setLocalShuffleUnion(boolean localShuffleUnion) {
this.localShuffleUnion = localShuffleUnion;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -972,8 +972,11 @@ public TUniqueId getLastQueryId() {
public TUniqueId nextInstanceId() {
if (loadId != null) {
return new TUniqueId(loadId.hi, loadId.lo + instanceIdGenerator.incrementAndGet());
} else {
} else if (queryId != null) {
return new TUniqueId(queryId.hi, queryId.lo + instanceIdGenerator.incrementAndGet());
} else {
// for test
return new TUniqueId(0, instanceIdGenerator.incrementAndGet());
}
}

Expand Down
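
The new branch above covers the case where neither a load id nor a query id has been set on the context; previously that dereferenced the null queryId. A hedged usage sketch (hypothetical snippet, not a test from the PR; it assumes ConnectContext's no-argument constructor and a counter that starts at zero):

import org.apache.doris.qe.ConnectContext;
import org.apache.doris.thrift.TUniqueId;

public class NextInstanceIdSketch {
    public static void main(String[] args) {
        // a bare context, as a unit test might create it: no load id and no query id are set
        ConnectContext ctx = new ConnectContext();
        TUniqueId first = ctx.nextInstanceId();   // falls into the new branch: hi == 0, lo from the counter
        TUniqueId second = ctx.nextInstanceId();  // the per-context counter keeps increasing
        System.out.println(first.hi + ":" + first.lo + " / " + second.hi + ":" + second.lo);
    }
}
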