Skip to content

Commit 1f3c5d9

Browse files
author
maxkovalev
committed
YQL-20445: Calculate lineage in tests
commit_hash:6bf28c5a731c7325efa6bbe915c4a4920673c844
1 parent 4f3cbc9 commit 1f3c5d9

File tree

8 files changed

+82
-12
lines changed

8 files changed

+82
-12
lines changed

yql/essentials/cfg/tests/gateways.conf

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,9 @@ YqlCore {
149149
Name: "EnableConstraintCheck"
150150
Args: ["Distinct","Unique"]
151151
}
152+
Flags {
153+
Name: "EnableLineage"
154+
}
152155
}
153156

154157
Dq {

yql/essentials/core/facade/yql_facade.cpp

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1402,7 +1402,7 @@ TProgram::TFutureStatus TProgram::RunAsync(
14021402
pipeline.AddTypeAnnotation(TIssuesIds::CORE_TYPE_ANN, true);
14031403
pipeline.AddPostTypeAnnotation();
14041404
pipeline.Add(TExprOutputTransformer::Sync(ExprRoot_, traceOut), "ExprOutput");
1405-
pipeline.AddOptimization();
1405+
pipeline.AddOptimizationWithLineage();
14061406
if (EnableRangeComputeFor_) {
14071407
pipeline.Add(MakeExpandRangeComputeForTransformer(pipeline.GetTypeAnnotationContext()),
14081408
"ExpandRangeComputeFor", TIssuesIds::CORE_EXEC);
@@ -1480,7 +1480,7 @@ TProgram::TFutureStatus TProgram::RunAsyncWithConfig(
14801480
pipeline.AddPostTypeAnnotation();
14811481
pipelineConf.AfterTypeAnnotation(&pipeline);
14821482

1483-
pipeline.AddOptimization();
1483+
pipeline.AddOptimizationWithLineage();
14841484
if (EnableRangeComputeFor_) {
14851485
pipeline.Add(MakeExpandRangeComputeForTransformer(pipeline.GetTypeAnnotationContext()),
14861486
"ExpandRangeComputeFor", TIssuesIds::CORE_EXEC);
@@ -1835,6 +1835,14 @@ TMaybe<TString> TProgram::GetStatistics(bool totalOnly, THashMap<TString, TStrin
18351835
writer.OnEndMap();
18361836
}
18371837

1838+
if (TypeCtx_->EnableLineage) {
1839+
writer.OnKeyedItem("CalculateLineage");
1840+
writer.OnBeginMap();
1841+
writer.OnKeyedItem("Correct");
1842+
writer.OnInt64Scalar(TypeCtx_->CorrectLineage);
1843+
writer.OnEndMap();
1844+
}
1845+
18381846
// extra
18391847
for (const auto& [k, extraYson] : extraYsons) {
18401848
writer.OnKeyedItem(k);

yql/essentials/core/services/yql_lineage.cpp

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
#include "yql_lineage.h"
2+
#include <yql/essentials/core/yql_expr_type_annotation.h>
23
#include <yql/essentials/core/yql_type_annotation.h>
34
#include <yql/essentials/core/yql_expr_optimize.h>
45
#include <yql/essentials/core/yql_opt_utils.h>
@@ -12,10 +13,11 @@ namespace {
1213

1314
class TLineageScanner {
1415
public:
15-
TLineageScanner(const TExprNode& root, const TTypeAnnotationContext& ctx, TExprContext& exprCtx)
16+
TLineageScanner(const TExprNode& root, const TTypeAnnotationContext& ctx, TExprContext& exprCtx, bool standalone)
1617
: Root_(root)
1718
, Ctx_(ctx)
1819
, ExprCtx_(exprCtx)
20+
, Standalone_(standalone)
1921
{
2022
}
2123

@@ -279,9 +281,13 @@ class TLineageScanner {
279281

280282
void Warning(const TExprNode& node) {
281283
auto message = TStringBuilder() << node.Type() << " : " << node.Content() << " is not supported";
282-
auto issue = TIssue(ExprCtx_.GetPosition(node.Pos()), message);
283-
SetIssueCode(EYqlIssueCode::TIssuesIds_EIssueCode_CORE_LINEAGE_INTERNAL_ERROR, issue);
284-
ExprCtx_.AddWarning(issue);
284+
if (Standalone_) {
285+
auto issue = TIssue(ExprCtx_.GetPosition(node.Pos()), message);
286+
SetIssueCode(EYqlIssueCode::TIssuesIds_EIssueCode_CORE_LINEAGE_INTERNAL_ERROR, issue);
287+
ExprCtx_.AddWarning(issue);
288+
} else {
289+
throw yexception() << message;
290+
}
285291
}
286292

287293
void HandleExtractMembers(TLineage& lineage, const TExprNode& node) {
@@ -894,7 +900,8 @@ class TLineageScanner {
894900
}
895901

896902
void WriteLineage(NYson::TYsonWriter& writer, const TLineage& lineage) {
897-
if (!lineage.Fields.Defined()) {
903+
// TODO: remove Standalone_ after fixing all failed tests, see YQL-20445
904+
if (Standalone_ && !lineage.Fields.Defined()) {
898905
YQL_ENSURE(!GetEnv("YQL_DETERMINISTIC_MODE"), "Can't calculate lineage");
899906
writer.OnEntity();
900907
return;
@@ -949,12 +956,13 @@ class TLineageScanner {
949956
TNodeMap<ui32> WriteIds_;
950957
TNodeMap<TLineage> Lineages_;
951958
TNodeSet HasReads_;
959+
bool Standalone_;
952960
};
953961

954962
} // namespace
955963

956-
TString CalculateLineage(const TExprNode& root, const TTypeAnnotationContext& ctx, TExprContext& exprCtx) {
957-
TLineageScanner scanner(root, ctx, exprCtx);
964+
TString CalculateLineage(const TExprNode& root, const TTypeAnnotationContext& ctx, TExprContext& exprCtx, bool standalone) {
965+
TLineageScanner scanner(root, ctx, exprCtx, standalone);
958966
return scanner.Process();
959967
}
960968

yql/essentials/core/services/yql_lineage.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,6 @@ namespace NYql {
66
struct TTypeAnnotationContext;
77
struct TExprContext;
88

9-
TString CalculateLineage(const TExprNode& root, const TTypeAnnotationContext& ctx, TExprContext& exprCtx);
9+
TString CalculateLineage(const TExprNode& root, const TTypeAnnotationContext& ctx, TExprContext& exprCtx, bool standalone);
1010

1111
} // namespace NYql

yql/essentials/core/services/yql_transform_pipeline.cpp

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
#include <yql/essentials/core/yql_opt_normalize_depends_on.h>
1616
#include <yql/essentials/core/yql_opt_proposed_by_data.h>
1717
#include <yql/essentials/core/yql_opt_rewrite_io.h>
18+
#include <yql/essentials/utils/log/log.h>
1819

1920
#include <yql/essentials/providers/common/provider/yql_provider_names.h>
2021

@@ -166,8 +167,42 @@ TTransformationPipeline& TTransformationPipeline::AddFinalCommonOptimization(EYq
166167
return *this;
167168
}
168169

169-
TTransformationPipeline& TTransformationPipeline::AddOptimization(bool checkWorld, bool withFinalOptimization, EYqlIssueCode issueCode) {
170+
TTransformationPipeline& TTransformationPipeline::AddOptimizationWithLineage(bool checkWorld, bool withFinalOptimization, EYqlIssueCode issueCode) {
170171
AddCommonOptimization(false, issueCode);
172+
Transformers_.push_back(TTransformStage(
173+
CreateChoiceGraphTransformer(
174+
[&typesCtx = std::as_const(*TypeAnnotationContext_)](const TExprNode::TPtr&, TExprContext&) {
175+
return typesCtx.EnableLineage;
176+
},
177+
TTransformStage(
178+
CreateFunctorTransformer(
179+
[typeCtx = TypeAnnotationContext_](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) {
180+
output = input;
181+
try {
182+
CalculateLineage(*input, *typeCtx, ctx, false);
183+
} catch (const std::exception& e) {
184+
YQL_LOG(ERROR) << "CalculateLineage error: " << e.what();
185+
typeCtx->CorrectLineage = false;
186+
}
187+
return IGraphTransformer::TStatus::Ok;
188+
}),
189+
"Lineage",
190+
issueCode),
191+
TTransformStage(
192+
new TNullTransformer(),
193+
"SkipLineage",
194+
issueCode)),
195+
"LineageCalculation",
196+
issueCode));
197+
AddProviderOptimization(issueCode);
198+
if (withFinalOptimization) {
199+
AddFinalCommonOptimization(issueCode);
200+
}
201+
AddCheckExecution(checkWorld, issueCode);
202+
return *this;
203+
}
204+
205+
TTransformationPipeline& TTransformationPipeline::AddProviderOptimization(EYqlIssueCode issueCode) {
171206
Transformers_.push_back(TTransformStage(
172207
CreateChoiceGraphTransformer(
173208
[&typesCtx = std::as_const(*TypeAnnotationContext_)](const TExprNode::TPtr&, TExprContext&) {
@@ -210,6 +245,12 @@ TTransformationPipeline& TTransformationPipeline::AddOptimization(bool checkWorl
210245
CreatePhysicalFinalizers(*TypeAnnotationContext_),
211246
"PhysicalFinalizers",
212247
issueCode));
248+
return *this;
249+
}
250+
251+
TTransformationPipeline& TTransformationPipeline::AddOptimization(bool checkWorld, bool withFinalOptimization, EYqlIssueCode issueCode) {
252+
AddCommonOptimization(false, issueCode);
253+
AddProviderOptimization(issueCode);
213254
if (withFinalOptimization) {
214255
AddFinalCommonOptimization(issueCode);
215256
}
@@ -223,7 +264,7 @@ TTransformationPipeline& TTransformationPipeline::AddLineageOptimization(TMaybe<
223264
CreateFunctorTransformer(
224265
[typeCtx = TypeAnnotationContext_, &lineageOut](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) {
225266
output = input;
226-
lineageOut = CalculateLineage(*input, *typeCtx, ctx);
267+
lineageOut = CalculateLineage(*input, *typeCtx, ctx, true);
227268
return IGraphTransformer::TStatus::Ok;
228269
}),
229270
"LineageScanner",

yql/essentials/core/services/yql_transform_pipeline.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ class TTransformationPipeline {
3535
TTransformationPipeline& AddCommonOptimization(bool forPeephole = false, EYqlIssueCode issueCode = TIssuesIds::CORE_OPTIMIZATION);
3636
TTransformationPipeline& AddFinalCommonOptimization(EYqlIssueCode issueCode = TIssuesIds::CORE_OPTIMIZATION);
3737
TTransformationPipeline& AddOptimization(bool checkWorld = true, bool withFinalOptimization = true, EYqlIssueCode issueCode = TIssuesIds::CORE_OPTIMIZATION);
38+
TTransformationPipeline& AddProviderOptimization(EYqlIssueCode issueCode = TIssuesIds::CORE_OPTIMIZATION);
39+
TTransformationPipeline& AddOptimizationWithLineage(bool checkWorld = true, bool withFinalOptimization = true, EYqlIssueCode issueCode = TIssuesIds::CORE_OPTIMIZATION);
3840
TTransformationPipeline& AddLineageOptimization(TMaybe<TString>& lineageOut, EYqlIssueCode issueCode = TIssuesIds::CORE_OPTIMIZATION);
3941
TTransformationPipeline& AddCheckExecution(bool checkWorld = true, EYqlIssueCode issueCode = TIssuesIds::CORE_OPTIMIZATION);
4042
TTransformationPipeline& AddRun(TOperationProgressWriter writer, EYqlIssueCode issueCode = TIssuesIds::CORE_EXEC);

yql/essentials/core/yql_type_annotation.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -472,6 +472,8 @@ struct TTypeAnnotationContext: public TThrRefBase {
472472
ui32 AndOverOrExpansionLimit = 100;
473473
bool EarlyExpandSeq = true;
474474
bool DirectRowDependsOn = true;
475+
bool EnableLineage = false;
476+
bool CorrectLineage = true;
475477

476478
TMaybe<TColumnOrder> LookupColumnOrder(const TExprNode& node) const;
477479
IGraphTransformer::TStatus SetColumnOrder(const TExprNode& node, const TColumnOrder& columnOrder, TExprContext& ctx);

yql/essentials/providers/config/yql_config_provider.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1080,6 +1080,12 @@ class TConfigProvider: public TDataProviderBase {
10801080
return false;
10811081
}
10821082
Types_.DirectRowDependsOn = ("DirectRowDependsOn" == name);
1083+
} else if (name == "EnableLineage" || name == "DisableLineage") {
1084+
if (args.size() != 0) {
1085+
ctx.AddError(TIssue(pos, TStringBuilder() << "Expected no arguments, but got " << args.size()));
1086+
return false;
1087+
}
1088+
Types_.EnableLineage = ("EnableLineage" == name);
10831089
} else {
10841090
ctx.AddError(TIssue(pos, TStringBuilder() << "Unsupported command: " << name));
10851091
return false;

0 commit comments

Comments
 (0)