Skip to content

Commit 70d247c

Browse files
committed
Apply ExtractMembers to PartitionsByKeys
commit_hash:479add2c66911eda0331429ee201911cc410bcbd
1 parent 71d1335 commit 70d247c

File tree

7 files changed

+72
-4
lines changed

7 files changed

+72
-4
lines changed

yql/essentials/core/common_opt/yql_co_extr_members.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -505,14 +505,15 @@ TExprNode::TPtr ApplyExtractMembersToFlatMap(const TExprNode::TPtr& node, const
505505
}
506506

507507
TExprNode::TPtr ApplyExtractMembersToPartitionByKey(const TExprNode::TPtr& node, const TExprNode::TPtr& members, TExprContext& ctx, TStringBuf logSuffix) {
508-
TCoPartitionByKey part(node);
508+
TCoPartitionByKeyBase part(node);
509509
YQL_CLOG(DEBUG, Core) << "Apply ExtractMembers to " << node->Content() << logSuffix;
510510
auto newBody = Build<TCoExtractMembers>(ctx, part.Pos())
511511
.Input(part.ListHandlerLambda().Body())
512512
.Members(members)
513513
.Done();
514514

515-
return Build<TCoPartitionByKey>(ctx, part.Pos())
515+
return Build<TCoPartitionByKeyBase>(ctx, part.Pos())
516+
.CallableName(node->Content())
516517
.Input(part.Input())
517518
.KeySelectorLambda(part.KeySelectorLambda())
518519
.ListHandlerLambda()

yql/essentials/core/common_opt/yql_co_finalizers.cpp

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -566,7 +566,12 @@ void RegisterCoFinalizers(TFinalizingOptimizerMap& map) {
566566
return true;
567567
};
568568

569-
map[TCoPartitionByKey::CallableName()] = [](const TExprNode::TPtr& node, TNodeOnNodeOwnedMap& toOptimize, TExprContext& ctx, TOptimizeContext& optCtx) {
569+
map[TCoPartitionByKey::CallableName()] = map[TCoPartitionsByKeys::CallableName()] =
570+
[](const TExprNode::TPtr& node, TNodeOnNodeOwnedMap& toOptimize, TExprContext& ctx, TOptimizeContext& optCtx)
571+
{
572+
if (node->IsCallable(TCoPartitionsByKeys::CallableName()) && !CanApplyExtractMembersToPartitionsByKeys(optCtx.Types)) {
573+
return true;
574+
}
570575
OptimizeSubsetFieldsForNodeWithMultiUsage(node, *optCtx.ParentsMap, toOptimize, ctx,
571576
[] (const TExprNode::TPtr& input, const TExprNode::TPtr& members, const TParentsMap&, TExprContext& ctx) {
572577
return ApplyExtractMembersToPartitionByKey(input, members, ctx, " with multi-usage");

yql/essentials/core/common_opt/yql_co_flow2.cpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2461,7 +2461,11 @@ void RegisterCoFlowCallables2(TCallableOptimizerMap& map) {
24612461
return node;
24622462
}
24632463

2464-
if (self.Input().Maybe<TCoPartitionByKey>()) {
2464+
if (self.Input().Maybe<TCoPartitionByKeyBase>()) {
2465+
if (self.Input().Maybe<TCoPartitionsByKeys>() && !CanApplyExtractMembersToPartitionsByKeys(optCtx.Types)) {
2466+
return node;
2467+
}
2468+
24652469
if (auto res = ApplyExtractMembersToPartitionByKey(self.Input().Ptr(), self.Members().Ptr(), ctx, {})) {
24662470
return res;
24672471
}

yql/essentials/core/yql_opt_utils.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2841,4 +2841,10 @@ bool CanFuseLambdas(const TExprNode& outer, const TExprNode& inner, const TTypeA
28412841
}
28422842
}
28432843

2844+
bool CanApplyExtractMembersToPartitionsByKeys(const TTypeAnnotationContext* types) {
2845+
YQL_ENSURE(types);
2846+
static const char optName[] = "ExtractMembersForPartitionsByKeys";
2847+
return IsOptimizerEnabled<optName>(*types) && !IsOptimizerDisabled<optName>(*types);
2848+
}
2849+
28442850
}

yql/essentials/core/yql_opt_utils.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,4 +226,6 @@ bool IsNormalizedDependsOn(const TExprNode& node);
226226

227227
bool CanFuseLambdas(const TExprNode& outer, const TExprNode& inner, const TTypeAnnotationContext& types);
228228

229+
bool CanApplyExtractMembersToPartitionsByKeys(const TTypeAnnotationContext* types);
230+
229231
}

yql/essentials/tests/s-expressions/minirun/part5/canondata/result.json

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -352,6 +352,20 @@
352352
"uri": "https://{canondata_backend}/1936842/0e8399ea57053215a346dc8ce52f4206386c19fb/resource.tar.gz#test.test_InMem-CompareStruct2-default.txt-Results_/results.txt"
353353
}
354354
],
355+
"test.test[InMem-ExtractMembersOverPartitionsByKyes-default.txt-Debug]": [
356+
{
357+
"checksum": "d2b4c78b235e2873373ae3d82987b842",
358+
"size": 1372,
359+
"uri": "https://{canondata_backend}/1600758/098090b2fdd96c8505b3c52b52768447f489d79e/resource.tar.gz#test.test_InMem-ExtractMembersOverPartitionsByKyes-default.txt-Debug_/opt.yql"
360+
}
361+
],
362+
"test.test[InMem-ExtractMembersOverPartitionsByKyes-default.txt-Results]": [
363+
{
364+
"checksum": "486117c929f3e77513590f8dec8cbb9c",
365+
"size": 837,
366+
"uri": "https://{canondata_backend}/1600758/098090b2fdd96c8505b3c52b52768447f489d79e/resource.tar.gz#test.test_InMem-ExtractMembersOverPartitionsByKyes-default.txt-Results_/results.txt"
367+
}
368+
],
355369
"test.test[InMem-Fold-default.txt-Debug]": [
356370
{
357371
"checksum": "3cf721a14fa47fd67c1967d58d82a7c2",
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
(
2+
#comment
3+
(let config (DataSource 'config))
4+
(let world (Configure! world config '"OptimizerFlags" '"ExtractMembersForPartitionsByKeys"))
5+
6+
(let res_sink (DataSink 'result))
7+
8+
(let list (AsList
9+
(AsStruct '('key (Uint32 '1)) '('subkey (Uint32 '2)) '('value (String 'a)) '('unused (Int32 '1)))
10+
(AsStruct '('key (Uint32 '1)) '('subkey (Uint32 '1)) '('value (String 'b)) '('unused (Int32 '1)))
11+
(AsStruct '('key (Uint32 '2)) '('subkey (Uint32 '4)) '('value (String 'c)) '('unused (Int32 '1)))
12+
(AsStruct '('key (Uint32 '1)) '('subkey (Uint32 '3)) '('value (String 'd)) '('unused (Int32 '1)))
13+
(AsStruct '('key (Uint32 '2)) '('subkey (Uint32 '6)) '('value (String 'e)) '('unused (Int32 '1)))
14+
(AsStruct '('key (Uint32 '3)) '('subkey (Uint32 '5)) '('value (String 'f)) '('unused (Int32 '1)))
15+
))
16+
17+
18+
(let keyExtractor (lambda '(x) (Member x 'key)))
19+
(let sortKeyExtractor (lambda '(x) (Member x 'subkey)))
20+
(let direction (Bool 'true))
21+
22+
(let handler (lambda '(x) (Condense1 x
23+
(lambda '(row) (AsStruct '('key (Member row 'key))
24+
'('group (Concat (String '"values:") (Member row 'value)))))
25+
(lambda '(row state) (AggrNotEquals (Member row 'key) (Member state 'key)))
26+
(lambda '(row state) (AsStruct '('key (Member state 'key)) '('group
27+
(Concat (Concat (Member state 'group) (String '" ")) (Member row 'value)))))
28+
)))
29+
(let data (PartitionsByKeys (Iterator list) keyExtractor direction sortKeyExtractor handler))
30+
(let data (Collect data))
31+
(let data (ExtractMembers data '('group)))
32+
33+
(let world (Write! world res_sink (Key) data '('('type))))
34+
(let world (Commit! world res_sink))
35+
(return world)
36+
)

0 commit comments

Comments
 (0)