Skip to content

Commit c028790

Browse files
committed
[hotfix] Pull legacyUidsEnabled as an abstract method to CommonExecTableSourceScan
1 parent bfcf13b commit c028790

File tree

3 files changed

+15
-14
lines changed

3 files changed

+15
-14
lines changed

flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecTableSourceScan.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ public String getDynamicFilteringDataListenerID() {
124124
protected Transformation<RowData> translateToPlanInternal(
125125
PlannerBase planner, ExecNodeConfig config) {
126126
final Transformation<RowData> transformation =
127-
super.createTransformation(planner, config, false);
127+
super.translateToPlanInternal(planner, config);
128128
// the boundedness has been checked via the runtime provider already, so we can safely
129129
// declare all legacy transformations as bounded to make the stream graph generator happy
130130
ExecNodeUtil.makeLegacySourceTransformationsBounded(transformation);
@@ -164,6 +164,11 @@ public Transformation<RowData> createInputFormatTransformation(
164164
return env.addSource(function, operatorName, outputTypeInfo).getTransformation();
165165
}
166166

167+
@Override
168+
protected final boolean legacyUidsEnabled() {
169+
return false;
170+
}
171+
167172
public BatchExecTableSourceScan copyAndRemoveInputs() {
168173
BatchExecTableSourceScan tableSourceScan =
169174
new BatchExecTableSourceScan(

flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecTableSourceScan.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -112,8 +112,9 @@ public DynamicTableSourceSpec getTableSourceSpec() {
112112
return tableSourceSpec;
113113
}
114114

115-
protected Transformation<RowData> createTransformation(
116-
PlannerBase planner, ExecNodeConfig config, boolean legacyUidsEnabled) {
115+
@Override
116+
protected Transformation<RowData> translateToPlanInternal(
117+
PlannerBase planner, ExecNodeConfig config) {
117118
final Transformation<RowData> sourceTransform;
118119
final StreamExecutionEnvironment env = planner.getExecEnv();
119120
final TransformationMetadata metadata =
@@ -186,15 +187,15 @@ protected Transformation<RowData> createTransformation(
186187
((DataStreamScanProvider) provider)
187188
.produceDataStream(createProviderContext(metadata, config), env)
188189
.getTransformation();
189-
if (legacyUidsEnabled) {
190+
if (legacyUidsEnabled()) {
190191
metadata.fill(sourceTransform);
191192
}
192193
sourceTransform.setOutputType(outputTypeInfo);
193194
} else if (provider instanceof TransformationScanProvider) {
194195
sourceTransform =
195196
((TransformationScanProvider) provider)
196197
.createTransformation(createProviderContext(metadata, config));
197-
if (legacyUidsEnabled) {
198+
if (legacyUidsEnabled()) {
198199
metadata.fill(sourceTransform);
199200
}
200201
sourceTransform.setOutputType(outputTypeInfo);
@@ -369,4 +370,6 @@ protected abstract Transformation<RowData> createInputFormatTransformation(
369370
InputFormat<RowData, ?> inputFormat,
370371
InternalTypeInfo<RowData> outputTypeInfo,
371372
String operatorName);
373+
374+
protected abstract boolean legacyUidsEnabled();
372375
}

flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTableSourceScan.java

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,7 @@
2525
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
2626
import org.apache.flink.table.connector.source.ScanTableSource;
2727
import org.apache.flink.table.data.RowData;
28-
import org.apache.flink.table.planner.delegation.PlannerBase;
2928
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
30-
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
3129
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
3230
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
3331
import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecTableSourceScan;
@@ -94,12 +92,6 @@ public StreamExecTableSourceScan(
9492
description);
9593
}
9694

97-
@Override
98-
protected Transformation<RowData> translateToPlanInternal(
99-
PlannerBase planner, ExecNodeConfig config) {
100-
return createTransformation(planner, config, legacyUidsEnabled());
101-
}
102-
10395
@Override
10496
public Transformation<RowData> createInputFormatTransformation(
10597
StreamExecutionEnvironment env,
@@ -111,7 +103,8 @@ public Transformation<RowData> createInputFormatTransformation(
111103
return env.createInput(inputFormat, outputTypeInfo).name(operatorName).getTransformation();
112104
}
113105

114-
private boolean legacyUidsEnabled() {
106+
@Override
107+
protected final boolean legacyUidsEnabled() {
115108
return getVersion() == 1;
116109
}
117110
}

0 commit comments

Comments
 (0)