Skip to content

Commit bf6fab9

Browse files
committed
Finish parseJoin
1 parent cc00f81 commit bf6fab9

File tree

9 files changed

+141
-78
lines changed

9 files changed

+141
-78
lines changed

cmd/replay/frame_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import (
1212

1313
func TestFrame(t *testing.T) {
1414
testFile := `"query_id","create_time","wallTimeMillis","output_rows","written_output_rows","catalog","schema","session_properties","query"
15-
"20240415_112042_61088_qa5fd","2024-04-15 11:20:42.755 UTC","99993","14","0","glue","ng_public","{query_max_scan_raw_input_bytes=500GB, iceberg.hive_statistics_merge_strategy=NUMBER_OF_DISTINCT_VALUES,TOTAL_SIZE_IN_BYTES, pushdown_subfields_enabled=true}","-- Looker Query Context '{""user_id"":2337,""history_slug"":""1d0df5dc263357e96a96310626454f6e"",""instance_slug"":""c59fc17fc46e0aeaf86d35ed33635ddc""}' SELECT (DATE_FORMAT(partner_data.created_date_local , '%Y-%m-%d')) AS ""partner_data.dynamic_period"", (MOD((DAY_OF_WEEK(partner_data.created_date_local ) % 7) - 1 + 7, 7)) AS ""partner_data.period_day_of_week_index"", (DATE_FORMAT(partner_data.created_date_local ,'%W')) AS ""partner_data.period_day_of_week"", COALESCE(SUM(partner_data.finished_rides ), 0) AS ""partner_data.finished_orders"" FROM ng_public.etl_partner_data AS partner_data INNER JOIN admin_system_city AS admin_system_city ON admin_system_city.id = partner_data.city_id INNER JOIN looker_scratch.LR_SHOSW1713178850318_admin_system_country AS admin_system_country ON admin_system_country.code = admin_system_city.country_code WHERE ((( partner_data.created_date_local ) >= ((DATE_ADD('day', -14, CAST(CAST(DATE_TRUNC('DAY', NOW()) AS DATE) AS TIMESTAMP)))) AND ( partner_data.created_date_local ) < ((DATE_ADD('day', 14, DATE_ADD('day', -14, CAST(CAST(DATE_TRUNC('DAY', NOW()) AS DATE) AS TIMESTAMP))))))) AND (admin_system_city.name ) = 'Budapest' AND (admin_system_country.name ) = 'Hungary' GROUP BY 1, 2, 3 ORDER BY 1 DESC LIMIT 500"`
15+
"20240415_112042_61088_qa5fd","2024-04-15 11:20:42.755 UTC","99993","14","0","glue","ng_public","{query_max_scan_raw_input_bytes=500GB, iceberg.hive_statistics_merge_strategy=NUMBER_OF_DISTINCT_VALUES,TOTAL_SIZE_IN_BYTES, pushdown_subfields_enabled=true}","-- Looker Query Context '{""user_id"":2337,""history_slug"":""1d0df5dc263357e96a96310626454f6e"",""instance_slug"":""c59fc17fc46e0aeaf86d35ed33635ddc""}' SELECT (DATE_FORMAT(partner_data.created_date_local , '%Y-%m-%d')) AS ""partner_data.dynamic_period"", (MOD((DAY_OF_WEEK(partner_data.created_date_local ) % 7) - 1 + 7, 7)) AS ""partner_data.period_day_of_week_index"", (DATE_FORMAT(partner_data.created_date_local ,'%W')) AS ""partner_data.period_day_of_week"", COALESCE(SUM(partner_data.finished_rides ), 0) AS ""partner_data.finished_orders"" FROM ng_public.etl_partner_data AS partner_data INNER JOIN admin_system_city AS admin_system_city ON admin_system_city.id = partner_data.city_id INNER JOIN lks.LR_SHOSW1713178850318_admin_system_country AS admin_system_country ON admin_system_country.code = admin_system_city.country_code WHERE ((( partner_data.created_date_local ) >= ((DATE_ADD('day', -14, CAST(CAST(DATE_TRUNC('DAY', NOW()) AS DATE) AS TIMESTAMP)))) AND ( partner_data.created_date_local ) < ((DATE_ADD('day', 14, DATE_ADD('day', -14, CAST(CAST(DATE_TRUNC('DAY', NOW()) AS DATE) AS TIMESTAMP))))))) AND (admin_system_city.name ) = 'Budapest' AND (admin_system_country.name ) = 'Hungary' GROUP BY 1, 2, 3 ORDER BY 1 DESC LIMIT 500"`
1616
stringReader := strings.NewReader(testFile)
1717
csvReader := csv.NewReader(stringReader)
1818
_, _ = csvReader.Read()

presto/plan_node/details.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -173,12 +173,12 @@ type PlanNodeDetails struct {
173173
}
174174

175175
// MathExpr is a binary arithmetic expression in which both operands are
// wrapped in parentheses: "(" operand ")" operator "(" operand ")".
// The operator is one of +, -, *, / or %. Each operand is itself a Value,
// so expressions can nest.
type MathExpr struct {
176-
LeftOp Value `parser:"'(' @@ ')'"`
177-
Op string `parser:"@('+' | '-' | '*' | '/' | '%')"`
178-
RightOp Value `parser:"'(' @@ ')'"`
176+
Left Value `parser:"'(' @@ ')'"`
177+
Op string `parser:"@('+' | '-' | '*' | '/' | '%')"`
178+
Right Value `parser:"'(' @@ ')'"`
179179
}
180180

181181
// value is an intentionally empty method. NOTE(review): presumably the marker
// method that makes *MathExpr satisfy the Value interface — confirm against
// the interface declaration, which is not part of this hunk.
func (m *MathExpr) value() {}
182182
// String renders the expression as "(<left>) <op> (<right>)", using the
// String form of each operand.
func (m *MathExpr) String() string {
183-
return fmt.Sprintf("(%s) %s (%s)", m.LeftOp.String(), m.Op, m.RightOp.String())
183+
return fmt.Sprintf("(%s) %s (%s)", m.Left.String(), m.Op, m.Right.String())
184184
}

presto/plan_node/details_test.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -263,22 +263,22 @@ func TestParseAssignment(t *testing.T) {
263263
{ // case 6
264264
Identifier: plan_node.IdentRef{Ident: "expr_5"},
265265
AssignedValue: &plan_node.MathExpr{
266-
LeftOp: &plan_node.MathExpr{
267-
LeftOp: &plan_node.IdentRef{Ident: "b"},
268-
Op: "+",
269-
RightOp: &plan_node.TypedValue{
266+
Left: &plan_node.MathExpr{
267+
Left: &plan_node.IdentRef{Ident: "b"},
268+
Op: "+",
269+
Right: &plan_node.TypedValue{
270270
DataType: "INTEGER",
271271
ValueLiteral: "1",
272272
},
273273
},
274274
Op: "-",
275-
RightOp: &plan_node.MathExpr{
276-
LeftOp: &plan_node.TypedValue{
275+
Right: &plan_node.MathExpr{
276+
Left: &plan_node.TypedValue{
277277
DataType: "INTEGER",
278278
ValueLiteral: "2",
279279
},
280280
Op: "*",
281-
RightOp: &plan_node.FunctionCall{
281+
Right: &plan_node.FunctionCall{
282282
FunctionName: "abs",
283283
Parameters: []plan_node.Value{
284284
&plan_node.IdentRef{Ident: "c"},

presto/plan_node/identifier.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,8 @@ func ParseHiveTableHandle(literal string) *HiveTableHandle {
5050
}
5151

5252
// JoinPredicate is a single parenthesised equality of the form
// "(" left "=" right ")", where both sides are String tokens captured
// verbatim by the parser.
type JoinPredicate struct {
53-
LeftColumn string `parser:"'(' @String"`
54-
RightColumn string `parser:"'=' @String ')'"`
53+
Left string `parser:"'(' @String"`
54+
Right string `parser:"'=' @String ')'"`
5555
}
5656

5757
type JoinPredicates struct {

presto/plan_node/identifier_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,12 @@ func TestParseJoinPredicates(t *testing.T) {
3333
[]plan_node.JoinPredicates{
3434
{Predicates: []plan_node.JoinPredicate{
3535
{
36-
LeftColumn: "ws_item_sk_252",
37-
RightColumn: "wr_item_sk",
36+
Left: "ws_item_sk_252",
37+
Right: "wr_item_sk",
3838
},
3939
{
40-
LeftColumn: "ws_order_number_266",
41-
RightColumn: "wr_order_number",
40+
Left: "ws_order_number_266",
41+
Right: "wr_order_number",
4242
},
4343
}},
4444
})

presto/plan_node/plan_node.go

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,75 @@ func (t PlanTree) Traverse(ctx context.Context, fn PlanNodeTraverseFunction, mod
109109
return NoRootPlanNodeError
110110
}
111111

112+
func traceValue(assignmentMap map[string]Value, tableHandle *HiveTableHandle, value Value) Value {
113+
switch typed := value.(type) {
114+
case *HiveColumnHandle:
115+
if typed.Table == nil {
116+
typed.Table = tableHandle
117+
}
118+
case *TypeCastedValue:
119+
typed.OriginalValue = traceValue(assignmentMap, tableHandle, typed.OriginalValue)
120+
case *IdentRef:
121+
if assignedValue, exists := assignmentMap[typed.Ident]; exists {
122+
return traceValue(assignmentMap, tableHandle, assignedValue)
123+
} else {
124+
return value
125+
}
126+
case *FunctionCall:
127+
for i := 0; i < len(typed.Parameters); i++ {
128+
typed.Parameters[i] = traceValue(assignmentMap, tableHandle, typed.Parameters[i])
129+
}
130+
case *MathExpr:
131+
typed.Left = traceValue(assignmentMap, tableHandle, typed.Left)
132+
typed.Right = traceValue(assignmentMap, tableHandle, typed.Right)
133+
}
134+
return value
135+
}
136+
137+
func (t PlanTree) ParseJoins() ([]Join, error) {
138+
assignmentMap := make(map[string]Value)
139+
joins := make([]Join, 0)
140+
if err := t.Traverse(context.Background(), func(ctx context.Context, node *PlanNode) error {
141+
tableHandle := ParseHiveTableHandle(node.Identifier)
142+
id := fmt.Sprintf("%s_%s", node.Id, node.Name)
143+
if node.Details != "" {
144+
ast, parseErr := PlanNodeDetailParser.ParseString(id, node.Details)
145+
if parseErr != nil {
146+
return parseErr
147+
}
148+
// Must scan backwards.
149+
for i := len(ast.Stmts) - 1; i >= 0; i-- {
150+
if assignment, ok := ast.Stmts[i].(*Assignment); ok {
151+
assignmentMap[assignment.Identifier.Ident] = traceValue(assignmentMap, tableHandle, assignment.AssignedValue)
152+
}
153+
}
154+
}
155+
if IsJoin[node.Name] {
156+
preds, parseErr := PlanNodeJoinPredicatesParser.ParseString(id, node.Identifier)
157+
if parseErr != nil {
158+
return parseErr
159+
}
160+
for _, pred := range preds.Predicates {
161+
joins = append(joins, Join{
162+
JoinType: node.Name,
163+
LeftValue: assignmentMap[pred.Left],
164+
RightValue: assignmentMap[pred.Right],
165+
})
166+
}
167+
}
168+
return nil
169+
}, PlanTreeDFSTraverse); err != nil {
170+
return nil, err
171+
}
172+
return joins, nil
173+
}
174+
175+
// Join describes one join predicate found by PlanTree.ParseJoins, with both
// sides of the predicate resolved to the values they were assigned from.
type Join struct {
176+
// JoinType is the name of the plan node the predicate was found on.
JoinType string
177+
// LeftValue is the value recorded for the predicate's left identifier;
// it is nil when no assignment was recorded for that identifier.
LeftValue Value
178+
// RightValue is the value recorded for the predicate's right identifier;
// it is nil when no assignment was recorded for that identifier.
RightValue Value
179+
}
180+
112181
type PlanEstimate struct {
113182
OutputRowCount JsonFloat64 `json:"outputRowCount"`
114183
TotalSize JsonFloat64 `json:"totalSize"`

presto/plan_node/plan_node_test.go

Lines changed: 18 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -1,75 +1,37 @@
11
package plan_node_test
22

33
import (
4-
"context"
54
"encoding/json"
65
"fmt"
76
"github.com/stretchr/testify/assert"
87
"math"
98
"os"
109
"pbench/presto/plan_node"
11-
"strings"
1210
"testing"
1311
)
1412

15-
func traceValue(assignmentMap map[string]plan_node.Value, tableHandle *plan_node.HiveTableHandle, value plan_node.Value) plan_node.Value {
16-
switch typed := value.(type) {
17-
case *plan_node.HiveColumnHandle:
18-
if typed.Table == nil {
19-
typed.Table = tableHandle
13+
func testParseJoin(t *testing.T, fileName, expected string) {
14+
t.Helper()
15+
t.Run(fileName, func(t *testing.T) {
16+
bytes, err := os.ReadFile(fileName)
17+
if !assert.Nil(t, err) {
18+
t.FailNow()
2019
}
21-
case *plan_node.TypeCastedValue:
22-
typed.OriginalValue = traceValue(assignmentMap, tableHandle, typed.OriginalValue)
23-
case *plan_node.IdentRef:
24-
return traceValue(assignmentMap, tableHandle, assignmentMap[typed.Ident])
25-
case *plan_node.FunctionCall:
26-
for i := 0; i < len(typed.Parameters); i++ {
27-
typed.Parameters[i] = traceValue(assignmentMap, tableHandle, typed.Parameters[i])
20+
planTree := make(plan_node.PlanTree)
21+
if !assert.Nil(t, json.Unmarshal(bytes, &planTree)) {
22+
t.FailNow()
2823
}
29-
}
30-
return value
24+
joins, parseErr := planTree.ParseJoins()
25+
if assert.Nil(t, parseErr) {
26+
assert.Equal(t, expected, fmt.Sprint(joins))
27+
}
28+
})
3129
}
3230

33-
func TestPlanTree(t *testing.T) {
34-
bytes, err := os.ReadFile("sample_plan.json")
35-
if !assert.Nil(t, err) {
36-
t.FailNow()
37-
}
38-
planTree := make(plan_node.PlanTree)
39-
if !assert.Nil(t, json.Unmarshal(bytes, &planTree)) {
40-
t.FailNow()
41-
}
42-
assignmentMap := make(map[string]plan_node.Value)
43-
assert.Nil(t, planTree.Traverse(context.Background(), func(ctx context.Context, node *plan_node.PlanNode) error {
44-
tableHandle := plan_node.ParseHiveTableHandle(node.Identifier)
45-
id := fmt.Sprintf("%s_%s", node.Id, node.Name)
46-
if node.Details != "" {
47-
ast, parseErr := plan_node.PlanNodeDetailParser.ParseString(id, node.Details)
48-
if !assert.Nil(t, parseErr) {
49-
t.FailNow()
50-
}
51-
for i := len(ast.Stmts) - 1; i >= 0; i-- {
52-
if assignment, ok := ast.Stmts[i].(*plan_node.Assignment); ok {
53-
assignmentMap[assignment.Identifier.Ident] = traceValue(assignmentMap, tableHandle, assignment.AssignedValue)
54-
}
55-
}
56-
}
57-
if plan_node.IsJoin[node.Name] {
58-
preds, parseErr := plan_node.PlanNodeJoinPredicatesParser.ParseString(id, node.Identifier)
59-
if !assert.Nil(t, parseErr) {
60-
t.FailNow()
61-
}
62-
b := strings.Builder{}
63-
for _, pred := range preds.Predicates {
64-
b.WriteString(assignmentMap[pred.LeftColumn].String())
65-
b.WriteString(" x ")
66-
b.WriteString(assignmentMap[pred.RightColumn].String())
67-
b.WriteString("\n")
68-
}
69-
fmt.Printf("%s\n%s\n", id, b.String())
70-
}
71-
return nil
72-
}, plan_node.PlanTreeDFSTraverse))
31+
func TestParseJoin(t *testing.T) {
32+
testParseJoin(t, "sample.plan.json",
33+
`[{RightJoin CAST(glue.ng_public.admin_system_city.id AS bigint) glue.lks.LR_branded_car_enrollment.city_id} {LeftJoin glue.lks.LR_branded_car_enrollment.country glue.lks.LR_admin_system_country.code} {InnerJoin CAST(glue.ng_public.fleet_car.id AS bigint) glue.ng_public.fleet_car_tag_binding.car_id} {LeftJoin glue.ng_public.fleet_car_tag_binding.car_tag_id CAST(glue.ng_public.fleet_car_tag.id AS bigint)} {LeftJoin glue.lks.LR_branded_car_enrollment.car_id CAST(glue.ng_public.fleet_car.id AS bigint)}]`)
34+
testParseJoin(t, "arithmetics.plan.json", `[{InnerJoin (hive.test_join.t1.a) + (INTEGER '2') ((hive.test_join.t2.b) + (INTEGER '1')) - ((INTEGER '2') * (abs(hive.test_join.t2.c)))}]`)
7335
}
7436

7537
func TestJsonFloat64(t *testing.T) {

presto/plan_node/sample_plan.json renamed to presto/plan_node/sample.plan.json

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -434,8 +434,8 @@
434434
"plan": {
435435
"id": "2372",
436436
"name": "ScanProject",
437-
"identifier": "[table = TableHandle {connectorId='glue', connectorHandle='HiveTableHandle{schemaName=looker_scratch, tableName=LR_SH2E81713104009327_branded_car_enrollment, analyzePartitionValues=Optional.empty}', layout='Optional[looker_scratch.LR_SH2E81713104009327_branded_car_enrollment{domains={car_id=[ [[\"1\", <max>)] ], country=[ [[\"gh\"]] ], cohort=[ [[\"Branded_Verified\"]] ]}}]'}, projectLocality = LOCAL]",
438-
"details": "$hashvalue_84 := combine_hash(BIGINT'0', COALESCE($operator$hash_code(city_id), BIGINT'0')) (23:6)\nLAYOUT: looker_scratch.LR_SH2E81713104009327_branded_car_enrollment{domains={car_id=[ [[\"1\", <max>)] ], country=[ [[\"gh\"]] ], cohort=[ [[\"Branded_Verified\"]] ]}}\ncountry := country:string:1:REGULAR (23:6)\n :: [[\"gh\"]]\ncity_id := city_id:bigint:-13:PARTITION_KEY (23:6)\n :: [[\"706\"]]\ncar_id := car_id:bigint:0:REGULAR (23:6)\n :: [[\"1\", <max>)]\ncohort:varchar(16):2:REGULAR\n :: [[\"Branded_Verified\"]]\n",
437+
"identifier": "[table = TableHandle {connectorId='glue', connectorHandle='HiveTableHandle{schemaName=lks, tableName=LR_branded_car_enrollment, analyzePartitionValues=Optional.empty}', layout='Optional[lks.LR_branded_car_enrollment{domains={car_id=[ [[\"1\", <max>)] ], country=[ [[\"gh\"]] ], cohort=[ [[\"Branded_Verified\"]] ]}}]'}, projectLocality = LOCAL]",
438+
"details": "$hashvalue_84 := combine_hash(BIGINT'0', COALESCE($operator$hash_code(city_id), BIGINT'0')) (23:6)\nLAYOUT: lks.LR_branded_car_enrollment{domains={car_id=[ [[\"1\", <max>)] ], country=[ [[\"gh\"]] ], cohort=[ [[\"Branded_Verified\"]] ]}}\ncountry := country:string:1:REGULAR (23:6)\n :: [[\"gh\"]]\ncity_id := city_id:bigint:-13:PARTITION_KEY (23:6)\n :: [[\"706\"]]\ncar_id := car_id:bigint:0:REGULAR (23:6)\n :: [[\"1\", <max>)]\ncohort:varchar(16):2:REGULAR\n :: [[\"Branded_Verified\"]]\n",
439439
"children": [],
440440
"remoteSources": [],
441441
"estimates": [
@@ -502,8 +502,8 @@
502502
"plan": {
503503
"id": "2374",
504504
"name": "ScanProject",
505-
"identifier": "[table = TableHandle {connectorId='glue', connectorHandle='HiveTableHandle{schemaName=looker_scratch, tableName=LR_SHOSW1713111405917_admin_system_country, analyzePartitionValues=Optional.empty}', layout='Optional[looker_scratch.LR_SHOSW1713111405917_admin_system_country{domains={code=[ [[\"gh\"]] ]}}]'}, projectLocality = LOCAL]",
506-
"details": "$hashvalue_88 := combine_hash(BIGINT'0', COALESCE($operator$hash_code(code), BIGINT'0')) (25:11)\nLAYOUT: looker_scratch.LR_SHOSW1713111405917_admin_system_country{domains={code=[ [[\"gh\"]] ]}}\ncode := code:string:2:REGULAR (25:11)\n :: [[\"gh\"]]\n",
505+
"identifier": "[table = TableHandle {connectorId='glue', connectorHandle='HiveTableHandle{schemaName=lks, tableName=LR_admin_system_country, analyzePartitionValues=Optional.empty}', layout='Optional[lks.LR_admin_system_country{domains={code=[ [[\"gh\"]] ]}}]'}, projectLocality = LOCAL]",
506+
"details": "$hashvalue_88 := combine_hash(BIGINT'0', COALESCE($operator$hash_code(code), BIGINT'0')) (25:11)\nLAYOUT: lks.LR_admin_system_country{domains={code=[ [[\"gh\"]] ]}}\ncode := code:string:2:REGULAR (25:11)\n :: [[\"gh\"]]\n",
507507
"children": [],
508508
"remoteSources": [],
509509
"estimates": [

presto/plan_node/sample.sql

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
-- Sample
2+
WITH fleet_car_tag_list AS (SELECT
3+
fc.id AS car_id
4+
,ARRAY_JOIN(ARRAY_AGG(DISTINCT fct.id ORDER BY fct.id),';') AS car_tag_id_list
5+
,ARRAY_JOIN(ARRAY_AGG(DISTINCT fct.key ORDER BY fct.key),';') AS car_tag_key_list
6+
,ARRAY_JOIN(ARRAY_AGG(DISTINCT fct.name ORDER BY fct.name),';') AS car_tag_list
7+
8+
FROM ng_public.fleet_car fc
9+
LEFT JOIN ng_public.fleet_car_tag_binding fctb
10+
ON fctb.car_id = fc.id
11+
LEFT JOIN ng_public.fleet_car_tag fct
12+
ON fct.id = fctb.car_tag_id
13+
14+
---Deleted/removed tags are excluded (a query for finding deleted rows is state = 'deleted' OR state IS NULL).---
15+
WHERE
16+
LOWER(fctb.state) = 'active'
17+
18+
GROUP BY
19+
fc.id
20+
)
21+
SELECT
22+
branded_car_enrollment.car_id AS \"branded_car_enrollment.target_id\"
23+
FROM lks.LR_branded_car_enrollment AS branded_car_enrollment
24+
LEFT JOIN admin_system_city AS admin_system_city ON branded_car_enrollment.city_id = admin_system_city.id
25+
LEFT JOIN lks.LR_admin_system_country AS admin_system_country ON branded_car_enrollment.country = admin_system_country.code
26+
LEFT JOIN fleet_car_tag_list ON branded_car_enrollment.car_id = fleet_car_tag_list.car_id
27+
WHERE ((fleet_car_tag_list.car_tag_list ) NOT LIKE '%bajaji%' AND ((fleet_car_tag_list.car_tag_list ) NOT LIKE '%Bajaji%' AND (fleet_car_tag_list.car_tag_list ) NOT LIKE '%Boda%') AND ((fleet_car_tag_list.car_tag_list ) NOT LIKE '%Bodas%' AND (fleet_car_tag_list.car_tag_list ) NOT LIKE '%boda%' AND ((fleet_car_tag_list.car_tag_list ) NOT LIKE '%bodas%' AND (fleet_car_tag_list.car_tag_list ) NOT LIKE '%TukTuk%')) OR (fleet_car_tag_list.car_tag_list ) IS NULL) AND (((( branded_car_enrollment.car_id >= 1) AND ( branded_car_enrollment.country = 'gh')) AND ( branded_car_enrollment.cohort = 'Branded_Verified')) AND ( branded_car_enrollment.city_id = 991))
28+
GROUP BY
29+
1
30+
ORDER BY
31+
1
32+
LIMIT 5000

0 commit comments

Comments
 (0)