Commit 7d9e1c6

[FLINK-38879][pipeline-connector][paimon] Add support for creating and writing tables with variant type. (#4228)

1 parent dd6fa19 · commit 7d9e1c6

7 files changed: +138 -35 lines
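At a glance, the commit lets a pipeline declare a VARIANT column in a CDC schema and have it created in, and written to, a Paimon table. A minimal sketch of the create path; the table name and helper are illustrative, and the PaimonMetadataApplier construction (catalog options, warehouse path) is elided, following the test changes below:

    import org.apache.flink.cdc.common.event.CreateTableEvent;
    import org.apache.flink.cdc.common.event.TableId;
    import org.apache.flink.cdc.common.schema.Schema;
    import org.apache.flink.cdc.common.sink.MetadataApplier;
    import org.apache.flink.cdc.common.types.DataTypes;

    class VariantTableExample {
        // Hypothetical helper: create a Paimon table whose schema carries a VARIANT column.
        static void createVariantTable(MetadataApplier metadataApplier) throws Exception {
            Schema schema =
                    Schema.newBuilder()
                            .physicalColumn("col1", DataTypes.STRING().notNull())
                            .physicalColumn("variantCol", DataTypes.VARIANT()) // new in this commit
                            .primaryKey("col1")
                            .build();
            metadataApplier.applySchemaChange(
                    new CreateTableEvent(TableId.tableId("test", "table_with_variant"), schema));
        }
    }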

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java

Lines changed: 2 additions & 5 deletions
@@ -32,14 +32,13 @@
 import org.apache.flink.cdc.common.exceptions.UnsupportedSchemaChangeEventException;
 import org.apache.flink.cdc.common.schema.Schema;
 import org.apache.flink.cdc.common.sink.MetadataApplier;
-import org.apache.flink.cdc.common.types.utils.DataTypeUtils;
+import org.apache.flink.cdc.connectors.paimon.sink.utils.TypeUtils;

 import org.apache.flink.shaded.guava31.com.google.common.collect.Sets;

 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.FlinkCatalogFactory;
-import org.apache.paimon.flink.LogicalTypeConversion;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.table.Table;

@@ -173,9 +172,7 @@ private void applyCreateTable(CreateTableEvent event) throws SchemaEvolveException
                 (column) ->
                         builder.column(
                                 column.getName(),
-                                LogicalTypeConversion.toDataType(
-                                        DataTypeUtils.toFlinkDataType(column.getType())
-                                                .getLogicalType()),
+                                TypeUtils.toPaimonDataType(column.getType()),
                                 column.getComment()));
         List<String> partitionKeys = new ArrayList<>();
         List<String> primaryKeys = schema.primaryKeys();

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/SchemaChangeProvider.java

Lines changed: 4 additions & 14 deletions
@@ -23,9 +23,8 @@
 import org.apache.flink.cdc.common.types.LocalZonedTimestampType;
 import org.apache.flink.cdc.common.types.TimestampType;
 import org.apache.flink.cdc.common.types.ZonedTimestampType;
-import org.apache.flink.cdc.common.types.utils.DataTypeUtils;
+import org.apache.flink.cdc.connectors.paimon.sink.utils.TypeUtils;

-import org.apache.paimon.flink.LogicalTypeConversion;
 import org.apache.paimon.schema.SchemaChange;

 import java.util.ArrayList;

@@ -54,10 +53,7 @@ public static List<SchemaChange> add(AddColumnEvent.ColumnWithPosition columnWithPosition)
         result.add(
                 SchemaChange.addColumn(
                         columnWithPosition.getAddColumn().getName(),
-                        LogicalTypeConversion.toDataType(
-                                DataTypeUtils.toFlinkDataType(
-                                                columnWithPosition.getAddColumn().getType())
-                                        .getLogicalType()),
+                        TypeUtils.toPaimonDataType(columnWithPosition.getAddColumn().getType()),
                         columnWithPosition.getAddColumn().getComment()));
         // if default value express exists, we need to set the default value to the table
         // option

@@ -89,10 +85,7 @@ public static List<SchemaChange> add(
         result.add(
                 SchemaChange.addColumn(
                         columnWithPosition.getAddColumn().getName(),
-                        LogicalTypeConversion.toDataType(
-                                DataTypeUtils.toFlinkDataType(
-                                                columnWithPosition.getAddColumn().getType())
-                                        .getLogicalType()),
+                        TypeUtils.toPaimonDataType(columnWithPosition.getAddColumn().getType()),
                         columnWithPosition.getAddColumn().getComment(),
                         move));
         // if default value express exists, we need to set the default value to the table

@@ -118,10 +111,7 @@
      * @return A SchemaChange object representing the update of the column's data type.
      */
     public static SchemaChange updateColumnType(String oldColumnName, DataType newType) {
-        return SchemaChange.updateColumnType(
-                oldColumnName,
-                LogicalTypeConversion.toDataType(
-                        DataTypeUtils.toFlinkDataType(newType).getLogicalType()));
+        return SchemaChange.updateColumnType(oldColumnName, TypeUtils.toPaimonDataType(newType));
     }

     /**
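With the conversions consolidated in TypeUtils, a type update becomes a single call; a usage sketch (the column name "payloadCol" is hypothetical):

    // Produce a Paimon SchemaChange retyping an existing column to VARIANT.
    SchemaChange change =
            SchemaChangeProvider.updateColumnType(
                    "payloadCol", org.apache.flink.cdc.common.types.DataTypes.VARIANT());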
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/utils/TypeUtils.java

Lines changed: 63 additions & 0 deletions (new file)

@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.paimon.sink.utils;
+
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypeRoot;
+import org.apache.flink.cdc.common.types.utils.DataTypeUtils;
+import org.apache.flink.table.types.utils.TypeConversions;
+
+import org.apache.paimon.flink.LogicalTypeConversion;
+import org.apache.paimon.types.VariantType;
+
+/** Utils for data type conversion between CDC and Paimon System. */
+public class TypeUtils {
+
+    /**
+     * Convert Flink CDC DataType to Paimon DataType.
+     *
+     * @param dataType Flink CDC DataType
+     * @return Paimon DataType
+     */
+    public static org.apache.paimon.types.DataType toPaimonDataType(DataType dataType) {
+        // TODO remove this branch after bumping Flink version to 2.2
+        if (dataType.is(DataTypeRoot.VARIANT)) {
+            return new VariantType(dataType.isNullable());
+        } else {
+            return LogicalTypeConversion.toDataType(
+                    DataTypeUtils.toFlinkDataType(dataType).getLogicalType());
+        }
+    }
+
+    /**
+     * Convert Paimon DataType to Flink CDC DataType.
+     *
+     * @param dataType Paimon DataType
+     * @return Flink CDC DataType
+     */
+    public static DataType toCDCDataType(org.apache.paimon.types.DataType dataType) {
+        // TODO remove this branch after bumping Flink version to 2.2
+        if (dataType.is(org.apache.paimon.types.DataTypeRoot.VARIANT)) {
+            return new org.apache.flink.cdc.common.types.VariantType(dataType.isNullable());
+        } else {
+            return DataTypeUtils.fromFlinkDataType(
+                    TypeConversions.fromLogicalToDataType(
+                            LogicalTypeConversion.toLogicalType(dataType)));
+        }
+    }
+}
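Both methods special-case VARIANT because the Flink logical-type bridge used for every other type does not carry it until Flink 2.2 (hence the TODOs). A minimal round-trip sketch of the new utility:

    import org.apache.flink.cdc.common.types.DataType;
    import org.apache.flink.cdc.common.types.DataTypes;
    import org.apache.flink.cdc.connectors.paimon.sink.utils.TypeUtils;

    class TypeUtilsRoundTrip {
        static void demo() {
            // CDC VARIANT maps directly to Paimon's VariantType, skipping the Flink bridge.
            org.apache.paimon.types.DataType paimonType =
                    TypeUtils.toPaimonDataType(DataTypes.VARIANT());
            // And back again: Paimon's VariantType maps to the CDC VariantType.
            DataType cdcType = TypeUtils.toCDCDataType(paimonType);
        }
    }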

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterHelper.java

Lines changed: 19 additions & 7 deletions
@@ -31,10 +31,11 @@
 import org.apache.flink.cdc.common.types.DataType;
 import org.apache.flink.cdc.common.types.DataTypeChecks;
 import org.apache.flink.cdc.common.types.DataTypeRoot;
-import org.apache.flink.cdc.common.types.utils.DataTypeUtils;
+import org.apache.flink.cdc.common.types.variant.BinaryVariant;
+import org.apache.flink.cdc.common.utils.Preconditions;
+import org.apache.flink.cdc.connectors.paimon.sink.utils.TypeUtils;
 import org.apache.flink.cdc.connectors.paimon.sink.v2.bucket.BucketAssignOperator;
 import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.table.types.utils.TypeConversions;

 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.data.BinaryRow;

@@ -43,7 +44,7 @@
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.Timestamp;
-import org.apache.paimon.flink.LogicalTypeConversion;
+import org.apache.paimon.data.variant.GenericVariant;
 import org.apache.paimon.memory.MemorySegmentUtils;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.types.RowKind;

@@ -176,6 +177,20 @@ private static RecordData.FieldGetter createFieldGetter(
             case MAP:
                 fieldGetter = new BinaryFieldDataGetter(fieldPos, fieldType.getTypeRoot());
                 break;
+            case VARIANT:
+                fieldGetter =
+                        row -> {
+                            org.apache.flink.cdc.common.types.variant.Variant variant =
+                                    row.getVariant(fieldPos);
+                            Preconditions.checkArgument(
+                                    variant instanceof BinaryVariant,
+                                    "Unsupported variant type: %s",
+                                    variant.getClass());
+                            return new GenericVariant(
+                                    ((BinaryVariant) variant).getValue(),
+                                    ((BinaryVariant) variant).getMetadata());
+                        };
+                break;
             default:
                 throw new IllegalArgumentException(
                         "don't support type of " + fieldType.getTypeRoot());

@@ -278,10 +293,7 @@ public static Schema deduceSchemaForPaimonTable(Table table) {
                         column ->
                                 Column.physicalColumn(
                                         column.name(),
-                                        DataTypeUtils.fromFlinkDataType(
-                                                TypeConversions.fromLogicalToDataType(
-                                                        LogicalTypeConversion.toLogicalType(
-                                                                column.type()))),
+                                        TypeUtils.toCDCDataType(column.type()),
                                         column.description()))
                 .collect(Collectors.toList()));
         builder.primaryKey(table.primaryKeys());
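The new VARIANT branch in createFieldGetter is a byte-level bridge: rather than re-parsing the document, it re-wraps the CDC BinaryVariant's value and metadata buffers as a Paimon GenericVariant. Condensed as a standalone helper (a sketch; the committed code lives inline in the lambda above):

    import org.apache.flink.cdc.common.types.variant.BinaryVariant;
    import org.apache.paimon.data.variant.GenericVariant;

    class VariantBridgeSketch {
        // Re-wrap the CDC variant's binary value/metadata as a Paimon variant;
        // no JSON parsing or re-encoding happens on the write path.
        static GenericVariant toPaimonVariant(BinaryVariant variant) {
            return new GenericVariant(variant.getValue(), variant.getMetadata());
        }
    }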

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonHashFunctionTest.java

Lines changed: 13 additions & 4 deletions
@@ -25,6 +25,7 @@
 import org.apache.flink.cdc.common.sink.MetadataApplier;
 import org.apache.flink.cdc.common.types.DataType;
 import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.common.types.variant.BinaryVariantInternalBuilder;
 import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;

 import org.apache.paimon.catalog.Catalog;

@@ -36,6 +37,7 @@
 import org.junit.jupiter.api.io.TempDir;

 import java.io.File;
+import java.io.IOException;
 import java.nio.file.Path;
 import java.time.ZoneId;
 import java.util.HashMap;

@@ -72,7 +74,7 @@ public void afterEach() throws Exception {
     }

     @Test
-    public void testHashCodeForAppendOnlyTable() {
+    public void testHashCodeForAppendOnlyTable() throws IOException {
         TableId tableId = TableId.tableId(TEST_DATABASE, "test_table");
         Map<String, String> tableOptions = new HashMap<>();
         MetadataApplier metadataApplier =

@@ -82,6 +84,7 @@
                         .physicalColumn("col1", DataTypes.STRING().notNull())
                         .physicalColumn("col2", DataTypes.STRING())
                         .physicalColumn("pt", DataTypes.STRING())
+                        .physicalColumn("variantCol", DataTypes.VARIANT())
                         .build();
         CreateTableEvent createTableEvent = new CreateTableEvent(tableId, schema);
         metadataApplier.applySchemaChange(createTableEvent);

@@ -96,7 +99,9 @@
                                 new Object[] {
                                     BinaryStringData.fromString("1"),
                                     BinaryStringData.fromString("1"),
-                                    BinaryStringData.fromString("2024")
+                                    BinaryStringData.fromString("2024"),
+                                    BinaryVariantInternalBuilder.parseJson(
+                                            "{\"a\":1,\"b\":\"hello\",\"c\":3.1}", false)
                                 }));
         int key1 = hashFunction.hashcode(dataChangeEvent1);

@@ -107,7 +112,9 @@
                                 new Object[] {
                                     BinaryStringData.fromString("2"),
                                     BinaryStringData.fromString("1"),
-                                    BinaryStringData.fromString("2024")
+                                    BinaryStringData.fromString("2024"),
+                                    BinaryVariantInternalBuilder.parseJson(
+                                            "{\"a\":1,\"b\":\"hello\",\"c\":3.1}", false)
                                 }));
         int key2 = hashFunction.hashcode(dataChangeEvent2);

@@ -118,7 +125,9 @@
                                 new Object[] {
                                     BinaryStringData.fromString("3"),
                                     BinaryStringData.fromString("1"),
-                                    BinaryStringData.fromString("2024")
+                                    BinaryStringData.fromString("2024"),
+                                    BinaryVariantInternalBuilder.parseJson(
+                                            "{\"a\":1,\"b\":\"hello\",\"c\":3.1}", false)
                                 }));
         int key3 = hashFunction.hashcode(dataChangeEvent3);
         assertThat(key1).isBetween(0, 3);

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java

Lines changed: 25 additions & 1 deletion
@@ -189,6 +189,26 @@ void testApplySchemaChange(String metastore)
         Assertions.assertThat(catalog.getTable(Identifier.fromString("test.table1")).rowType())
                 .isEqualTo(tableSchema);

+        // Add column with variant type.
+        addedColumns = new ArrayList<>();
+        addedColumns.add(
+                new AddColumnEvent.ColumnWithPosition(
+                        Column.physicalColumn(
+                                "variantCol",
+                                org.apache.flink.cdc.common.types.DataTypes.VARIANT(),
+                                null)));
+        addColumnEvent = new AddColumnEvent(TableId.parse("test.table1"), addedColumns);
+        metadataApplier.applySchemaChange(addColumnEvent);
+        tableSchema =
+                new RowType(
+                        Arrays.asList(
+                                new DataField(0, "col1", DataTypes.STRING().notNull()),
+                                new DataField(
+                                        2, "newcol3", DataTypes.STRING(), null, "col3DefValue"),
+                                new DataField(3, "variantCol", DataTypes.VARIANT(), null, null)));
+        Assertions.assertThat(catalog.getTable(Identifier.fromString("test.table1")).rowType())
+                .isEqualTo(tableSchema);
+
         // Create table with partition column.
         createTableEvent =
                 new CreateTableEvent(

@@ -412,6 +432,9 @@ void testCreateTableWithAllDataTypes(String metastore)
                                 "timestamp_ltz_with_precision",
                                 org.apache.flink.cdc.common.types.DataTypes.TIMESTAMP_LTZ(
                                         3))
+                        .physicalColumn(
+                                "variant",
+                                org.apache.flink.cdc.common.types.DataTypes.VARIANT())
                         .primaryKey("col1")
                         .build());
         metadataApplier.applySchemaChange(createTableEvent);

@@ -445,7 +468,8 @@
                                 new DataField(
                                         20,
                                         "timestamp_ltz_with_precision",
-                                        DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3))));
+                                        DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)),
+                                new DataField(21, "variant", DataTypes.VARIANT())));
         Assertions.assertThat(catalog.getTable(Identifier.fromString("test.table1")).rowType())
                 .isEqualTo(tableSchema);
     }

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterHelperTest.java

Lines changed: 12 additions & 4 deletions
@@ -33,6 +33,7 @@
 import org.apache.flink.cdc.common.schema.Schema;
 import org.apache.flink.cdc.common.types.DataTypes;
 import org.apache.flink.cdc.common.types.RowType;
+import org.apache.flink.cdc.common.types.variant.BinaryVariantInternalBuilder;
 import org.apache.flink.cdc.connectors.paimon.sink.PaimonMetadataApplier;
 import org.apache.flink.cdc.runtime.serializer.data.MapDataSerializer;
 import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;

@@ -45,6 +46,7 @@
 import org.apache.paimon.data.InternalMap;
 import org.apache.paimon.data.NestedRow;
 import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.data.variant.GenericVariant;
 import org.apache.paimon.flink.FlinkCatalogFactory;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.table.Table;

@@ -54,6 +56,7 @@
 import org.junit.jupiter.api.io.TempDir;

 import java.io.File;
+import java.io.IOException;
 import java.math.BigDecimal;
 import java.time.Instant;
 import java.time.ZoneId;

@@ -68,7 +71,7 @@ class PaimonWriterHelperTest {
     @TempDir public static java.nio.file.Path temporaryFolder;

     @Test
-    void testConvertEventToGenericRowOfAllDataTypes() {
+    void testConvertEventToGenericRowOfAllDataTypes() throws IOException {
         RowType rowType =
                 RowType.of(
                         DataTypes.BOOLEAN(),

@@ -92,7 +95,8 @@
                         DataTypes.TIMESTAMP(3),
                         DataTypes.TIMESTAMP_LTZ(),
                         DataTypes.TIMESTAMP_LTZ(3),
-                        DataTypes.STRING());
+                        DataTypes.STRING(),
+                        DataTypes.VARIANT());
         Object[] testData =
                 new Object[] {
                     true,

@@ -117,7 +121,9 @@
                     TimestampData.fromTimestamp(java.sql.Timestamp.valueOf("2023-01-01 00:00:00")),
                     LocalZonedTimestampData.fromInstant(Instant.parse("2023-01-01T00:00:00.000Z")),
                     LocalZonedTimestampData.fromInstant(Instant.parse("2023-01-01T00:00:00.000Z")),
-                    null
+                    null,
+                    BinaryVariantInternalBuilder.parseJson(
+                            "{\"a\":1,\"b\":\"hello\",\"c\":3.1}", false)
                 };
         BinaryRecordData recordData = new BinaryRecordDataGenerator(rowType).generate(testData);
         Schema schema = Schema.newBuilder().fromRowDataType(rowType).build();

@@ -154,7 +160,8 @@
                                 java.sql.Timestamp.valueOf("2023-01-01 00:00:00")),
                         Timestamp.fromInstant(Instant.parse("2023-01-01T00:00:00.000Z")),
                         Timestamp.fromInstant(Instant.parse("2023-01-01T00:00:00.000Z")),
-                        null));
+                        null,
+                        GenericVariant.fromJson("{\"a\":1,\"b\":\"hello\",\"c\":3.1}")));
     }

     @Test

@@ -368,6 +375,7 @@ void testDeduceSchemaForPaimonTable() throws Catalog.TableNotExistException {
                 .physicalColumn("timestamp_with_precision", DataTypes.TIMESTAMP(3))
                 .physicalColumn("timestamp_ltz", DataTypes.TIMESTAMP_LTZ())
                 .physicalColumn("timestamp_ltz_with_precision", DataTypes.TIMESTAMP_LTZ(3))
+                .physicalColumn("variant", DataTypes.VARIANT())
                 .primaryKey("col1")
                 .build();
         CreateTableEvent createTableEvent =
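The writer test pins the semantics down: a CDC variant built from JSON must come out of the writer equal to Paimon's own parse of the same document. The pairing, condensed (a sketch; parseJson's boolean flag is copied verbatim from the test and its semantics are not shown in this diff):

    import java.io.IOException;

    import org.apache.flink.cdc.common.types.variant.BinaryVariantInternalBuilder;
    import org.apache.flink.cdc.common.types.variant.Variant;
    import org.apache.paimon.data.variant.GenericVariant;

    class VariantEquivalenceSketch {
        static void demo() throws IOException {
            String json = "{\"a\":1,\"b\":\"hello\",\"c\":3.1}";
            // CDC side: build the binary variant that goes into the BinaryRecordData row.
            Variant cdcVariant = BinaryVariantInternalBuilder.parseJson(json, false);
            // Paimon side: the GenericRow produced by PaimonWriterHelper should equal this.
            GenericVariant expected = GenericVariant.fromJson(json);
        }
    }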
