Skip to content

Commit cab7c17

Browse files
authored
Fix missing precomputes with sink (#26578)
1 parent bb8971b commit cab7c17

File tree

4 files changed

+273
-55
lines changed

4 files changed

+273
-55
lines changed

ydb/core/kqp/host/kqp_runner.cpp

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -333,6 +333,7 @@ class TKqpRunner : public IKqpRunner {
333333
.Add(CreateKqpFinalizingOptTransformer(OptimizeCtx), "FinalizingOptimize")
334334
.Add(CreateKqpQueryPhasesTransformer(), "QueryPhases")
335335
.Add(CreateKqpQueryEffectsTransformer(OptimizeCtx), "QueryEffects")
336+
.Add(CreateKqpSinkPrecomputeTransformer(OptimizeCtx), "KqpSinkPrecompute")
336337
.Add(CreateKqpCheckPhysicalQueryTransformer(), "CheckKqlPhysicalQuery")
337338
.Build(false));
338339

@@ -373,16 +374,6 @@ class TKqpRunner : public IKqpRunner {
373374
//.Add(CreateKqpCheckPhysicalQueryTransformer(), "CheckKqlPhysicalQuery")
374375
.Build(false);
375376

376-
auto physicalKqpSinkPrecomputeTransformer = CreateKqpQueryBlocksTransformer(TTransformationPipeline(typesCtx)
377-
.AddServiceTransformers()
378-
.Add(Log("KqpSinkPrecompute"), "LogKqpSinkPrecompute")
379-
.AddTypeAnnotationTransformer(CreateKqpTypeAnnotationTransformer(Cluster, sessionCtx->TablesPtr(), *typesCtx, Config))
380-
.AddPostTypeAnnotation(/* forSubgraph */ true)
381-
.Add(
382-
CreateKqpSinkPrecomputeTransformer(OptimizeCtx),
383-
"KqpSinkPrecompute")
384-
.Build(false));
385-
386377
auto physicalBuildTxsTransformer = CreateKqpQueryBlocksTransformer(TTransformationPipeline(typesCtx)
387378
.AddServiceTransformers()
388379
.Add(Log("PhysicalBuildTxs"), "LogPhysicalBuildTxs")
@@ -495,8 +486,6 @@ class TKqpRunner : public IKqpRunner {
495486
Transformer = CreateCompositeGraphTransformer(
496487
{
497488
TTransformStage{ physicalOptimizeTransformer, "PhysicalOptimize", TIssuesIds::DEFAULT_ERROR },
498-
LogStage("PhysicalBuildSinkPrecompute"),
499-
TTransformStage{ physicalKqpSinkPrecomputeTransformer, "PhysicalBuildSinkPrecompute", TIssuesIds::DEFAULT_ERROR },
500489
LogStage("PhysicalOptimize"),
501490
TTransformStage{ physicalBuildTxsTransformer, "PhysicalBuildTxs", TIssuesIds::DEFAULT_ERROR },
502491
LogStage("PhysicalBuildTxs"),

ydb/core/kqp/opt/kqp_opt_sink_precompute.cpp

Lines changed: 71 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -38,25 +38,40 @@ class TKqpSinkPrecomputeTransformer : public TSyncTransformerBase {
3838
return TStatus::Ok;
3939
}
4040

41-
const auto stagesUsedForPrecomputesAndSinks = FindStagesUsedForPrecomputeAndSinks(outputExpr);
42-
Y_UNUSED(stagesUsedForPrecomputesAndSinks);
43-
4441
TNodeOnNodeOwnedMap marked;
45-
for (const auto& [_, exprNode] : stagesUsedForPrecomputesAndSinks) {
46-
AFL_ENSURE(exprNode);
47-
TExprBase node(exprNode);
48-
const auto stage = node.Cast<TDqStage>();
49-
if (HasNonDeterministicFunction(stage)) {
50-
marked.emplace(node.Raw(), node.Ptr());
42+
43+
{
44+
const auto [precomputeStages, sinkStages] = GatherPrecomputeAndSinkStages(outputExpr, *KqpCtx);
45+
46+
for (const auto& [_, exprNode] : FindStagesUsedForBothStagesSets(precomputeStages, sinkStages)) {
47+
AFL_ENSURE(exprNode);
48+
TExprBase node(exprNode);
49+
const auto stage = node.Cast<TDqStage>();
50+
if (HasNonDeterministicFunction(stage) || !IsKqpPureInputs(stage.Inputs())) {
51+
marked.emplace(node.Raw(), node.Ptr());
52+
}
53+
}
54+
55+
const auto resultStages = GatherResultStages(outputExpr, ctx);
56+
if (!resultStages) {
57+
return TStatus::Error;
58+
}
59+
60+
for (const auto& [_, exprNode] : FindStagesUsedForBothStagesSets(*resultStages, sinkStages)) {
61+
AFL_ENSURE(exprNode);
62+
TExprBase node(exprNode);
63+
const auto stage = node.Cast<TDqStage>();
64+
if (!IsKqpPureInputs(stage.Inputs())) {
65+
marked.emplace(node.Raw(), node.Ptr());
66+
}
5167
}
5268
}
5369

5470
if (marked.empty()) {
5571
return TStatus::Ok;
5672
}
5773

58-
// Find all stages that depend on non-deterministic stages
59-
// that are used for sinks or precomputes.
74+
// Find all stages that depend on marked stages
6075
{
6176
auto filter = [](const TExprNode::TPtr& exprNode) {
6277
return !exprNode->IsLambda();
@@ -108,20 +123,16 @@ class TKqpSinkPrecomputeTransformer : public TSyncTransformerBase {
108123
|| (KqpCtx->Tables->ExistingTable(KqpCtx->Cluster, sinkSettings.Cast().Table().Path()).Metadata->Kind == EKikimrTableKind::Olap);
109124
if (!executedAsSingleEffect) {
110125
AFL_ENSURE(stage.Inputs().Size() == 1);
126+
AFL_ENSURE(stage.Inputs().Item(0).Maybe<TDqCnUnionAll>());
127+
AFL_ENSURE(stage.Settings().Empty());
111128

112-
auto channel = Build<TDqCnUnionAll>(ctx, node.Pos())
113-
.Output()
114-
.Stage<TDqStage>() // no output
115-
.Inputs(stage.Inputs())
116-
.Program(stage.Program())
117-
.Settings(stage.Settings())
118-
.Build()
119-
.Index().Build("0")
120-
.Build()
121-
.Done();
129+
AFL_ENSURE(stage.Program().Args().Size() == 1);
130+
AFL_ENSURE(stage.Program().Body().Maybe<TCoArgument>());
131+
AFL_ENSURE(stage.Program().Body().Cast<TCoArgument>().Name() ==
132+
stage.Program().Args().Arg(0).Cast<TCoArgument>().Name());
122133

123134
auto inputRows = Build<TDqPhyPrecompute>(ctx, node.Pos())
124-
.Connection(channel)
135+
.Connection(stage.Inputs().Item(0).Ptr())
125136
.Done();
126137

127138
auto rowArg = Build<TCoArgument>(ctx, node.Pos())
@@ -152,43 +163,43 @@ class TKqpSinkPrecomputeTransformer : public TSyncTransformerBase {
152163

153164
outputExpr = ctx.ReplaceNodes(std::move(outputExpr), replaces);
154165

155-
return TStatus::Ok;
166+
return TStatus(TStatus::Repeat, true);
156167
}
157168

158169
void Rewind() final {
159170
}
160171

161172
private:
162-
TNodeOnNodeOwnedMap FindStagesUsedForPrecomputeAndSinks(TExprNode::TPtr& expr) {
163-
const auto [precomputeStages, sinkStages] = GatherPrecomputesAndSinks(expr, *KqpCtx);
164-
165-
TNodeSet stagesUsedForPrecomputes;
166-
for (const auto& precomputeStage : precomputeStages) {
167-
auto visit = [&stagesUsedForPrecomputes](const TDqStage& stage) mutable -> bool {
168-
return stagesUsedForPrecomputes.emplace(stage.Raw()).second;
173+
TNodeOnNodeOwnedMap FindStagesUsedForBothStagesSets(
174+
const TNodeOnNodeOwnedMap& leftStages,
175+
const TNodeOnNodeOwnedMap& rightStages) {
176+
TNodeSet stagesUsedForLeft;
177+
for (const auto& leftStage : leftStages) {
178+
auto visit = [&stagesUsedForLeft](const TDqStage& stage) mutable -> bool {
179+
return stagesUsedForLeft.emplace(stage.Raw()).second;
169180
};
170181
auto postVisit = [](const TDqStage&) {};
171-
VisitStagesBackwards(precomputeStage.second, visit, postVisit);
182+
VisitStagesBackwards(leftStage.second, visit, postVisit);
172183
}
173184

174-
TNodeOnNodeOwnedMap stagesUsedForPrecomputesAndSinks;
175-
TNodeSet stagesUsedForSinks;
176-
for (const auto& sinkStage : sinkStages) {
177-
auto visit = [&stagesUsedForSinks, &stagesUsedForPrecomputes, &stagesUsedForPrecomputesAndSinks](
185+
TNodeOnNodeOwnedMap stagesUsedForLeftAndRight;
186+
TNodeSet stagesUsedForRight;
187+
for (const auto& rightStage : rightStages) {
188+
auto visit = [&stagesUsedForRight, &stagesUsedForLeft, &stagesUsedForLeftAndRight](
178189
const TDqStage& stage) mutable -> bool {
179-
if (stagesUsedForPrecomputes.contains(stage.Raw())) {
180-
stagesUsedForPrecomputesAndSinks.emplace(stage.Raw(), stage.Ptr());
190+
if (stagesUsedForLeft.contains(stage.Raw())) {
191+
stagesUsedForLeftAndRight.emplace(stage.Raw(), stage.Ptr());
181192
}
182-
return stagesUsedForSinks.emplace(stage.Raw()).second;
193+
return stagesUsedForRight.emplace(stage.Raw()).second;
183194
};
184195
auto postVisit = [](const TDqStage&) {};
185-
VisitStagesBackwards(sinkStage.second, visit, postVisit);
196+
VisitStagesBackwards(rightStage.second, visit, postVisit);
186197
}
187198

188-
return stagesUsedForPrecomputesAndSinks;
199+
return stagesUsedForLeftAndRight;
189200
}
190201

191-
std::pair<TNodeOnNodeOwnedMap, TNodeOnNodeOwnedMap> GatherPrecomputesAndSinks(const TExprNode::TPtr& query, const TKqpOptimizeContext& kqpCtx) {
202+
std::pair<TNodeOnNodeOwnedMap, TNodeOnNodeOwnedMap> GatherPrecomputeAndSinkStages(const TExprNode::TPtr& query, const TKqpOptimizeContext& kqpCtx) {
192203
TNodeOnNodeOwnedMap precomputeStages;
193204
TNodeOnNodeOwnedMap sinkStages;
194205

@@ -253,6 +264,25 @@ class TKqpSinkPrecomputeTransformer : public TSyncTransformerBase {
253264
return {std::move(precomputeStages), std::move(sinkStages)};
254265
}
255266

267+
std::optional<TNodeOnNodeOwnedMap> GatherResultStages(
268+
const TExprNode::TPtr& inputExpr,
269+
TExprContext& ctx) {
270+
TNodeOnNodeOwnedMap resultStages;
271+
TKqlQuery query(inputExpr);
272+
for (const auto& result : query.Results()) {
273+
if (auto maybeUnionAll = result.Value().Maybe<TDqCnUnionAll>()) {
274+
auto resultConnection = maybeUnionAll.Cast();
275+
auto resultStage = resultConnection.Output().Stage().Cast<TDqStage>();
276+
resultStages.emplace(resultStage.Raw(), resultStage.Ptr());
277+
} else if (!result.Value().Maybe<TDqCnValue>()) {
278+
ctx.AddError(TIssue(ctx.GetPosition(result.Pos()), TStringBuilder()
279+
<< "Unexpected node in results: " << KqpExprToPrettyString(result.Value(), ctx)));
280+
return std::nullopt;
281+
}
282+
}
283+
return resultStages;
284+
}
285+
256286
bool HasNonDeterministicFunction(const TDqStage& stage) {
257287
bool hasNonDeterministicFunction = false;
258288
VisitExpr(stage.Program().Ptr(), [&hasNonDeterministicFunction](const TExprNode::TPtr& exprNode) mutable {

0 commit comments

Comments
 (0)