Commit bfcf13b

[FLINK-38765] Fix persisted metadata handling in sink
1 parent a56134d commit bfcf13b

57 files changed (+1622, -42 lines)


flink-python/pyflink/table/tests/jsonplan/testGetJsonPlan.out

Lines changed: 1 addition & 1 deletion
@@ -30,7 +30,7 @@
     "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c])"
   }, {
     "id" : 0,
-    "type" : "stream-exec-sink_1",
+    "type" : "stream-exec-sink_2",
     "configuration" : {
       "table.exec.sink.keyed-shuffle" : "AUTO",
       "table.exec.sink.not-null-enforcer" : "ERROR",

flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableTestProgram.java

Lines changed: 15 additions & 0 deletions
@@ -114,6 +114,21 @@ public String toString() {
         return id;
     }
 
+    @Override
+    public boolean equals(Object o) {
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        TableTestProgram that = (TableTestProgram) o;
+        return id.equals(that.id);
+    }
+
+    @Override
+    public int hashCode() {
+        return id.hashCode();
+    }
+
     /**
      * Entrypoint for a {@link TableTestProgram} that forces an identifier and description of the
      * test program.
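Presumably, equality by id is what lets programs be looked up in collections; the new RestoreTestBase#programsToIgnore further down in this commit relies on List#contains, which in turn relies on equals. A minimal, self-contained sketch of that interaction (the Program and ContainsSketch names are made up for illustration and are not part of the commit):

import java.util.List;

public class ContainsSketch {
    /** Hypothetical stand-in for TableTestProgram: equal by id only. */
    static final class Program {
        final String id;
        Program(String id) { this.id = id; }
        @Override public boolean equals(Object o) {
            return o instanceof Program && id.equals(((Program) o).id);
        }
        @Override public int hashCode() { return id.hashCode(); }
    }

    public static void main(String[] args) {
        List<Program> ignored = List.of(new Program("sink-upsert-with-metadata"));
        // Without the id-based equals/hashCode this would print false,
        // because List#contains would fall back to reference equality.
        System.out.println(ignored.contains(new Program("sink-upsert-with-metadata")));
    }
}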

flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java

Lines changed: 1 addition & 1 deletion
@@ -1326,7 +1326,7 @@ private static void validateAndApplyTargetColumns(
      *
      * <p>The format looks as follows: {@code PHYSICAL COLUMNS + PERSISTED METADATA COLUMNS}
      */
-    private static RowType createConsumedType(ResolvedSchema schema, DynamicTableSink sink) {
+    public static RowType createConsumedType(ResolvedSchema schema, DynamicTableSink sink) {
        final Map<String, DataType> metadataMap = extractMetadataMap(sink);
 
        final Stream<RowField> physicalFields =
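For orientation, a hand-built sketch of the format described by the JavaDoc above (physical columns first, then persisted metadata columns), using the sink_t table from the test programs further down in this commit. It only assembles the expected shape with DataTypes and does not call into the planner; the alphabetical metadata order shown mirrors the test sink, not createConsumedType in general, and the ConsumedTypeSketch class name is made up.

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;

public class ConsumedTypeSketch {
    public static void main(String[] args) {
        // Physical columns of sink_t first ...
        DataType consumed =
                DataTypes.ROW(
                        DataTypes.FIELD("name", DataTypes.STRING()),
                        DataTypes.FIELD("score", DataTypes.BIGINT()),
                        // ... followed by the persisted metadata columns
                        // (alphabetical order, as listed by the test sink).
                        DataTypes.FIELD("nameMetadata", DataTypes.STRING()),
                        DataTypes.FIELD("scoreMetadata", DataTypes.BIGINT()));
        RowType consumedType = (RowType) consumed.getLogicalType();
        // Prints something like: ROW<`name` STRING, `score` BIGINT, `nameMetadata` STRING, `scoreMetadata` BIGINT>
        System.out.println(consumedType);
    }
}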

flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSink.java

Lines changed: 18 additions & 8 deletions
@@ -124,24 +124,28 @@ protected Transformation<Object> translateToPlanInternal(
     }
 
     @Override
-    protected RowType getPhysicalRowType(ResolvedSchema schema) {
+    protected final RowType getPersistedRowType(ResolvedSchema schema, DynamicTableSink tableSink) {
         // row-level modification may only write partial columns,
         // so we try to prune the RowType to get the real RowType containing
         // the physical columns to be written
         if (tableSinkSpec.getSinkAbilities() != null) {
             for (SinkAbilitySpec sinkAbilitySpec : tableSinkSpec.getSinkAbilities()) {
                 if (sinkAbilitySpec instanceof RowLevelUpdateSpec) {
                     RowLevelUpdateSpec rowLevelUpdateSpec = (RowLevelUpdateSpec) sinkAbilitySpec;
-                    return getPhysicalRowType(
-                            schema, rowLevelUpdateSpec.getRequiredPhysicalColumnIndices());
+                    return getPersistedRowType(
+                            schema,
+                            rowLevelUpdateSpec.getRequiredPhysicalColumnIndices(),
+                            tableSink);
                 } else if (sinkAbilitySpec instanceof RowLevelDeleteSpec) {
                     RowLevelDeleteSpec rowLevelDeleteSpec = (RowLevelDeleteSpec) sinkAbilitySpec;
-                    return getPhysicalRowType(
-                            schema, rowLevelDeleteSpec.getRequiredPhysicalColumnIndices());
+                    return getPersistedRowType(
+                            schema,
+                            rowLevelDeleteSpec.getRequiredPhysicalColumnIndices(),
+                            tableSink);
                 }
             }
         }
-        return (RowType) schema.toPhysicalRowDataType().getLogicalType();
+        return super.getPersistedRowType(schema, tableSink);
     }
 
     @Override
@@ -183,12 +187,18 @@ protected int[] getPrimaryKeyIndices(RowType sinkRowType, ResolvedSchema schema)
     }
 
     /** Get the physical row type with given column indices. */
-    private RowType getPhysicalRowType(ResolvedSchema schema, int[] columnIndices) {
+    private RowType getPersistedRowType(
+            ResolvedSchema schema, int[] columnIndices, DynamicTableSink sink) {
         List<Column> columns = schema.getColumns();
         List<Column> requireColumns = new ArrayList<>();
         for (int columnIndex : columnIndices) {
             requireColumns.add(columns.get(columnIndex));
         }
-        return (RowType) ResolvedSchema.of(requireColumns).toPhysicalRowDataType().getLogicalType();
+        return super.getPersistedRowType(ResolvedSchema.of(requireColumns), sink);
+    }
+
+    @Override
+    protected final boolean legacyPhysicalTypeEnabled() {
+        return false;
     }
 }

flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java

Lines changed: 17 additions & 6 deletions
@@ -52,6 +52,7 @@
 import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelUpdate;
 import org.apache.flink.table.connector.sink.legacy.SinkFunctionProvider;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.connectors.DynamicSinkUtils;
 import org.apache.flink.table.planner.lineage.TableLineageUtils;
 import org.apache.flink.table.planner.lineage.TableSinkLineageVertex;
 import org.apache.flink.table.planner.lineage.TableSinkLineageVertexImpl;
@@ -151,8 +152,8 @@ protected Transformation<Object> createSinkTransformation(
                 tableSink.getSinkRuntimeProvider(
                         new SinkRuntimeProviderContext(
                                 isBounded, tableSinkSpec.getTargetColumns()));
-        final RowType physicalRowType = getPhysicalRowType(schema);
-        final int[] primaryKeys = getPrimaryKeyIndices(physicalRowType, schema);
+        final RowType persistedRowType = getPersistedRowType(schema, tableSink);
+        final int[] primaryKeys = getPrimaryKeyIndices(persistedRowType, schema);
         final int sinkParallelism = deriveSinkParallelism(inputTransform, runtimeProvider);
         sinkParallelismConfigured = isParallelismConfigured(runtimeProvider);
         final int inputParallelism = inputTransform.getParallelism();
@@ -190,7 +191,7 @@ protected Transformation<Object> createSinkTransformation(
         final boolean needMaterialization = !inputInsertOnly && upsertMaterialize;
 
         Transformation<RowData> sinkTransform =
-                applyConstraintValidations(inputTransform, config, physicalRowType);
+                applyConstraintValidations(inputTransform, config, persistedRowType);
 
         if (hasPk) {
             sinkTransform =
@@ -212,7 +213,7 @@ protected Transformation<Object> createSinkTransformation(
                             sinkParallelism,
                             config,
                             classLoader,
-                            physicalRowType,
+                            persistedRowType,
                             inputUpsertKey);
         }
 
@@ -545,8 +546,16 @@ protected int[] getPrimaryKeyIndices(RowType sinkRowType, ResolvedSchema schema)
                 .orElse(new int[0]);
     }
 
-    protected RowType getPhysicalRowType(ResolvedSchema schema) {
-        return (RowType) schema.toPhysicalRowDataType().getLogicalType();
+    /**
+     * The method recreates the type of the incoming record from the sink's schema. It puts the
+     * physical columns first, followed by persisted metadata columns.
+     */
+    protected RowType getPersistedRowType(ResolvedSchema schema, DynamicTableSink sink) {
+        if (legacyPhysicalTypeEnabled()) {
+            return (RowType) schema.toPhysicalRowDataType().getLogicalType();
+        } else {
+            return DynamicSinkUtils.createConsumedType(schema, sink);
+        }
     }
 
     /**
@@ -574,4 +583,6 @@ private Optional<RowKind> getTargetRowKind() {
         }
         return Optional.empty();
     }
+
+    protected abstract boolean legacyPhysicalTypeEnabled();
 }
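To make the effect of the new flag concrete: with legacyPhysicalTypeEnabled() the sink only sees the physical columns, while the fixed path also includes persisted metadata columns, so any state serializer derived from the type has a different arity — which is why a plan version bump is needed below. The snippet is a hand-built illustration for the sink_t table used in the test programs further down; the PersistedVsPhysicalSketch class name is made up and the types are assembled with DataTypes rather than by calling the planner.

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.logical.RowType;

public class PersistedVsPhysicalSketch {
    public static void main(String[] args) {
        // Legacy behaviour (version 1): physical columns of sink_t only.
        RowType legacyPhysical =
                (RowType)
                        DataTypes.ROW(
                                        DataTypes.FIELD("name", DataTypes.STRING()),
                                        DataTypes.FIELD("score", DataTypes.BIGINT()))
                                .getLogicalType();
        // Fixed behaviour (version 2): physical columns plus persisted metadata columns.
        RowType persisted =
                (RowType)
                        DataTypes.ROW(
                                        DataTypes.FIELD("name", DataTypes.STRING()),
                                        DataTypes.FIELD("score", DataTypes.BIGINT()),
                                        DataTypes.FIELD("nameMetadata", DataTypes.STRING()),
                                        DataTypes.FIELD("scoreMetadata", DataTypes.BIGINT()))
                                .getLogicalType();
        // Different field counts: a serializer built from the legacy type cannot
        // read records that carry persisted metadata columns.
        System.out.println(legacyPhysical.getFieldCount() + " vs " + persisted.getFieldCount());
    }
}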

flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java

Lines changed: 29 additions & 0 deletions
@@ -104,6 +104,30 @@
         },
         minPlanVersion = FlinkVersion.v1_15,
         minStateVersion = FlinkVersion.v1_15)
+// Version 2: Fixed the data type used for creating the constraint enforcer and the sink upsert
+// materializer. Since this version the sink works correctly with persisted metadata columns.
+// We introduced a new version because statements that never rolled back to a value from state
+// could still run successfully. We allow those jobs to be upgraded. Without a new version such
+// jobs would fail on restore, because the state serializer would differ.
+@ExecNodeMetadata(
+        name = "stream-exec-sink",
+        version = 2,
+        consumedOptions = {
+            "table.exec.sink.not-null-enforcer",
+            "table.exec.sink.type-length-enforcer",
+            "table.exec.sink.upsert-materialize",
+            "table.exec.sink.keyed-shuffle",
+            "table.exec.sink.rowtime-inserter"
+        },
+        producedTransformations = {
+            CommonExecSink.CONSTRAINT_VALIDATOR_TRANSFORMATION,
+            CommonExecSink.PARTITIONER_TRANSFORMATION,
+            CommonExecSink.UPSERT_MATERIALIZE_TRANSFORMATION,
+            CommonExecSink.TIMESTAMP_INSERTER_TRANSFORMATION,
+            CommonExecSink.SINK_TRANSFORMATION
+        },
+        minPlanVersion = FlinkVersion.v2_3,
+        minStateVersion = FlinkVersion.v2_3)
 public class StreamExecSink extends CommonExecSink implements StreamExecNode<Object> {
     private static final Logger LOG = LoggerFactory.getLogger(StreamExecSink.class);
 
@@ -391,4 +415,9 @@ private static SequencedMultiSetStateConfig createStateConfig(
             throw new IllegalArgumentException("Unsupported strategy: " + strategy);
         }
     }
+
+    @Override
+    protected final boolean legacyPhysicalTypeEnabled() {
+        return getVersion() == 1;
+    }
 }

flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/TransformationsTest.java

Lines changed: 3 additions & 3 deletions
@@ -188,8 +188,8 @@ void testUidFlink1_18() throws IOException {
                 config.set(TABLE_EXEC_UID_FORMAT, "<id>_<type>_<version>_<transformation>"),
                 json -> {},
                 env -> planFromCurrentFlinkValues(env).asJsonString(),
-                "\\d+_stream-exec-sink_1_sink",
-                "\\d+_stream-exec-sink_1_constraint-validator",
+                "\\d+_stream-exec-sink_2_sink",
+                "\\d+_stream-exec-sink_2_constraint-validator",
                 "\\d+_stream-exec-values_1_values");
     }
 
@@ -200,7 +200,7 @@ void testPerNodeCustomUid() throws IOException {
                 json ->
                         JsonTestUtils.setExecNodeConfig(
                                 json,
-                                "stream-exec-sink_1",
+                                "stream-exec-sink_2",
                                 TABLE_EXEC_UID_FORMAT.key(),
                                 "my_custom_<transformation>_<id>"),
                 env -> planFromCurrentFlinkValues(env).asJsonString(),

flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/TableSinkTestPrograms.java

Lines changed: 75 additions & 0 deletions
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.common;
 
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.catalog.TableDistribution;
 import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions;
 import org.apache.flink.table.test.program.SinkTestStep;
@@ -28,6 +29,9 @@
 
 import java.util.Arrays;
 
+import static org.apache.flink.configuration.StateBackendOptions.STATE_BACKEND;
+import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_SINK_UPSERT_MATERIALIZE_STRATEGY;
+
 /** {@link TableTestProgram} definitions for testing {@link StreamExecSink}. */
 public class TableSinkTestPrograms {
 
@@ -222,4 +226,75 @@ private static TableTestProgram buildBucketingTest(
                         .build())
                 .runSql("INSERT INTO sink_t SELECT * FROM source_t")
                 .build();
+
+    // The queries could run as long as the value was never rolled back to one taken from state,
+    // which is possible. We validate that those jobs can restore and still run.
+    public static final TableTestProgram INSERT_RETRACT_WITH_WRITABLE_METADATA_FOR_LEGACY_TYPE =
+            getInsertRetractWithWritableMetadata(true);
+
+    public static final TableTestProgram INSERT_RETRACT_WITH_WRITABLE_METADATA =
+            getInsertRetractWithWritableMetadata(false);
+
+    private static TableTestProgram getInsertRetractWithWritableMetadata(
+            boolean forLegacyPhysicalType) {
+        final Row producedAfterRestore;
+        final String consumedAfterRestore;
+        if (forLegacyPhysicalType) {
+            producedAfterRestore = Row.ofKind(RowKind.INSERT, "Bob", 7);
+            consumedAfterRestore = "+U[BOB, 7, Bob, 7]";
+        } else {
+            // retract the last record, which should roll back to
+            // the previous state
+            producedAfterRestore = Row.ofKind(RowKind.DELETE, "Bob", 6);
+            consumedAfterRestore = "+U[BOB, 5, Bob, 5]";
+        }
+        return TableTestProgram.of(
+                        "insert-into-upsert-with-sink-upsert-materializer-writable-metadata"
+                                + (forLegacyPhysicalType ? "-v1" : ""),
+                        "The query requires a sink upsert materializer and the sink"
+                                + " uses writable metadata columns. The scenario showcases a"
+                                + " bug where a wrong type was used in sinks which did not"
+                                + " consider metadata columns. Multiple requirements must be"
+                                + " met for the bug to show up: 1. We need to use"
+                                + " rocksdb, so that we use a serializer when putting records"
+                                + " into state in SinkUpsertMaterializer. 2. We need to retract"
+                                + " to a previous value taken from the state, otherwise we"
+                                + " forward the incoming record. 3. There need to be persisted"
+                                + " metadata columns.")
+                .setupConfig(
+                        TABLE_EXEC_SINK_UPSERT_MATERIALIZE_STRATEGY,
+                        ExecutionConfigOptions.SinkUpsertMaterializeStrategy.LEGACY)
+                .setupConfig(STATE_BACKEND, "rocksdb")
+                .setupTableSource(
+                        SourceTestStep.newBuilder("source_t")
+                                .addSchema("name STRING", "score INT")
+                                .addOption("changelog-mode", "I,UB,UA,D")
+                                .producedBeforeRestore(
+                                        Row.ofKind(RowKind.INSERT, "Bob", 5),
+                                        Row.ofKind(RowKind.INSERT, "Bob", 6))
+                                .producedAfterRestore(producedAfterRestore)
+                                .build())
+                .setupTableSink(
+                        SinkTestStep.newBuilder("sink_t")
+                                .addSchema(
+                                        "name STRING PRIMARY KEY NOT ENFORCED",
+                                        "scoreMetadata BIGINT METADATA",
+                                        "score BIGINT",
+                                        "nameMetadata STRING METADATA")
+                                .addOption("sink-changelog-mode-enforced", "I,UA,D")
+                                // The test sink lists metadata columns
+                                // (SupportsWritingMetadata#listWritableMetadata) in
+                                // alphabetical order; this is also the order in the record
+                                // written to the sink, irrespective of the table schema
+                                .addOption(
+                                        "writable-metadata",
+                                        "nameMetadata:STRING,scoreMetadata:BIGINT")
+                                // physical columns first, then metadata columns, sorted
+                                // alphabetically by column name (test sink property)
+                                .consumedBeforeRestore("+I[BOB, 5, Bob, 5]", "+U[BOB, 6, Bob, 6]")
+                                .consumedAfterRestore(consumedAfterRestore)
+                                .build())
+                .runSql("INSERT INTO sink_t SELECT UPPER(name), score, score, name FROM source_t")
+                .build();
+    }
 }

flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkRestoreTest.java

Lines changed: 12 additions & 1 deletion
@@ -24,6 +24,7 @@
 
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 
 /** Restore tests for {@link StreamExecSink}. */
 public class TableSinkRestoreTest extends RestoreTestBase {
@@ -44,6 +45,16 @@ public List<TableTestProgram> programs() {
                 TableSinkTestPrograms.SINK_WRITING_METADATA,
                 TableSinkTestPrograms.SINK_NDF_PRIMARY_KEY,
                 TableSinkTestPrograms.SINK_PARTIAL_INSERT,
-                TableSinkTestPrograms.SINK_UPSERT);
+                TableSinkTestPrograms.SINK_UPSERT,
+                TableSinkTestPrograms.INSERT_RETRACT_WITH_WRITABLE_METADATA_FOR_LEGACY_TYPE,
+                TableSinkTestPrograms.INSERT_RETRACT_WITH_WRITABLE_METADATA);
+    }
+
+    @Override
+    protected Map<Integer, List<TableTestProgram>> programsToIgnore() {
+        return Map.of(
+                // Disable the writable metadata test for the sink node with version 1;
+                // it fails after the restore.
+                1, List.of(TableSinkTestPrograms.INSERT_RETRACT_WITH_WRITABLE_METADATA));
     }
 }

flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestBase.java

Lines changed: 24 additions & 1 deletion
@@ -205,16 +205,35 @@ private Stream<Arguments> createSpecs() {
                                                 savepointPath))));
     }
 
+    // ====================================================================================
+    // Extension points for adjusting test combinations
+    // ====================================================================================
+
+    /**
+     * Can be overridden with a collection of programs that should be ignored for a particular
+     * version of the node under test.
+     */
+    protected Map<Integer, List<TableTestProgram>> programsToIgnore() {
+        return Collections.emptyMap();
+    }
+
     /**
      * The method can be overridden in a subclass to test multiple savepoint files for a given
      * program and a node in a particular version. This can be useful e.g. to test a node against
      * savepoint generated in different Flink versions.
      */
     protected Stream<String> getSavepointPaths(
             TableTestProgram program, ExecNodeMetadata metadata) {
-        return Stream.of(getSavepointPath(program, metadata, null));
+        if (programsToIgnore()
+                .getOrDefault(metadata.version(), Collections.emptyList())
+                .contains(program)) {
+            return Stream.empty();
+        } else {
+            return Stream.of(getSavepointPath(program, metadata, null));
+        }
     }
 
+    /** Can be used in {@link #getSavepointPaths(TableTestProgram, ExecNodeMetadata)}. */
     protected final String getSavepointPath(
             TableTestProgram program,
             ExecNodeMetadata metadata,
@@ -229,6 +248,10 @@ protected final String getSavepointPath(
         return builder.toString();
     }
 
+    // ====================================================================================
+    // End of extension points
+    // ====================================================================================
+
     private void registerSinkObserver(
             final List<CompletableFuture<?>> futures,
             final SinkTestStep sinkTestStep,
