From c67623ff0ac782190675dac27b7499b1ad09ccb1 Mon Sep 17 00:00:00 2001 From: Nikita Vasilev Date: Wed, 8 Oct 2025 18:59:24 +0300 Subject: [PATCH] fix --- ydb/core/kqp/host/kqp_runner.cpp | 13 +- ydb/core/kqp/opt/kqp_opt_sink_precompute.cpp | 112 +++++++---- ydb/core/kqp/ut/effects/kqp_effects_ut.cpp | 199 +++++++++++++++++++ 3 files changed, 271 insertions(+), 53 deletions(-) diff --git a/ydb/core/kqp/host/kqp_runner.cpp b/ydb/core/kqp/host/kqp_runner.cpp index 358fe6ab382e..f089e7a9d948 100644 --- a/ydb/core/kqp/host/kqp_runner.cpp +++ b/ydb/core/kqp/host/kqp_runner.cpp @@ -333,6 +333,7 @@ class TKqpRunner : public IKqpRunner { .Add(CreateKqpFinalizingOptTransformer(OptimizeCtx), "FinalizingOptimize") .Add(CreateKqpQueryPhasesTransformer(), "QueryPhases") .Add(CreateKqpQueryEffectsTransformer(OptimizeCtx), "QueryEffects") + .Add(CreateKqpSinkPrecomputeTransformer(OptimizeCtx), "KqpSinkPrecompute") .Add(CreateKqpCheckPhysicalQueryTransformer(), "CheckKqlPhysicalQuery") .Build(false)); @@ -373,16 +374,6 @@ class TKqpRunner : public IKqpRunner { //.Add(CreateKqpCheckPhysicalQueryTransformer(), "CheckKqlPhysicalQuery") .Build(false); - auto physicalKqpSinkPrecomputeTransformer = CreateKqpQueryBlocksTransformer(TTransformationPipeline(typesCtx) - .AddServiceTransformers() - .Add(Log("KqpSinkPrecompute"), "LogKqpSinkPrecompute") - .AddTypeAnnotationTransformer(CreateKqpTypeAnnotationTransformer(Cluster, sessionCtx->TablesPtr(), *typesCtx, Config)) - .AddPostTypeAnnotation(/* forSubgraph */ true) - .Add( - CreateKqpSinkPrecomputeTransformer(OptimizeCtx), - "KqpSinkPrecompute") - .Build(false)); - auto physicalBuildTxsTransformer = CreateKqpQueryBlocksTransformer(TTransformationPipeline(typesCtx) .AddServiceTransformers() .Add(Log("PhysicalBuildTxs"), "LogPhysicalBuildTxs") @@ -495,8 +486,6 @@ class TKqpRunner : public IKqpRunner { Transformer = CreateCompositeGraphTransformer( { TTransformStage{ physicalOptimizeTransformer, "PhysicalOptimize", TIssuesIds::DEFAULT_ERROR }, - LogStage("PhysicalBuildSinkPrecompute"), - TTransformStage{ physicalKqpSinkPrecomputeTransformer, "PhysicalBuildSinkPrecompute", TIssuesIds::DEFAULT_ERROR }, LogStage("PhysicalOptimize"), TTransformStage{ physicalBuildTxsTransformer, "PhysicalBuildTxs", TIssuesIds::DEFAULT_ERROR }, LogStage("PhysicalBuildTxs"), diff --git a/ydb/core/kqp/opt/kqp_opt_sink_precompute.cpp b/ydb/core/kqp/opt/kqp_opt_sink_precompute.cpp index 674942d149e3..2589fb5edab1 100644 --- a/ydb/core/kqp/opt/kqp_opt_sink_precompute.cpp +++ b/ydb/core/kqp/opt/kqp_opt_sink_precompute.cpp @@ -38,16 +38,32 @@ class TKqpSinkPrecomputeTransformer : public TSyncTransformerBase { return TStatus::Ok; } - const auto stagesUsedForPrecomputesAndSinks = FindStagesUsedForPrecomputeAndSinks(outputExpr); - Y_UNUSED(stagesUsedForPrecomputesAndSinks); - TNodeOnNodeOwnedMap marked; - for (const auto& [_, exprNode] : stagesUsedForPrecomputesAndSinks) { - AFL_ENSURE(exprNode); - TExprBase node(exprNode); - const auto stage = node.Cast(); - if (HasNonDeterministicFunction(stage)) { - marked.emplace(node.Raw(), node.Ptr()); + + { + const auto [precomputeStages, sinkStages] = GatherPrecomputeAndSinkStages(outputExpr, *KqpCtx); + + for (const auto& [_, exprNode] : FindStagesUsedForBothStagesSets(precomputeStages, sinkStages)) { + AFL_ENSURE(exprNode); + TExprBase node(exprNode); + const auto stage = node.Cast(); + if (HasNonDeterministicFunction(stage) || !IsKqpPureInputs(stage.Inputs())) { + marked.emplace(node.Raw(), node.Ptr()); + } + } + + const auto resultStages = GatherResultStages(outputExpr, ctx); + if (!resultStages) { + return TStatus::Error; + } + + for (const auto& [_, exprNode] : FindStagesUsedForBothStagesSets(*resultStages, sinkStages)) { + AFL_ENSURE(exprNode); + TExprBase node(exprNode); + const auto stage = node.Cast(); + if (!IsKqpPureInputs(stage.Inputs())) { + marked.emplace(node.Raw(), node.Ptr()); + } } } @@ -55,8 +71,7 @@ class TKqpSinkPrecomputeTransformer : public TSyncTransformerBase { return TStatus::Ok; } - // Find all stages that depend on non-deterministic stages - // that are used for sinks or precomputes. + // Find all stages that depend on marked stages { auto filter = [](const TExprNode::TPtr& exprNode) { return !exprNode->IsLambda(); @@ -108,20 +123,16 @@ class TKqpSinkPrecomputeTransformer : public TSyncTransformerBase { || (KqpCtx->Tables->ExistingTable(KqpCtx->Cluster, sinkSettings.Cast().Table().Path()).Metadata->Kind == EKikimrTableKind::Olap); if (!executedAsSingleEffect) { AFL_ENSURE(stage.Inputs().Size() == 1); + AFL_ENSURE(stage.Inputs().Item(0).Maybe()); + AFL_ENSURE(stage.Settings().Empty()); - auto channel = Build(ctx, node.Pos()) - .Output() - .Stage() // no output - .Inputs(stage.Inputs()) - .Program(stage.Program()) - .Settings(stage.Settings()) - .Build() - .Index().Build("0") - .Build() - .Done(); + AFL_ENSURE(stage.Program().Args().Size() == 1); + AFL_ENSURE(stage.Program().Body().Maybe()); + AFL_ENSURE(stage.Program().Body().Cast().Name() == + stage.Program().Args().Arg(0).Cast().Name()); auto inputRows = Build(ctx, node.Pos()) - .Connection(channel) + .Connection(stage.Inputs().Item(0).Ptr()) .Done(); auto rowArg = Build(ctx, node.Pos()) @@ -152,43 +163,43 @@ class TKqpSinkPrecomputeTransformer : public TSyncTransformerBase { outputExpr = ctx.ReplaceNodes(std::move(outputExpr), replaces); - return TStatus::Ok; + return TStatus(TStatus::Repeat, true); } void Rewind() final { } private: - TNodeOnNodeOwnedMap FindStagesUsedForPrecomputeAndSinks(TExprNode::TPtr& expr) { - const auto [precomputeStages, sinkStages] = GatherPrecomputesAndSinks(expr, *KqpCtx); - - TNodeSet stagesUsedForPrecomputes; - for (const auto& precomputeStage : precomputeStages) { - auto visit = [&stagesUsedForPrecomputes](const TDqStage& stage) mutable -> bool { - return stagesUsedForPrecomputes.emplace(stage.Raw()).second; + TNodeOnNodeOwnedMap FindStagesUsedForBothStagesSets( + const TNodeOnNodeOwnedMap& leftStages, + const TNodeOnNodeOwnedMap& rightStages) { + TNodeSet stagesUsedForLeft; + for (const auto& leftStage : leftStages) { + auto visit = [&stagesUsedForLeft](const TDqStage& stage) mutable -> bool { + return stagesUsedForLeft.emplace(stage.Raw()).second; }; auto postVisit = [](const TDqStage&) {}; - VisitStagesBackwards(precomputeStage.second, visit, postVisit); + VisitStagesBackwards(leftStage.second, visit, postVisit); } - TNodeOnNodeOwnedMap stagesUsedForPrecomputesAndSinks; - TNodeSet stagesUsedForSinks; - for (const auto& sinkStage : sinkStages) { - auto visit = [&stagesUsedForSinks, &stagesUsedForPrecomputes, &stagesUsedForPrecomputesAndSinks]( + TNodeOnNodeOwnedMap stagesUsedForLeftAndRight; + TNodeSet stagesUsedForRight; + for (const auto& rightStage : rightStages) { + auto visit = [&stagesUsedForRight, &stagesUsedForLeft, &stagesUsedForLeftAndRight]( const TDqStage& stage) mutable -> bool { - if (stagesUsedForPrecomputes.contains(stage.Raw())) { - stagesUsedForPrecomputesAndSinks.emplace(stage.Raw(), stage.Ptr()); + if (stagesUsedForLeft.contains(stage.Raw())) { + stagesUsedForLeftAndRight.emplace(stage.Raw(), stage.Ptr()); } - return stagesUsedForSinks.emplace(stage.Raw()).second; + return stagesUsedForRight.emplace(stage.Raw()).second; }; auto postVisit = [](const TDqStage&) {}; - VisitStagesBackwards(sinkStage.second, visit, postVisit); + VisitStagesBackwards(rightStage.second, visit, postVisit); } - return stagesUsedForPrecomputesAndSinks; + return stagesUsedForLeftAndRight; } - std::pair GatherPrecomputesAndSinks(const TExprNode::TPtr& query, const TKqpOptimizeContext& kqpCtx) { + std::pair GatherPrecomputeAndSinkStages(const TExprNode::TPtr& query, const TKqpOptimizeContext& kqpCtx) { TNodeOnNodeOwnedMap precomputeStages; TNodeOnNodeOwnedMap sinkStages; @@ -253,6 +264,25 @@ class TKqpSinkPrecomputeTransformer : public TSyncTransformerBase { return {std::move(precomputeStages), std::move(sinkStages)}; } + std::optional GatherResultStages( + const TExprNode::TPtr& inputExpr, + TExprContext& ctx) { + TNodeOnNodeOwnedMap resultStages; + TKqlQuery query(inputExpr); + for (const auto& result : query.Results()) { + if (auto maybeUnionAll = result.Value().Maybe()) { + auto resultConnection = maybeUnionAll.Cast(); + auto resultStage = resultConnection.Output().Stage().Cast(); + resultStages.emplace(resultStage.Raw(), resultStage.Ptr()); + } else if (!result.Value().Maybe()) { + ctx.AddError(TIssue(ctx.GetPosition(result.Pos()), TStringBuilder() + << "Unexpected node in results: " << KqpExprToPrettyString(result.Value(), ctx))); + return std::nullopt; + } + } + return resultStages; + } + bool HasNonDeterministicFunction(const TDqStage& stage) { bool hasNonDeterministicFunction = false; VisitExpr(stage.Program().Ptr(), [&hasNonDeterministicFunction](const TExprNode::TPtr& exprNode) mutable { diff --git a/ydb/core/kqp/ut/effects/kqp_effects_ut.cpp b/ydb/core/kqp/ut/effects/kqp_effects_ut.cpp index f211742d0f59..911d0469b156 100644 --- a/ydb/core/kqp/ut/effects/kqp_effects_ut.cpp +++ b/ydb/core/kqp/ut/effects/kqp_effects_ut.cpp @@ -1002,6 +1002,205 @@ Y_UNIT_TEST_SUITE(KqpEffects) { CompareYson(FormatResultSetYson(result.GetResultSet(1)), UseSecondaryIndex ? R"([[0u]])" : R"([[1u]])"); } } + + Y_UNIT_TEST_TWIN(EffectWithSelect, UseSink) { + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableOltpSink(UseSink); + auto kikimr = DefaultKikimrRunner({}, appConfig); + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + auto client = kikimr.GetQueryClient(); + + { + const TString query = Sprintf(R"( + CREATE TABLE `/Root/Rows` ( + k1 Uint64, + k2 Uint64, + v1 Uint64, + v2 Uint64, + PRIMARY KEY (k1, k2) + ); + )"); + auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + } + { + const TString query = R"( + INSERT INTO `/Root/Rows` (k1, k2, v1, v2) VALUES + (1u, 1u, 1u, 1u), + (1u, 2u, 2u, 2u), + (1u, 3u, 3u, 3u); + )"; + auto result = session.ExecuteDataQuery(query, NYdb::NTable::TTxControl::BeginTx().CommitTx()).GetValueSync(); + UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + } + + { + const TString query = R"( + --!syntax_v1 + + PRAGMA AnsiInForEmptyOrNullableItemsCollections; + + DECLARE $k AS Uint64; + + DECLARE $rows AS List< + Struct< + k2: Uint64, + v1: Uint64, + v2: Uint64 + > + >; + + $k2_by_k1 = ( + SELECT k2 FROM `/Root/Rows` + WHERE k1 = $k + ); + + $new_rows = ( + SELECT * FROM AS_TABLE($rows) + WHERE k2 NOT IN COMPACT $k2_by_k1 + ); + + SELECT COUNT(*) FROM $new_rows; + + SELECT * FROM $new_rows LIMIT 1000; + + UPSERT INTO `/Root/Rows` + SELECT $k AS k1, k2, v1, v2 FROM $new_rows; + )"; + + { + auto result = session.ExplainDataQuery(query).GetValueSync(); + UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + + + NJson::TJsonValue plan; + NJson::ReadJsonTree(result.GetPlan(), &plan, true); + UNIT_ASSERT_C(plan["tables"].GetArray().size() == 1, plan["tables"].GetArray().size()); + UNIT_ASSERT_C(plan["tables"][0]["reads"].GetArray().size() == 1, plan["tables"][0]["reads"].GetArray().size()); + } + + { + auto settings = NYdb::NQuery::TExecuteQuerySettings() + .ExecMode(NYdb::NQuery::EExecMode::Explain); + auto result = client.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx(), settings).GetValueSync(); + UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + + + NJson::TJsonValue plan; + NJson::ReadJsonTree(*result.GetStats()->GetPlan(), &plan, true); + UNIT_ASSERT_C(plan["tables"].GetArray().size() == 1, plan["tables"].GetArray().size()); + UNIT_ASSERT_C(plan["tables"][0]["reads"].GetArray().size() == 1, plan["tables"][0]["reads"].GetArray().size()); + } + } + + { + const TString query = R"( + --!syntax_v1 + + PRAGMA AnsiInForEmptyOrNullableItemsCollections; + + DECLARE $k AS Uint64; + + DECLARE $rows AS List< + Struct< + k2: Uint64, + v1: Uint64, + v2: Uint64 + > + >; + + $k2_by_k1 = ( + SELECT k2 FROM `/Root/Rows` + WHERE k1 = $k + ); + + $new_rows = ( + SELECT * FROM AS_TABLE($rows) + WHERE k2 NOT IN COMPACT $k2_by_k1 + ); + + SELECT COUNT(*) FROM $new_rows; + + SELECT * FROM $new_rows LIMIT 1000; + + $other_transformation = ( + SELECT $k * 10 AS k1, k2, v1 + v2 AS v1, v1 * v2 AS v2 FROM $new_rows + ); + + UPSERT INTO `/Root/Rows` + SELECT * FROM $other_transformation; + )"; + + { + auto result = session.ExplainDataQuery(query).GetValueSync(); + UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + + + NJson::TJsonValue plan; + NJson::ReadJsonTree(result.GetPlan(), &plan, true); + UNIT_ASSERT_C(plan["tables"].GetArray().size() == 1, plan["tables"].GetArray().size()); + UNIT_ASSERT_C(plan["tables"][0]["reads"].GetArray().size() == 1, plan["tables"][0]["reads"].GetArray().size()); + } + + { + auto settings = NYdb::NQuery::TExecuteQuerySettings() + .ExecMode(NYdb::NQuery::EExecMode::Explain); + auto result = client.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx(), settings).GetValueSync(); + UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + + + NJson::TJsonValue plan; + NJson::ReadJsonTree(*result.GetStats()->GetPlan(), &plan, true); + UNIT_ASSERT_C(plan["tables"].GetArray().size() == 1, plan["tables"].GetArray().size()); + UNIT_ASSERT_C(plan["tables"][0]["reads"].GetArray().size() == 1, plan["tables"][0]["reads"].GetArray().size()); + } + } + + { + const TString query = R"( + --!syntax_v1 + + PRAGMA AnsiInForEmptyOrNullableItemsCollections; + + DECLARE $k AS Uint64; + + $deleted_rows = ( + SELECT k1, k2 FROM `/Root/Rows` + WHERE k1 = $k + ); + + SELECT COUNT(*) FROM $deleted_rows; + + DELETE FROM `/Root/Rows` ON + SELECT * FROM $deleted_rows; + )"; + + { + auto result = session.ExplainDataQuery(query).GetValueSync(); + UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + + + NJson::TJsonValue plan; + NJson::ReadJsonTree(result.GetPlan(), &plan, true); + UNIT_ASSERT_C(plan["tables"].GetArray().size() == 1, plan["tables"].GetArray().size()); + UNIT_ASSERT_C(plan["tables"][0]["reads"].GetArray().size() == 1, plan["tables"][0]["reads"].GetArray().size()); + } + + { + auto settings = NYdb::NQuery::TExecuteQuerySettings() + .ExecMode(NYdb::NQuery::EExecMode::Explain); + auto result = client.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx(), settings).GetValueSync(); + UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + + + NJson::TJsonValue plan; + NJson::ReadJsonTree(*result.GetStats()->GetPlan(), &plan, true); + UNIT_ASSERT_C(plan["tables"].GetArray().size() == 1, plan["tables"].GetArray().size()); + UNIT_ASSERT_C(plan["tables"][0]["reads"].GetArray().size() == 1, plan["tables"][0]["reads"].GetArray().size()); + } + } + } } } // namespace NKqp