Commit 3019fbb

[FLINK-38289] Update to Flink 2.1

Signed-off-by: Thomas Cooper <code@tomcooper.dev>

1 parent ecedd63

7 files changed (+196, -11 lines)

.github/workflows/push_pr.yml

Lines changed: 2 additions & 2 deletions

@@ -28,7 +28,7 @@ jobs:
   compile_and_test:
     strategy:
       matrix:
-        flink: [ 2.0.0 ]
+        flink: [ 2.1.0 ]
         jdk: [ '11, 17, 21' ]
     uses: apache/flink-connector-shared-utils/.github/workflows/ci.yml@ci_utils
     with:
@@ -37,7 +37,7 @@ jobs:
   python_test:
     strategy:
       matrix:
-        flink: [ 2.0.0 ]
+        flink: [ 2.1.0 ]
         jdk: [ '11, 17, 21' ]
     uses: apache/flink-connector-shared-utils/.github/workflows/python_ci.yml@ci_utils
     with:

flink-connector-kafka/archunit-violations/c0d94764-76a0-4c50-b617-70b1754c4612

Lines changed: 2 additions & 4 deletions

@@ -23,8 +23,6 @@ Method <org.apache.flink.connector.kafka.dynamic.source.reader.DynamicKafkaSourc
 Method <org.apache.flink.connector.kafka.dynamic.source.reader.DynamicKafkaSourceReader.syncAvailabilityHelperWithReaders()> calls method <org.apache.flink.streaming.runtime.io.MultipleFuturesAvailabilityHelper.anyOf(int, java.util.concurrent.CompletableFuture)> in (DynamicKafkaSourceReader.java:500)
 Method <org.apache.flink.connector.kafka.sink.ExactlyOnceKafkaWriter.getProducerPool()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (ExactlyOnceKafkaWriter.java:0)
 Method <org.apache.flink.connector.kafka.sink.ExactlyOnceKafkaWriter.getTransactionalIdPrefix()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (ExactlyOnceKafkaWriter.java:0)
-Method <org.apache.flink.connector.kafka.sink.internal.KafkaCommitter.getBackchannel()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaCommitter.java:0)
-Method <org.apache.flink.connector.kafka.sink.internal.KafkaCommitter.getCommittingProducer()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaCommitter.java:0)
 Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> calls method <org.apache.flink.api.dag.Transformation.getCoLocationGroupKey()> in (KafkaSink.java:178)
 Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> calls method <org.apache.flink.api.dag.Transformation.getInputs()> in (KafkaSink.java:181)
 Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> calls method <org.apache.flink.api.dag.Transformation.getOutputType()> in (KafkaSink.java:177)
@@ -34,6 +32,8 @@ Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(or
 Method <org.apache.flink.connector.kafka.sink.KafkaSink.getKafkaProducerConfig()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaSink.java:0)
 Method <org.apache.flink.connector.kafka.sink.KafkaSinkBuilder.setRecordSerializer(org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema)> calls method <org.apache.flink.api.java.ClosureCleaner.clean(java.lang.Object, org.apache.flink.api.common.ExecutionConfig$ClosureCleanerLevel, boolean)> in (KafkaSinkBuilder.java:154)
 Method <org.apache.flink.connector.kafka.sink.KafkaWriter.getCurrentProducer()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaWriter.java:0)
+Method <org.apache.flink.connector.kafka.sink.internal.KafkaCommitter.getBackchannel()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaCommitter.java:0)
+Method <org.apache.flink.connector.kafka.sink.internal.KafkaCommitter.getCommittingProducer()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaCommitter.java:0)
 Method <org.apache.flink.connector.kafka.sink.internal.ProducerPoolImpl.getProducers()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (ProducerPoolImpl.java:0)
 Method <org.apache.flink.connector.kafka.source.KafkaSource.createReader(org.apache.flink.api.connector.source.SourceReaderContext, java.util.function.Consumer)> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaSource.java:0)
 Method <org.apache.flink.connector.kafka.source.KafkaSource.getConfiguration()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaSource.java:0)
@@ -50,6 +50,4 @@ Method <org.apache.flink.connector.kafka.source.reader.KafkaSourceReader.getOffs
 Method <org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaRecordSerializationSchema.createProjectedRow(org.apache.flink.table.data.RowData, org.apache.flink.types.RowKind, [Lorg.apache.flink.table.data.RowData$FieldGetter;)> has parameter of type <[Lorg.apache.flink.table.data.RowData$FieldGetter;> in (DynamicKafkaRecordSerializationSchema.java:0)
 Method <org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.createKeyFormatProjection(org.apache.flink.configuration.ReadableConfig, org.apache.flink.table.types.DataType)> calls method <org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getFieldNames(org.apache.flink.table.types.logical.LogicalType)> in (KafkaConnectorOptionsUtil.java:520)
 Method <org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.createValueFormatProjection(org.apache.flink.configuration.ReadableConfig, org.apache.flink.table.types.DataType)> calls method <org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getFieldCount(org.apache.flink.table.types.logical.LogicalType)> in (KafkaConnectorOptionsUtil.java:564)
-Method <org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSink.createSerialization(org.apache.flink.table.connector.sink.DynamicTableSink$Context, org.apache.flink.table.connector.format.EncodingFormat, [I, java.lang.String)> calls method <org.apache.flink.table.types.utils.DataTypeUtils.stripRowPrefix(org.apache.flink.table.types.DataType, java.lang.String)> in (KafkaDynamicSink.java:408)
 Method <org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSink.getFieldGetters(java.util.List, [I)> has return type <[Lorg.apache.flink.table.data.RowData$FieldGetter;> in (KafkaDynamicSink.java:0)
-Method <org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource.createDeserialization(org.apache.flink.table.connector.source.DynamicTableSource$Context, org.apache.flink.table.connector.format.DecodingFormat, [I, java.lang.String)> calls method <org.apache.flink.table.types.utils.DataTypeUtils.stripRowPrefix(org.apache.flink.table.types.DataType, java.lang.String)> in (KafkaDynamicSource.java:574)

flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java

Lines changed: 1 addition & 2 deletions

@@ -44,7 +44,6 @@
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.table.types.utils.DataTypeUtils;
 
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.header.Header;
@@ -405,7 +404,7 @@ private RowData.FieldGetter[] getFieldGetters(
         }
         DataType physicalFormatDataType = Projection.of(projection).project(this.physicalDataType);
         if (prefix != null) {
-            physicalFormatDataType = DataTypeUtils.stripRowPrefix(physicalFormatDataType, prefix);
+            physicalFormatDataType = TableDataTypeUtils.stripRowPrefix(physicalFormatDataType, prefix);
         }
         return format.createRuntimeEncoder(context, physicalFormatDataType);
     }

flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java

Lines changed: 1 addition & 2 deletions

@@ -49,7 +49,6 @@
 import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.utils.DataTypeUtils;
 import org.apache.flink.util.Preconditions;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -571,7 +570,7 @@ private KafkaRecordDeserializationSchema<RowData> createKafkaDeserializationSche
         }
         DataType physicalFormatDataType = Projection.of(projection).project(this.physicalDataType);
         if (prefix != null) {
-            physicalFormatDataType = DataTypeUtils.stripRowPrefix(physicalFormatDataType, prefix);
+            physicalFormatDataType = TableDataTypeUtils.stripRowPrefix(physicalFormatDataType, prefix);
         }
         return format.createRuntimeDecoder(context, physicalFormatDataType);
     }
flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/TableDataTypeUtils.java

Lines changed: 83 additions & 0 deletions

@@ -0,0 +1,83 @@
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.FieldsDataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.ROW;
+
+/**
+ * Utility class for manipulating {@link DataType} objects, particularly for table schemas.
+ * This class provides methods that were removed from the Flink API in version 2.1.0.
+ * See <a href="https://github.com/apache/flink/pull/26784">Flink PR 26784</a>.
+ */
+public class TableDataTypeUtils {
+
+    protected static final String STRIP_ROW_NO_ROW_ERROR_MSG = "Row data type expected.";
+    protected static final String RENAME_ROW_LENGTH_MISMATCH_ERROR_MSG =
+            "Row length and new names must match.";
+
+    /**
+     * Removes a string prefix from the fields of the given row data type.
+     *
+     * @param dataType The row data type to modify.
+     * @param prefix The prefix to remove from the field names.
+     * @return A new DataType with the modified field names.
+     * @throws IllegalArgumentException if the provided dataType is not of ROW type.
+     */
+    public static DataType stripRowPrefix(DataType dataType, String prefix) {
+
+        if (!dataType.getLogicalType().is(ROW)) {
+            throw new IllegalArgumentException(STRIP_ROW_NO_ROW_ERROR_MSG);
+        }
+
+        final RowType rowType = (RowType) dataType.getLogicalType();
+        final List<String> newFieldNames =
+                rowType.getFieldNames().stream()
+                        .map(
+                                s -> {
+                                    if (s.startsWith(prefix)) {
+                                        return s.substring(prefix.length());
+                                    }
+                                    return s;
+                                })
+                        .collect(Collectors.toList());
+        final LogicalType newRowType = renameRowFields(rowType, newFieldNames);
+        return new FieldsDataType(
+                newRowType, dataType.getConversionClass(), dataType.getChildren());
+    }
+
+    /**
+     * Renames the fields of the given {@link RowType}.
+     *
+     * @param rowType The original RowType.
+     * @param newFieldNames The new field names to apply.
+     * @return A new RowType with the updated field names.
+     * @throws IllegalArgumentException if the number of new field names does not match the number
+     *     of fields in the original RowType.
+     */
+    public static RowType renameRowFields(RowType rowType, List<String> newFieldNames) {
+
+        if (!(rowType.getFieldCount() == newFieldNames.size())) {
+            throw new IllegalArgumentException(RENAME_ROW_LENGTH_MISMATCH_ERROR_MSG);
+        }
+
+        final List<RowType.RowField> newFields =
+                IntStream.range(0, rowType.getFieldCount())
+                        .mapToObj(
+                                pos -> {
+                                    final RowType.RowField oldField = rowType.getFields().get(pos);
+                                    return new RowType.RowField(
+                                            newFieldNames.get(pos),
+                                            oldField.getType(),
+                                            oldField.getDescription().orElse(null));
+                                })
+                        .collect(Collectors.toList());
+        return new RowType(rowType.isNullable(), newFields);
+    }
+}
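
For context, a minimal usage sketch of the backported helper (illustrative only, not part of this commit; the field names and the "key_" prefix are hypothetical, chosen to mirror the connector's key-prefix handling):

package org.apache.flink.streaming.connectors.kafka.table;

import org.apache.flink.table.types.DataType;

import static org.apache.flink.table.api.DataTypes.FIELD;
import static org.apache.flink.table.api.DataTypes.INT;
import static org.apache.flink.table.api.DataTypes.ROW;
import static org.apache.flink.table.api.DataTypes.STRING;

/** Illustrative sketch only; not part of this commit. */
public class TableDataTypeUtilsUsageSketch {
    public static void main(String[] args) {
        // A row type whose key fields carry a "key_" prefix (hypothetical example).
        DataType physical = ROW(FIELD("key_id", INT()), FIELD("key_name", STRING()));

        // The same call the patched KafkaDynamicSink/KafkaDynamicSource now make in place
        // of the DataTypeUtils.stripRowPrefix helper removed in Flink 2.1.
        DataType stripped = TableDataTypeUtils.stripRowPrefix(physical, "key_");

        // The resulting row type has fields "id" and "name".
        System.out.println(stripped);
    }
}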
flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/TableDataTypeUtilsTest.java

Lines changed: 106 additions & 0 deletions

@@ -0,0 +1,106 @@
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.flink.table.api.DataTypes.FIELD;
+import static org.apache.flink.table.api.DataTypes.INT;
+import static org.apache.flink.table.api.DataTypes.ROW;
+import static org.apache.flink.table.api.DataTypes.STRING;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link TableDataTypeUtils}. */
+public class TableDataTypeUtilsTest extends TestLogger {
+
+    @Test
+    public void testStripRowPrefix() {
+        DataType rowDataType =
+                ROW(
+                        FIELD("prefix_name", STRING()),
+                        FIELD("prefix_age", INT()),
+                        FIELD("address", STRING()));
+
+        DataType result = TableDataTypeUtils.stripRowPrefix(rowDataType, "prefix_");
+
+        RowType rowType = (RowType) result.getLogicalType();
+        List<String> fieldNames = rowType.getFieldNames();
+
+        assertThat(fieldNames).containsExactly("name", "age", "address");
+    }
+
+    @Test
+    public void testStripRowPrefixWithNoMatch() {
+        // Create a test row data type with no matching prefixes
+        DataType rowDataType =
+                ROW(
+                        FIELD("name", STRING()),
+                        FIELD("age", INT()),
+                        FIELD("address", STRING()));
+
+        DataType result = TableDataTypeUtils.stripRowPrefix(rowDataType, "nonexistent_");
+
+        // Field names should remain unchanged
+        RowType rowType = (RowType) result.getLogicalType();
+        List<String> fieldNames = rowType.getFieldNames();
+
+        assertThat(fieldNames).containsExactly("name", "age", "address");
+    }
+
+    @Test
+    public void testStripRowPrefixInvalidType() {
+        // Create a non-row data type
+        DataType nonRowType = STRING();
+
+        // Attempting to strip a prefix should throw an exception
+        assertThatThrownBy(() -> TableDataTypeUtils.stripRowPrefix(nonRowType, "prefix_"))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining(TableDataTypeUtils.STRIP_ROW_NO_ROW_ERROR_MSG);
+    }
+
+    @Test
+    public void testRenameRowFields() {
+        List<RowType.RowField> fields =
+                Arrays.asList(
+                        new RowType.RowField("oldName1", new VarCharType(), null),
+                        new RowType.RowField("oldName2", new IntType(), "description"));
+        RowType rowType = new RowType(false, fields);
+
+        List<String> newFieldNames = Arrays.asList("newName1", "newName2");
+
+        RowType renamedType = TableDataTypeUtils.renameRowFields(rowType, newFieldNames);
+
+        List<String> resultFieldNames = renamedType.getFieldNames();
+        assertThat(resultFieldNames).containsExactly("newName1", "newName2");
+
+        assertThat(renamedType.getFields().get(0).getType()).isInstanceOf(VarCharType.class);
+        assertThat(renamedType.getFields().get(1).getType()).isInstanceOf(IntType.class);
+        assertThat(renamedType.getFields().get(1).getDescription().orElse(null))
+                .isEqualTo("description");
+    }
+
+    @Test
+    public void testRenameRowFieldsInvalidLength() {
+        List<RowType.RowField> fields =
+                Arrays.asList(
+                        new RowType.RowField("oldName1", new VarCharType(), null),
+                        new RowType.RowField("oldName2", new IntType(), null));
+        RowType rowType = new RowType(false, fields);
+
+        // Incorrect number of new field names
+        List<String> newFieldNames = Collections.singletonList("newName1");
+
+        // Renaming with the wrong number of names should throw an exception
+        assertThatThrownBy(() -> TableDataTypeUtils.renameRowFields(rowType, newFieldNames))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining(TableDataTypeUtils.RENAME_ROW_LENGTH_MISMATCH_ERROR_MSG);
+    }
+}
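
And a minimal sketch of calling renameRowFields directly (again illustrative and not part of the commit; the field name and description are made up, and the behaviour matches the tests above):

package org.apache.flink.streaming.connectors.kafka.table;

import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.RowType;

import java.util.Collections;

/** Illustrative sketch only; not part of this commit. */
public class RenameRowFieldsSketch {
    public static void main(String[] args) {
        // A single-field, non-nullable row; the field description is preserved by the rename.
        RowType original =
                new RowType(
                        false,
                        Collections.singletonList(
                                new RowType.RowField("count", new IntType(), "event count")));

        RowType renamed =
                TableDataTypeUtils.renameRowFields(original, Collections.singletonList("total"));

        // Prints the row type with the field renamed to "total" (type and description kept).
        System.out.println(renamed);
    }
}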

pom.xml

Lines changed: 1 addition & 1 deletion

@@ -53,7 +53,7 @@ under the License.
 
         <!-- Main Dependencies -->
         <confluent.version>7.9.2</confluent.version>
-        <flink.version>2.0.0</flink.version>
+        <flink.version>2.1.0</flink.version>
         <kafka.version>3.9.1</kafka.version>
 
         <!-- Other Dependencies -->
