Skip to content

Commit 0588574

Browse files
Thanzeel-Hassan-IBMAjas-MangalHaritha-Kolothpinglerin
authored
Add RFC for JDBC Join Push down (#32)
Co-authored-by: Ajas M <[email protected]> Co-authored-by: Haritha K <[email protected]> Co-authored-by: Glerin <[email protected]>
1 parent 9d62ffe commit 0588574

24 files changed

+955
-0
lines changed

RFC-0009-jdbc-join-push-down.md

Lines changed: 573 additions & 0 deletions
Large diffs are not rendered by default.
6 KB
Binary file not shown.
138 KB
Loading
223 KB
Loading
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
Query Plan
2+
"Fragment 1 [HASH]
3+
CPU: 885.73ms, Scheduled: 1.57s, Input: 7,080,000 rows (374.43MB); per task: avg.: 7,080,000.00 std.dev.: 0.00, Output: 50 rows (2.51kB), 1 tasks
4+
Output layout: [assignment_id, role, is_current, cust_id, first_name, last_name, default_int_4]
5+
Output partitioning: SINGLE []
6+
Stage Execution Strategy: UNGROUPED_EXECUTION
7+
- InnerJoin[PlanNodeId 8][(""assignment_id"" = ""assignment_id_2"")][$hashvalue, $hashvalue_45] => [assignment_id:integer, role:varchar, is_current:boolean, cust_id:integer, first_name:varchar, last_name:varchar, default_int_4:integer]
8+
CPU: 690.00ms (1.34%), Scheduled: 1.03s (1.10%), Output: 50 rows (2.51kB)
9+
Left (probe) Input avg.: 441,875.00 rows, Input std.dev.: 175.91%
10+
Right (build) Input avg.: 625.00 rows, Input std.dev.: 5.72%
11+
Collisions avg.: 497.13 (98.02% est.), Collisions std.dev.: 20.35%
12+
Distribution: PARTITIONED
13+
- RemoteSource[2] => [assignment_id:integer, role:varchar, is_current:boolean, cust_id:integer, first_name:varchar, last_name:varchar, $hashvalue:bigint]
14+
CPU: 164.00ms (0.32%), Scheduled: 498.00ms (0.53%), Output: 7,070,000 rows (374.25MB)
15+
Input avg.: 441,875.00 rows, Input std.dev.: 175.91%
16+
- LocalExchange[PlanNodeId 547][HASH][$hashvalue_45] (assignment_id_2) => [assignment_id_2:integer, default_int_4:integer, $hashvalue_45:bigint]
17+
Estimates: {source: CostBasedSourceInfo, rows: ? (?), cpu: ?, memory: 0.00, network: ?}
18+
CPU: 1.00ms (0.00%), Scheduled: 7.00ms (0.01%), Output: 10,000 rows (185.55kB)
19+
Input avg.: 625.00 rows, Input std.dev.: 387.30%
20+
- RemoteSource[5] => [assignment_id_2:integer, default_int_4:integer, $hashvalue_46:bigint]
21+
CPU: 0.00ns (0.00%), Scheduled: 0.00ns (0.00%), Output: 10,000 rows (185.55kB)
22+
Input avg.: 625.00 rows, Input std.dev.: 387.30%
23+
24+
Fragment 2 [HASH]
25+
CPU: 12.83s, Scheduled: 23.50s, Input: 17,070,000 rows (561.97MB); per task: avg.: 17,070,000.00 std.dev.: 0.00, Output: 7,070,000 rows (374.25MB), 1 tasks
26+
Output layout: [assignment_id, role, is_current, cust_id, first_name, last_name, $hashvalue_44]
27+
Output partitioning: HASH [assignment_id][$hashvalue_44]
28+
Stage Execution Strategy: UNGROUPED_EXECUTION
29+
- Project[PlanNodeId 607][projectLocality = LOCAL] => [assignment_id:integer, role:varchar, is_current:boolean, cust_id:integer, first_name:varchar, last_name:varchar, $hashvalue_44:bigint]
30+
CPU: 4.90s (9.54%), Scheduled: 9.41s (10.08%), Output: 7,070,000 rows (374.25MB)
31+
Input avg.: 441,875.00 rows, Input std.dev.: 130.02%
32+
$hashvalue_44 := combine_hash(BIGINT'0', COALESCE($operator$hash_code(assignment_id), BIGINT'0')) (12:3)
33+
- InnerJoin[PlanNodeId 4][(""employee_id"" = ""cust_id"")][$hashvalue_39, $hashvalue_41] => [assignment_id:integer, role:varchar, is_current:boolean, cust_id:integer, first_name:varchar, last_name:varchar]
34+
CPU: 6.13s (11.94%), Scheduled: 11.32s (12.13%), Output: 7,070,000 rows (313.57MB)
35+
Left (probe) Input avg.: 625,000.00 rows, Input std.dev.: 150.25%
36+
Right (build) Input avg.: 441,875.00 rows, Input std.dev.: 0.11%
37+
Collisions avg.: 160,974.71 (100.11% est.), Collisions std.dev.: 0.48%
38+
Distribution: PARTITIONED
39+
- RemoteSource[3] => [assignment_id:integer, employee_id:integer, role:varchar, is_current:boolean, $hashvalue_39:bigint]
40+
CPU: 131.00ms (0.26%), Scheduled: 276.00ms (0.30%), Output: 10,000,000 rows (318.53MB)
41+
Input avg.: 625,000.00 rows, Input std.dev.: 150.25%
42+
- LocalExchange[PlanNodeId 546][HASH][$hashvalue_41] (cust_id) => [cust_id:integer, first_name:varchar, last_name:varchar, $hashvalue_41:bigint]
43+
Estimates: {source: CostBasedSourceInfo, rows: ? (?), cpu: ?, memory: 0.00, network: ?}
44+
CPU: 1.46s (2.85%), Scheduled: 2.09s (2.24%), Output: 7,070,000 rows (243.44MB)
45+
Input avg.: 441,875.00 rows, Input std.dev.: 195.32%
46+
- RemoteSource[4] => [cust_id:integer, first_name:varchar, last_name:varchar, $hashvalue_42:bigint]
47+
CPU: 77.00ms (0.15%), Scheduled: 118.00ms (0.13%), Output: 7,070,000 rows (243.44MB)
48+
Input avg.: 441,875.00 rows, Input std.dev.: 195.32%
49+
50+
Fragment 3 [SOURCE]
51+
CPU: 19.93s, Scheduled: 33.70s, Input: 10,000,000 rows (0B); per task: avg.: 10,000,000.00 std.dev.: 0.00, Output: 10,000,000 rows (318.53MB), 1 tasks
52+
Output layout: [assignment_id, employee_id, role, is_current, $hashvalue_40]
53+
Output partitioning: HASH [employee_id][$hashvalue_40]
54+
Stage Execution Strategy: UNGROUPED_EXECUTION
55+
- ScanProject[PlanNodeId 0,605][table = TableHandle {connectorId='db2', connectorHandle='JdbcTableHandle{connectorId=db2, schemaTableName=db2.assignments_10_million, catalogName=null, schemaName=DB2, tableName=ASSIGNMENTS_10_MILLION, joinTables=Optional.empty}', layout='Optional[{domains=ALL, additionalPredicate={}}]'}, grouped = false, projectLocality = LOCAL] => [assignment_id:integer, employee_id:integer, role:varchar, is_current:boolean, $hashvalue_40:bigint]
56+
Estimates: {source: CostBasedSourceInfo, rows: ? (?), cpu: ?, memory: 0.00, network: 0.00}/{source: CostBasedSourceInfo, rows: ? (?), cpu: ?, memory: 0.00, network: 0.00}
57+
CPU: 19.93s (38.84%), Scheduled: 33.70s (36.10%), Output: 10,000,000 rows (318.53MB)
58+
Input avg.: 10,000,000.00 rows, Input std.dev.: 0.00%
59+
$hashvalue_40 := combine_hash(BIGINT'0', COALESCE($operator$hash_code(employee_id), BIGINT'0')) (12:3)
60+
LAYOUT: {domains=ALL, additionalPredicate={}}
61+
is_current := JdbcColumnHandle{connectorId=db2, columnName=IS_CURRENT, jdbcTypeHandle=JdbcTypeHandle{jdbcType=16, jdbcTypeName=BOOLEAN, columnSize=1, decimalDigits=0, arrayDimensions=null}, columnType=boolean, nullable=true, comment=Optional.empty} (12:3)
62+
employee_id := JdbcColumnHandle{connectorId=db2, columnName=EMPLOYEE_ID, jdbcTypeHandle=JdbcTypeHandle{jdbcType=4, jdbcTypeName=INTEGER, columnSize=10, decimalDigits=0, arrayDimensions=null}, columnType=integer, nullable=true, comment=Optional.empty} (12:3)
63+
role := JdbcColumnHandle{connectorId=db2, columnName=ROLE, jdbcTypeHandle=JdbcTypeHandle{jdbcType=12, jdbcTypeName=VARCHAR, columnSize=50, decimalDigits=0, arrayDimensions=null}, columnType=varchar, nullable=true, comment=Optional.empty} (12:3)
64+
assignment_id := JdbcColumnHandle{connectorId=db2, columnName=ASSIGNMENT_ID, jdbcTypeHandle=JdbcTypeHandle{jdbcType=4, jdbcTypeName=INTEGER, columnSize=10, decimalDigits=0, arrayDimensions=null}, columnType=integer, nullable=true, comment=Optional.empty} (12:3)
65+
Input: 10,000,000 rows (0B), Filtered: 0.00%
66+
67+
Fragment 4 [SOURCE]
68+
CPU: 17.81s, Scheduled: 32.80s, Input: 7,070,000 rows (0B); per task: avg.: 7,070,000.00 std.dev.: 0.00, Output: 7,070,000 rows (243.44MB), 1 tasks
69+
Output layout: [cust_id, first_name, last_name, $hashvalue_43]
70+
Output partitioning: HASH [cust_id][$hashvalue_43]
71+
Stage Execution Strategy: UNGROUPED_EXECUTION
72+
- ScanProject[PlanNodeId 1,606][table = TableHandle {connectorId='db2', connectorHandle='JdbcTableHandle{connectorId=db2, schemaTableName=db2.customer_10_million, catalogName=null, schemaName=DB2, tableName=CUSTOMER_10_MILLION, joinTables=Optional.empty}', layout='Optional[{domains=ALL, additionalPredicate={}}]'}, grouped = false, projectLocality = LOCAL] => [cust_id:integer, first_name:varchar, last_name:varchar, $hashvalue_43:bigint]
73+
Estimates: {source: CostBasedSourceInfo, rows: ? (?), cpu: ?, memory: 0.00, network: 0.00}/{source: CostBasedSourceInfo, rows: ? (?), cpu: ?, memory: 0.00, network: 0.00}
74+
CPU: 17.80s (34.69%), Scheduled: 32.81s (35.14%), Output: 7,070,000 rows (243.44MB)
75+
Input avg.: 7,070,000.00 rows, Input std.dev.: 0.00%
76+
$hashvalue_43 := combine_hash(BIGINT'0', COALESCE($operator$hash_code(cust_id), BIGINT'0')) (13:8)
77+
LAYOUT: {domains=ALL, additionalPredicate={}}
78+
cust_id := JdbcColumnHandle{connectorId=db2, columnName=CUST_ID, jdbcTypeHandle=JdbcTypeHandle{jdbcType=4, jdbcTypeName=INTEGER, columnSize=10, decimalDigits=0, arrayDimensions=null}, columnType=integer, nullable=true, comment=Optional.empty} (13:8)
79+
last_name := JdbcColumnHandle{connectorId=db2, columnName=LAST_NAME, jdbcTypeHandle=JdbcTypeHandle{jdbcType=12, jdbcTypeName=VARCHAR, columnSize=50, decimalDigits=0, arrayDimensions=null}, columnType=varchar, nullable=true, comment=Optional.empty} (13:8)
80+
first_name := JdbcColumnHandle{connectorId=db2, columnName=FIRST_NAME, jdbcTypeHandle=JdbcTypeHandle{jdbcType=12, jdbcTypeName=VARCHAR, columnSize=50, decimalDigits=0, arrayDimensions=null}, columnType=varchar, nullable=true, comment=Optional.empty} (13:8)
81+
Input: 7,070,000 rows (0B), Filtered: 0.00%
82+
83+
Fragment 5 [SOURCE]
84+
CPU: 32.56ms, Scheduled: 2.10s, Input: 10,000 rows (0B); per task: avg.: 10,000.00 std.dev.: 0.00, Output: 10,000 rows (185.55kB), 1 tasks
85+
Output layout: [assignment_id_2, default_int_4, $hashvalue_47]
86+
Output partitioning: HASH [assignment_id_2][$hashvalue_47]
87+
Stage Execution Strategy: UNGROUPED_EXECUTION
88+
- ScanProject[PlanNodeId 5,608][table = TableHandle {connectorId='db2', connectorHandle='JdbcTableHandle{connectorId=db2, schemaTableName=db2.join_table_50_rows, catalogName=null, schemaName=DB2, tableName=JOIN_TABLE_50_ROWS, joinTables=Optional.empty}', layout='Optional[{domains=ALL, additionalPredicate={}}]'}, grouped = false, projectLocality = LOCAL] => [assignment_id_2:integer, default_int_4:integer, $hashvalue_47:bigint]
89+
Estimates: {source: CostBasedSourceInfo, rows: ? (?), cpu: ?, memory: 0.00, network: 0.00}/{source: CostBasedSourceInfo, rows: ? (?), cpu: ?, memory: 0.00, network: 0.00}
90+
CPU: 32.00ms (0.06%), Scheduled: 2.10s (2.25%), Output: 10,000 rows (185.55kB)
91+
Input avg.: 10,000.00 rows, Input std.dev.: 0.00%
92+
$hashvalue_47 := combine_hash(BIGINT'0', COALESCE($operator$hash_code(assignment_id_2), BIGINT'0')) (14:8)
93+
LAYOUT: {domains=ALL, additionalPredicate={}}
94+
default_int_4 := JdbcColumnHandle{connectorId=db2, columnName=DEFAULT_INT, jdbcTypeHandle=JdbcTypeHandle{jdbcType=4, jdbcTypeName=INTEGER, columnSize=10, decimalDigits=0, arrayDimensions=null}, columnType=integer, nullable=true, comment=Optional.empty} (14:8)
95+
assignment_id_2 := JdbcColumnHandle{connectorId=db2, columnName=ASSIGNMENT_ID, jdbcTypeHandle=JdbcTypeHandle{jdbcType=4, jdbcTypeName=INTEGER, columnSize=10, decimalDigits=0, arrayDimensions=null}, columnType=integer, nullable=true, comment=Optional.empty} (14:8)
96+
Input: 10,000 rows (0B), Filtered: 0.00%",
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
Query Plan
2+
"Fragment 1 [HASH]
3+
CPU: 1.16s, Scheduled: 2.07s, Input: 10,010,000 rows (271.03MB); per task: avg.: 10,010,000.00 std.dev.: 0.00, Output: 50 rows (1.19kB), 1 tasks
4+
Output layout: [assignment_id, role, is_current, default_int_1]
5+
Output partitioning: SINGLE []
6+
Stage Execution Strategy: UNGROUPED_EXECUTION
7+
- InnerJoin[PlanNodeId 4][(""assignment_id"" = ""assignment_id_0"")][$hashvalue, $hashvalue_24] => [assignment_id:integer, role:varchar, is_current:boolean, default_int_1:integer]
8+
CPU: 1.02s (5.00%), Scheduled: 1.84s (4.56%), Output: 50 rows (1.19kB)
9+
Left (probe) Input avg.: 625,000.00 rows, Input std.dev.: 194.00%
10+
Right (build) Input avg.: 625.00 rows, Input std.dev.: 5.72%
11+
Collisions avg.: 497.13 (98.02% est.), Collisions std.dev.: 20.35%
12+
Distribution: PARTITIONED
13+
- RemoteSource[2] => [assignment_id:integer, role:varchar, is_current:boolean, $hashvalue:bigint]
14+
CPU: 102.00ms (0.50%), Scheduled: 154.00ms (0.38%), Output: 10,000,000 rows (270.85MB)
15+
Input avg.: 625,000.00 rows, Input std.dev.: 194.00%
16+
- LocalExchange[PlanNodeId 364][HASH][$hashvalue_24] (assignment_id_0) => [assignment_id_0:integer, default_int_1:integer, $hashvalue_24:bigint]
17+
Estimates: {source: CostBasedSourceInfo, rows: ? (?), cpu: ?, memory: 0.00, network: ?}
18+
CPU: 1.00ms (0.00%), Scheduled: 8.00ms (0.02%), Output: 10,000 rows (185.55kB)
19+
Input avg.: 625.00 rows, Input std.dev.: 387.30%
20+
- RemoteSource[3] => [assignment_id_0:integer, default_int_1:integer, $hashvalue_25:bigint]
21+
CPU: 0.00ns (0.00%), Scheduled: 0.00ns (0.00%), Output: 10,000 rows (185.55kB)
22+
Input avg.: 625.00 rows, Input std.dev.: 387.30%
23+
24+
Fragment 2 [SOURCE]
25+
CPU: 19.18s, Scheduled: 36.29s, Input: 10,000,000 rows (0B); per task: avg.: 10,000,000.00 std.dev.: 0.00, Output: 10,000,000 rows (270.85MB), 1 tasks
26+
Output layout: [assignment_id, role, is_current, $hashvalue_23]
27+
Output partitioning: HASH [assignment_id][$hashvalue_23]
28+
Stage Execution Strategy: UNGROUPED_EXECUTION
29+
- ScanProject[PlanNodeId 0,402][table = TableHandle {connectorId='db2', connectorHandle='JdbcTableHandle{connectorId=db2, schemaTableName=db2.assignments_10_million, catalogName=null, schemaName=DB2, tableName=ASSIGNMENTS_10_MILLION, joinTables=Optional.empty}', layout='Optional[{domains=ALL, additionalPredicate={}}]'}, grouped = false, projectLocality = LOCAL] => [assignment_id:integer, role:varchar, is_current:boolean, $hashvalue_23:bigint]
30+
Estimates: {source: CostBasedSourceInfo, rows: ? (?), cpu: ?, memory: 0.00, network: 0.00}/{source: CostBasedSourceInfo, rows: ? (?), cpu: ?, memory: 0.00, network: 0.00}
31+
CPU: 19.18s (94.33%), Scheduled: 36.28s (89.81%), Output: 10,000,000 rows (270.85MB)
32+
Input avg.: 10,000,000.00 rows, Input std.dev.: 0.00%
33+
$hashvalue_23 := combine_hash(BIGINT'0', COALESCE($operator$hash_code(assignment_id), BIGINT'0')) (9:3)
34+
LAYOUT: {domains=ALL, additionalPredicate={}}
35+
is_current := JdbcColumnHandle{connectorId=db2, columnName=IS_CURRENT, jdbcTypeHandle=JdbcTypeHandle{jdbcType=16, jdbcTypeName=BOOLEAN, columnSize=1, decimalDigits=0, arrayDimensions=null}, columnType=boolean, nullable=true, comment=Optional.empty} (9:3)
36+
role := JdbcColumnHandle{connectorId=db2, columnName=ROLE, jdbcTypeHandle=JdbcTypeHandle{jdbcType=12, jdbcTypeName=VARCHAR, columnSize=50, decimalDigits=0, arrayDimensions=null}, columnType=varchar, nullable=true, comment=Optional.empty} (9:3)
37+
assignment_id := JdbcColumnHandle{connectorId=db2, columnName=ASSIGNMENT_ID, jdbcTypeHandle=JdbcTypeHandle{jdbcType=4, jdbcTypeName=INTEGER, columnSize=10, decimalDigits=0, arrayDimensions=null}, columnType=integer, nullable=true, comment=Optional.empty} (9:3)
38+
Input: 10,000,000 rows (0B), Filtered: 0.00%
39+
40+
Fragment 3 [SOURCE]
41+
CPU: 33.76ms, Scheduled: 2.11s, Input: 10,000 rows (0B); per task: avg.: 10,000.00 std.dev.: 0.00, Output: 10,000 rows (185.55kB), 1 tasks
42+
Output layout: [assignment_id_0, default_int_1, $hashvalue_26]
43+
Output partitioning: HASH [assignment_id_0][$hashvalue_26]
44+
Stage Execution Strategy: UNGROUPED_EXECUTION
45+
- ScanProject[PlanNodeId 1,403][table = TableHandle {connectorId='db2', connectorHandle='JdbcTableHandle{connectorId=db2, schemaTableName=db2.join_table_50_rows, catalogName=null, schemaName=DB2, tableName=JOIN_TABLE_50_ROWS, joinTables=Optional.empty}', layout='Optional[{domains=ALL, additionalPredicate={}}]'}, grouped = false, projectLocality = LOCAL] => [assignment_id_0:integer, default_int_1:integer, $hashvalue_26:bigint]
46+
Estimates: {source: CostBasedSourceInfo, rows: ? (?), cpu: ?, memory: 0.00, network: 0.00}/{source: CostBasedSourceInfo, rows: ? (?), cpu: ?, memory: 0.00, network: 0.00}
47+
CPU: 34.00ms (0.17%), Scheduled: 2.11s (5.23%), Output: 10,000 rows (185.55kB)
48+
Input avg.: 10,000.00 rows, Input std.dev.: 0.00%
49+
$hashvalue_26 := combine_hash(BIGINT'0', COALESCE($operator$hash_code(assignment_id_0), BIGINT'0')) (10:8)
50+
LAYOUT: {domains=ALL, additionalPredicate={}}
51+
assignment_id_0 := JdbcColumnHandle{connectorId=db2, columnName=ASSIGNMENT_ID, jdbcTypeHandle=JdbcTypeHandle{jdbcType=4, jdbcTypeName=INTEGER, columnSize=10, decimalDigits=0, arrayDimensions=null}, columnType=integer, nullable=true, comment=Optional.empty} (10:8)
52+
default_int_1 := JdbcColumnHandle{connectorId=db2, columnName=DEFAULT_INT, jdbcTypeHandle=JdbcTypeHandle{jdbcType=4, jdbcTypeName=INTEGER, columnSize=10, decimalDigits=0, arrayDimensions=null}, columnType=integer, nullable=true, comment=Optional.empty} (10:8)
53+
Input: 10,000 rows (0B), Filtered: 0.00%",

0 commit comments

Comments
 (0)