Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 1 addition & 12 deletions ydb/core/kqp/host/kqp_runner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,7 @@ class TKqpRunner : public IKqpRunner {
.Add(CreateKqpFinalizingOptTransformer(OptimizeCtx), "FinalizingOptimize")
.Add(CreateKqpQueryPhasesTransformer(), "QueryPhases")
.Add(CreateKqpQueryEffectsTransformer(OptimizeCtx), "QueryEffects")
.Add(CreateKqpSinkPrecomputeTransformer(OptimizeCtx), "KqpSinkPrecompute")
.Add(CreateKqpCheckPhysicalQueryTransformer(), "CheckKqlPhysicalQuery")
.Build(false));

Expand Down Expand Up @@ -373,16 +374,6 @@ class TKqpRunner : public IKqpRunner {
//.Add(CreateKqpCheckPhysicalQueryTransformer(), "CheckKqlPhysicalQuery")
.Build(false);

auto physicalKqpSinkPrecomputeTransformer = CreateKqpQueryBlocksTransformer(TTransformationPipeline(typesCtx)
.AddServiceTransformers()
.Add(Log("KqpSinkPrecompute"), "LogKqpSinkPrecompute")
.AddTypeAnnotationTransformer(CreateKqpTypeAnnotationTransformer(Cluster, sessionCtx->TablesPtr(), *typesCtx, Config))
.AddPostTypeAnnotation(/* forSubgraph */ true)
.Add(
CreateKqpSinkPrecomputeTransformer(OptimizeCtx),
"KqpSinkPrecompute")
.Build(false));

auto physicalBuildTxsTransformer = CreateKqpQueryBlocksTransformer(TTransformationPipeline(typesCtx)
.AddServiceTransformers()
.Add(Log("PhysicalBuildTxs"), "LogPhysicalBuildTxs")
Expand Down Expand Up @@ -495,8 +486,6 @@ class TKqpRunner : public IKqpRunner {
Transformer = CreateCompositeGraphTransformer(
{
TTransformStage{ physicalOptimizeTransformer, "PhysicalOptimize", TIssuesIds::DEFAULT_ERROR },
LogStage("PhysicalBuildSinkPrecompute"),
TTransformStage{ physicalKqpSinkPrecomputeTransformer, "PhysicalBuildSinkPrecompute", TIssuesIds::DEFAULT_ERROR },
LogStage("PhysicalOptimize"),
TTransformStage{ physicalBuildTxsTransformer, "PhysicalBuildTxs", TIssuesIds::DEFAULT_ERROR },
LogStage("PhysicalBuildTxs"),
Expand Down
112 changes: 71 additions & 41 deletions ydb/core/kqp/opt/kqp_opt_sink_precompute.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,25 +38,40 @@ class TKqpSinkPrecomputeTransformer : public TSyncTransformerBase {
return TStatus::Ok;
}

const auto stagesUsedForPrecomputesAndSinks = FindStagesUsedForPrecomputeAndSinks(outputExpr);
Y_UNUSED(stagesUsedForPrecomputesAndSinks);

TNodeOnNodeOwnedMap marked;
for (const auto& [_, exprNode] : stagesUsedForPrecomputesAndSinks) {
AFL_ENSURE(exprNode);
TExprBase node(exprNode);
const auto stage = node.Cast<TDqStage>();
if (HasNonDeterministicFunction(stage)) {
marked.emplace(node.Raw(), node.Ptr());

{
const auto [precomputeStages, sinkStages] = GatherPrecomputeAndSinkStages(outputExpr, *KqpCtx);

for (const auto& [_, exprNode] : FindStagesUsedForBothStagesSets(precomputeStages, sinkStages)) {
AFL_ENSURE(exprNode);
TExprBase node(exprNode);
const auto stage = node.Cast<TDqStage>();
if (HasNonDeterministicFunction(stage) || !IsKqpPureInputs(stage.Inputs())) {
marked.emplace(node.Raw(), node.Ptr());
}
}

const auto resultStages = GatherResultStages(outputExpr, ctx);
if (!resultStages) {
return TStatus::Error;
}

for (const auto& [_, exprNode] : FindStagesUsedForBothStagesSets(*resultStages, sinkStages)) {
AFL_ENSURE(exprNode);
TExprBase node(exprNode);
const auto stage = node.Cast<TDqStage>();
if (!IsKqpPureInputs(stage.Inputs())) {
marked.emplace(node.Raw(), node.Ptr());
}
}
}

if (marked.empty()) {
return TStatus::Ok;
}

// Find all stages that depend on non-deterministic stages
// that are used for sinks or precomputes.
// Find all stages that depend on marked stages
{
auto filter = [](const TExprNode::TPtr& exprNode) {
return !exprNode->IsLambda();
Expand Down Expand Up @@ -108,20 +123,16 @@ class TKqpSinkPrecomputeTransformer : public TSyncTransformerBase {
|| (KqpCtx->Tables->ExistingTable(KqpCtx->Cluster, sinkSettings.Cast().Table().Path()).Metadata->Kind == EKikimrTableKind::Olap);
if (!executedAsSingleEffect) {
AFL_ENSURE(stage.Inputs().Size() == 1);
AFL_ENSURE(stage.Inputs().Item(0).Maybe<TDqCnUnionAll>());
AFL_ENSURE(stage.Settings().Empty());

auto channel = Build<TDqCnUnionAll>(ctx, node.Pos())
.Output()
.Stage<TDqStage>() // no output
.Inputs(stage.Inputs())
.Program(stage.Program())
.Settings(stage.Settings())
.Build()
.Index().Build("0")
.Build()
.Done();
AFL_ENSURE(stage.Program().Args().Size() == 1);
AFL_ENSURE(stage.Program().Body().Maybe<TCoArgument>());
AFL_ENSURE(stage.Program().Body().Cast<TCoArgument>().Name() ==
stage.Program().Args().Arg(0).Cast<TCoArgument>().Name());

auto inputRows = Build<TDqPhyPrecompute>(ctx, node.Pos())
.Connection(channel)
.Connection(stage.Inputs().Item(0).Ptr())
.Done();

auto rowArg = Build<TCoArgument>(ctx, node.Pos())
Expand Down Expand Up @@ -152,43 +163,43 @@ class TKqpSinkPrecomputeTransformer : public TSyncTransformerBase {

outputExpr = ctx.ReplaceNodes(std::move(outputExpr), replaces);

return TStatus::Ok;
return TStatus(TStatus::Repeat, true);
}

// No-op override: the body is empty, so rewinding this transformer does nothing.
void Rewind() final {
}

private:
TNodeOnNodeOwnedMap FindStagesUsedForPrecomputeAndSinks(TExprNode::TPtr& expr) {
const auto [precomputeStages, sinkStages] = GatherPrecomputesAndSinks(expr, *KqpCtx);

TNodeSet stagesUsedForPrecomputes;
for (const auto& precomputeStage : precomputeStages) {
auto visit = [&stagesUsedForPrecomputes](const TDqStage& stage) mutable -> bool {
return stagesUsedForPrecomputes.emplace(stage.Raw()).second;
TNodeOnNodeOwnedMap FindStagesUsedForBothStagesSets(
const TNodeOnNodeOwnedMap& leftStages,
const TNodeOnNodeOwnedMap& rightStages) {
TNodeSet stagesUsedForLeft;
for (const auto& leftStage : leftStages) {
auto visit = [&stagesUsedForLeft](const TDqStage& stage) mutable -> bool {
return stagesUsedForLeft.emplace(stage.Raw()).second;
};
auto postVisit = [](const TDqStage&) {};
VisitStagesBackwards(precomputeStage.second, visit, postVisit);
VisitStagesBackwards(leftStage.second, visit, postVisit);
}

TNodeOnNodeOwnedMap stagesUsedForPrecomputesAndSinks;
TNodeSet stagesUsedForSinks;
for (const auto& sinkStage : sinkStages) {
auto visit = [&stagesUsedForSinks, &stagesUsedForPrecomputes, &stagesUsedForPrecomputesAndSinks](
TNodeOnNodeOwnedMap stagesUsedForLeftAndRight;
TNodeSet stagesUsedForRight;
for (const auto& rightStage : rightStages) {
auto visit = [&stagesUsedForRight, &stagesUsedForLeft, &stagesUsedForLeftAndRight](
const TDqStage& stage) mutable -> bool {
if (stagesUsedForPrecomputes.contains(stage.Raw())) {
stagesUsedForPrecomputesAndSinks.emplace(stage.Raw(), stage.Ptr());
if (stagesUsedForLeft.contains(stage.Raw())) {
stagesUsedForLeftAndRight.emplace(stage.Raw(), stage.Ptr());
}
return stagesUsedForSinks.emplace(stage.Raw()).second;
return stagesUsedForRight.emplace(stage.Raw()).second;
};
auto postVisit = [](const TDqStage&) {};
VisitStagesBackwards(sinkStage.second, visit, postVisit);
VisitStagesBackwards(rightStage.second, visit, postVisit);
}

return stagesUsedForPrecomputesAndSinks;
return stagesUsedForLeftAndRight;
}

std::pair<TNodeOnNodeOwnedMap, TNodeOnNodeOwnedMap> GatherPrecomputesAndSinks(const TExprNode::TPtr& query, const TKqpOptimizeContext& kqpCtx) {
std::pair<TNodeOnNodeOwnedMap, TNodeOnNodeOwnedMap> GatherPrecomputeAndSinkStages(const TExprNode::TPtr& query, const TKqpOptimizeContext& kqpCtx) {
TNodeOnNodeOwnedMap precomputeStages;
TNodeOnNodeOwnedMap sinkStages;

Expand Down Expand Up @@ -253,6 +264,25 @@ class TKqpSinkPrecomputeTransformer : public TSyncTransformerBase {
return {std::move(precomputeStages), std::move(sinkStages)};
}

// Collects the stages whose output is returned as a query result through a
// TDqCnUnionAll connection.
//
// inputExpr is wrapped as a TKqlQuery and each entry of Results() is inspected:
//   * TDqCnUnionAll value -> the connection's output stage (cast to TDqStage)
//     is recorded, keyed by its raw node pointer;
//   * TDqCnValue value    -> accepted, contributes no stage;
//   * anything else       -> an "Unexpected node in results" issue is added to
//     ctx and std::nullopt is returned.
//
// Returns the collected map on success (it may be empty).
std::optional<TNodeOnNodeOwnedMap> GatherResultStages(
        const TExprNode::TPtr& inputExpr,
        TExprContext& ctx) {
    TNodeOnNodeOwnedMap collected;
    TKqlQuery kqlQuery(inputExpr);

    for (const auto& queryResult : kqlQuery.Results()) {
        const auto value = queryResult.Value();

        if (auto unionAll = value.Maybe<TDqCnUnionAll>()) {
            const auto producerStage = unionAll.Cast().Output().Stage().Cast<TDqStage>();
            collected.emplace(producerStage.Raw(), producerStage.Ptr());
            continue;
        }

        if (value.Maybe<TDqCnValue>()) {
            // Value connections are valid results but have no stage to record here.
            continue;
        }

        ctx.AddError(TIssue(ctx.GetPosition(queryResult.Pos()), TStringBuilder()
            << "Unexpected node in results: " << KqpExprToPrettyString(value, ctx)));
        return std::nullopt;
    }

    return collected;
}

bool HasNonDeterministicFunction(const TDqStage& stage) {
bool hasNonDeterministicFunction = false;
VisitExpr(stage.Program().Ptr(), [&hasNonDeterministicFunction](const TExprNode::TPtr& exprNode) mutable {
Expand Down
Loading
Loading