Skip to content

Commit 3ff7be4

Browse files
snuyanzinmxm
authored andcommitted
[FLINK-36026][table] Options from OPTIONS hint should be present in compiled plan
This closes #25186
1 parent 4002652 commit 3ff7be4

File tree

4 files changed

+117
-10
lines changed

4 files changed

+117
-10
lines changed

.editorconfig

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -263,6 +263,9 @@ ij_java_variable_annotation_wrap = normal
263263
ij_java_wrap_first_method_in_call_chain = true
264264
# ij_java_wrap_long_lines = false
265265

266+
[*.out]
267+
insert_final_newline = false
268+
266269
[*.xml]
267270
indent_style = tab
268271
indent_size = 4

flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/schema/CatalogSourceTable.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -107,19 +107,19 @@ public RelNode toRel(ToRelContext toRelContext) {
107107

108108
// finalize catalog table with option hints
109109
final Map<String, String> hintedOptions = FlinkHints.getHintedOptions(hints);
110-
final ContextResolvedTable catalogTable =
110+
final ContextResolvedTable contextTableWithHints =
111111
computeContextResolvedTable(context, hintedOptions);
112112

113113
// create table source
114114
final DynamicTableSource tableSource =
115-
createDynamicTableSource(context, catalogTable.getResolvedTable());
115+
createDynamicTableSource(context, contextTableWithHints.getResolvedTable());
116116

117117
// prepare table source and convert to RelNode
118118
return DynamicSourceUtils.convertSourceToRel(
119119
!schemaTable.isStreamingMode(),
120120
context.getTableConfig(),
121121
relBuilder,
122-
schemaTable.getContextResolvedTable(),
122+
contextTableWithHints,
123123
schemaTable.getStatistic(),
124124
hints,
125125
tableSource);

flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/CompiledPlanITCase.java

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -83,14 +83,25 @@ void testCompilePlanSql() throws IOException {
8383
tableEnv.compilePlanSql("INSERT INTO MySink SELECT * FROM MyTable");
8484
String expected = TableTestUtil.readFromResource("/jsonplan/testGetJsonPlan.out");
8585
assertThat(
86-
TableTestUtil.replaceExecNodeId(
87-
TableTestUtil.replaceFlinkVersion(
88-
TableTestUtil.getFormattedJson(
89-
compiledPlan.asJsonString()))))
86+
getPreparedToCompareCompiledPlan(
87+
TableTestUtil.getFormattedJson(compiledPlan.asJsonString())))
9088
.isEqualTo(
91-
TableTestUtil.replaceExecNodeId(
92-
TableTestUtil.replaceFlinkVersion(
93-
TableTestUtil.getFormattedJson(expected))));
89+
getPreparedToCompareCompiledPlan(TableTestUtil.getFormattedJson(expected)));
90+
}
91+
92+
@Test
93+
void testSourceTableWithHints() {
94+
CompiledPlan compiledPlan =
95+
tableEnv.compilePlanSql(
96+
"INSERT INTO MySink SELECT * FROM MyTable"
97+
// OPTIONS hints here do not play any significant role
98+
// we just have to be sure that these options are present in
99+
// compiled plan
100+
+ " /*+ OPTIONS('bounded'='true', 'scan.parallelism'='2') */");
101+
102+
String expected = TableTestUtil.readFromResource("/jsonplan/testGetJsonPlanWithHints.out");
103+
assertThat(getPreparedToCompareCompiledPlan(compiledPlan.asJsonString()))
104+
.isEqualTo(expected);
94105
}
95106

96107
@Test
@@ -432,4 +443,8 @@ private File createSourceSinkTables() throws IOException {
432443
createTestCsvSourceTable("src", DATA, COLUMNS_DEFINITION);
433444
return createTestCsvSinkTable("sink", COLUMNS_DEFINITION);
434445
}
446+
447+
private String getPreparedToCompareCompiledPlan(final String planAsString) {
448+
return TableTestUtil.replaceExecNodeId(TableTestUtil.replaceFlinkVersion(planAsString));
449+
}
435450
}
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
{
2+
"flinkVersion": "",
3+
"nodes" : [ {
4+
"id": 0,
5+
"type" : "stream-exec-table-source-scan_1",
6+
"scanTableSource" : {
7+
"table" : {
8+
"identifier" : "`default_catalog`.`default_database`.`MyTable`",
9+
"resolvedTable" : {
10+
"schema" : {
11+
"columns" : [ {
12+
"name" : "a",
13+
"dataType" : "BIGINT"
14+
}, {
15+
"name" : "b",
16+
"dataType" : "INT"
17+
}, {
18+
"name" : "c",
19+
"dataType" : "VARCHAR(2147483647)"
20+
} ],
21+
"watermarkSpecs" : [ ]
22+
},
23+
"partitionKeys" : [ ],
24+
"options" : {
25+
"bounded" : "true",
26+
"connector" : "values",
27+
"scan.parallelism" : "2"
28+
}
29+
}
30+
}
31+
},
32+
"outputType" : "ROW<`a` BIGINT, `b` INT, `c` VARCHAR(2147483647)>",
33+
"description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c], hints=[[[OPTIONS options:{scan.parallelism=2, bounded=true}]]])",
34+
"inputProperties" : [ ]
35+
}, {
36+
"id": 0,
37+
"type" : "stream-exec-sink_1",
38+
"configuration" : {
39+
"table.exec.sink.keyed-shuffle" : "AUTO",
40+
"table.exec.sink.not-null-enforcer" : "ERROR",
41+
"table.exec.sink.rowtime-inserter" : "ENABLED",
42+
"table.exec.sink.type-length-enforcer" : "IGNORE",
43+
"table.exec.sink.upsert-materialize" : "AUTO"
44+
},
45+
"dynamicTableSink" : {
46+
"table" : {
47+
"identifier" : "`default_catalog`.`default_database`.`MySink`",
48+
"resolvedTable" : {
49+
"schema" : {
50+
"columns" : [ {
51+
"name" : "a",
52+
"dataType" : "BIGINT"
53+
}, {
54+
"name" : "b",
55+
"dataType" : "INT"
56+
}, {
57+
"name" : "c",
58+
"dataType" : "VARCHAR(2147483647)"
59+
} ],
60+
"watermarkSpecs" : [ ]
61+
},
62+
"partitionKeys" : [ ],
63+
"options" : {
64+
"connector" : "values",
65+
"table-sink-class" : "DEFAULT"
66+
}
67+
}
68+
}
69+
},
70+
"inputChangelogMode" : [ "INSERT" ],
71+
"inputProperties" : [ {
72+
"requiredDistribution" : {
73+
"type" : "UNKNOWN"
74+
},
75+
"damBehavior" : "PIPELINED",
76+
"priority" : 0
77+
} ],
78+
"outputType" : "ROW<`a` BIGINT, `b` INT, `c` VARCHAR(2147483647)>",
79+
"description" : "Sink(table=[default_catalog.default_database.MySink], fields=[a, b, c])"
80+
} ],
81+
"edges" : [ {
82+
"source": 0,
83+
"target": 0,
84+
"shuffle" : {
85+
"type" : "FORWARD"
86+
},
87+
"shuffleMode" : "PIPELINED"
88+
} ]
89+
}

0 commit comments

Comments
 (0)