Skip to content

Commit a0b2d04

Browse files
committed
add: variant support
1 parent d55ee43 commit a0b2d04

File tree

14 files changed

+288
-62
lines changed

14 files changed

+288
-62
lines changed

docs/content.zh/docs/core-concept/type-mappings.md

Lines changed: 21 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -39,23 +39,24 @@ under the License.
3939

4040
## 完整类型列表
4141

42-
| CDC 数据类型 | CDC 内部类型 | Java 外部类型 |
43-
|--------------------------------|------------------------------------------------------------|---------------------------|
44-
| BOOLEAN | `java.lang.Boolean` | `java.lang.Boolean` |
45-
| TINYINT | `java.lang.Byte` | `java.lang.Byte` |
46-
| SMALLINT | `java.lang.Short` | `java.lang.Short` |
47-
| INTEGER | `java.lang.Integer` | `java.lang.Integer` |
48-
| BIGINT | `java.lang.Long` | `java.lang.Long` |
49-
| FLOAT | `java.lang.Float` | `java.lang.Float` |
50-
| DOUBLE | `java.lang.Double` | `java.lang.Double` |
51-
| DECIMAL | `org.apache.flink.cdc.common.data.DecimalData` | `java.math.BigDecimal` |
52-
| DATE | `org.apache.flink.cdc.common.data.DateData` | `java.time.LocalDate` |
53-
| TIME | `org.apache.flink.cdc.common.data.TimeData` | `java.time.LocalTime` |
54-
| TIMESTAMP | `org.apache.flink.cdc.common.data.TimestampData` | `java.time.LocalDateTime` |
55-
| TIMESTAMP_TZ | `org.apache.flink.cdc.common.data.ZonedTimestampData` | `java.time.ZonedDateTime` |
56-
| TIMESTAMP_LTZ | `org.apache.flink.cdc.common.data.LocalZonedTimestampData` | `java.time.Instant` |
57-
| CHAR<br/>VARCHAR<br/>STRING | `org.apache.flink.cdc.common.data.StringData` | `java.lang.String` |
58-
| BINARY<br/>VARBINARY<br/>BYTES | `byte[]` | `byte[]` |
59-
| ARRAY | `org.apache.flink.cdc.common.data.ArrayData` | `java.util.List<T>` |
60-
| MAP | `org.apache.flink.cdc.common.data.MapData` | `java.util.Map<K, V>` |
61-
| ROW | `org.apache.flink.cdc.common.data.RecordData` | `java.util.List<Object>` |
42+
| CDC 数据类型 | CDC 内部类型 | Java 外部类型 |
43+
|--------------------------------|------------------------------------------------------------|---------------------------------------------|
44+
| BOOLEAN | `java.lang.Boolean` | `java.lang.Boolean` |
45+
| TINYINT | `java.lang.Byte` | `java.lang.Byte` |
46+
| SMALLINT | `java.lang.Short` | `java.lang.Short` |
47+
| INTEGER | `java.lang.Integer` | `java.lang.Integer` |
48+
| BIGINT | `java.lang.Long` | `java.lang.Long` |
49+
| FLOAT | `java.lang.Float` | `java.lang.Float` |
50+
| DOUBLE | `java.lang.Double` | `java.lang.Double` |
51+
| DECIMAL | `org.apache.flink.cdc.common.data.DecimalData` | `java.math.BigDecimal` |
52+
| DATE | `org.apache.flink.cdc.common.data.DateData` | `java.time.LocalDate` |
53+
| TIME | `org.apache.flink.cdc.common.data.TimeData` | `java.time.LocalTime` |
54+
| TIMESTAMP | `org.apache.flink.cdc.common.data.TimestampData` | `java.time.LocalDateTime` |
55+
| TIMESTAMP_TZ | `org.apache.flink.cdc.common.data.ZonedTimestampData` | `java.time.ZonedDateTime` |
56+
| TIMESTAMP_LTZ | `org.apache.flink.cdc.common.data.LocalZonedTimestampData` | `java.time.Instant` |
57+
| CHAR<br/>VARCHAR<br/>STRING | `org.apache.flink.cdc.common.data.StringData` | `java.lang.String` |
58+
| BINARY<br/>VARBINARY<br/>BYTES | `byte[]` | `byte[]` |
59+
| ARRAY | `org.apache.flink.cdc.common.data.ArrayData` | `java.util.List<T>` |
60+
| MAP | `org.apache.flink.cdc.common.data.MapData` | `java.util.Map<K, V>` |
61+
| ROW | `org.apache.flink.cdc.common.data.RecordData` | `java.util.List<Object>` |
62+
| VARIANT | `org.apache.flink.cdc.common.types.variant.Variant` | `org.apache.flink.cdc.common.types.variant.Variant` |

docs/content/docs/core-concept/type-mappings.md

Lines changed: 21 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -39,23 +39,24 @@ If you're writing a UDF, its arguments and return value types should be defined
3939

4040
## Full Types List
4141

42-
| CDC Data Type | CDC Internal Type | External Java Class |
43-
|--------------------------------|------------------------------------------------------------|---------------------------|
44-
| BOOLEAN | `java.lang.Boolean` | `java.lang.Boolean` |
45-
| TINYINT | `java.lang.Byte` | `java.lang.Byte` |
46-
| SMALLINT | `java.lang.Short` | `java.lang.Short` |
47-
| INTEGER | `java.lang.Integer` | `java.lang.Integer` |
48-
| BIGINT | `java.lang.Long` | `java.lang.Long` |
49-
| FLOAT | `java.lang.Float` | `java.lang.Float` |
50-
| DOUBLE | `java.lang.Double` | `java.lang.Double` |
51-
| DECIMAL | `org.apache.flink.cdc.common.data.DecimalData` | `java.math.BigDecimal` |
52-
| DATE | `org.apache.flink.cdc.common.data.DateData` | `java.time.LocalDate` |
53-
| TIME | `org.apache.flink.cdc.common.data.TimeData` | `java.time.LocalTime` |
54-
| TIMESTAMP | `org.apache.flink.cdc.common.data.TimestampData` | `java.time.LocalDateTime` |
55-
| TIMESTAMP_TZ | `org.apache.flink.cdc.common.data.ZonedTimestampData` | `java.time.ZonedDateTime` |
56-
| TIMESTAMP_LTZ | `org.apache.flink.cdc.common.data.LocalZonedTimestampData` | `java.time.Instant` |
57-
| CHAR<br/>VARCHAR<br/>STRING | `org.apache.flink.cdc.common.data.StringData` | `java.lang.String` |
58-
| BINARY<br/>VARBINARY<br/>BYTES | `byte[]` | `byte[]` |
59-
| ARRAY | `org.apache.flink.cdc.common.data.ArrayData` | `java.util.List<T>` |
60-
| MAP | `org.apache.flink.cdc.common.data.MapData` | `java.util.Map<K, V>` |
61-
| ROW | `org.apache.flink.cdc.common.data.RecordData` | `java.util.List<Object>` |
42+
| CDC Data Type | CDC Internal Type | External Java Class |
43+
|--------------------------------|------------------------------------------------------------|----------------------------------------------------|
44+
| BOOLEAN | `java.lang.Boolean` | `java.lang.Boolean` |
45+
| TINYINT | `java.lang.Byte` | `java.lang.Byte` |
46+
| SMALLINT | `java.lang.Short` | `java.lang.Short` |
47+
| INTEGER | `java.lang.Integer` | `java.lang.Integer` |
48+
| BIGINT | `java.lang.Long` | `java.lang.Long` |
49+
| FLOAT | `java.lang.Float` | `java.lang.Float` |
50+
| DOUBLE | `java.lang.Double` | `java.lang.Double` |
51+
| DECIMAL | `org.apache.flink.cdc.common.data.DecimalData` | `java.math.BigDecimal` |
52+
| DATE | `org.apache.flink.cdc.common.data.DateData` | `java.time.LocalDate` |
53+
| TIME | `org.apache.flink.cdc.common.data.TimeData` | `java.time.LocalTime` |
54+
| TIMESTAMP | `org.apache.flink.cdc.common.data.TimestampData` | `java.time.LocalDateTime` |
55+
| TIMESTAMP_TZ | `org.apache.flink.cdc.common.data.ZonedTimestampData` | `java.time.ZonedDateTime` |
56+
| TIMESTAMP_LTZ | `org.apache.flink.cdc.common.data.LocalZonedTimestampData` | `java.time.Instant` |
57+
| CHAR<br/>VARCHAR<br/>STRING | `org.apache.flink.cdc.common.data.StringData` | `java.lang.String` |
58+
| BINARY<br/>VARBINARY<br/>BYTES | `byte[]` | `byte[]` |
59+
| ARRAY | `org.apache.flink.cdc.common.data.ArrayData` | `java.util.List<T>` |
60+
| MAP | `org.apache.flink.cdc.common.data.MapData` | `java.util.Map<K, V>` |
61+
| ROW | `org.apache.flink.cdc.common.data.RecordData` | `java.util.List<Object>` |
62+
| VARIANT | `org.apache.flink.cdc.common.types.variant.Variant` | `org.apache.flink.cdc.common.types.variant.Variant` |

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/converter/CommonConverter.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@
3535
import org.apache.flink.cdc.common.types.DataType;
3636
import org.apache.flink.cdc.common.types.MapType;
3737
import org.apache.flink.cdc.common.types.RowType;
38+
import org.apache.flink.cdc.common.types.VariantType;
39+
import org.apache.flink.cdc.common.types.variant.Variant;
3840
import org.apache.flink.cdc.common.utils.Preconditions;
3941
import org.apache.flink.cdc.common.utils.SchemaUtils;
4042

@@ -122,6 +124,20 @@ static byte[] convertToBinary(Object obj) {
122124
"Cannot convert " + obj + " of type " + obj.getClass() + " to BINARY.");
123125
}
124126

127+
static Variant convertToVariant(Object obj, VariantType variantType) {
128+
if (obj instanceof Variant) {
129+
return (Variant) obj;
130+
}
131+
throw new RuntimeException(
132+
"Cannot convert "
133+
+ obj
134+
+ " of type "
135+
+ obj.getClass()
136+
+ " to Variant ("
137+
+ variantType
138+
+ ").");
139+
}
140+
125141
// ----------------------
126142
// These are converters to CDC Internal objects.
127143
// ----------------------

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/converter/InternalClassConverter.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,9 @@
4848
import org.apache.flink.cdc.common.types.TinyIntType;
4949
import org.apache.flink.cdc.common.types.VarBinaryType;
5050
import org.apache.flink.cdc.common.types.VarCharType;
51+
import org.apache.flink.cdc.common.types.VariantType;
5152
import org.apache.flink.cdc.common.types.ZonedTimestampType;
53+
import org.apache.flink.cdc.common.types.variant.Variant;
5254

5355
/**
5456
* Converts a {@link org.apache.flink.cdc.common.types.DataType} to its CDC Internal representation.
@@ -164,4 +166,9 @@ public Class<?> visit(MapType mapType) {
164166
public Class<?> visit(RowType rowType) {
165167
return RecordData.class;
166168
}
169+
170+
@Override
171+
public Class<?> visit(VariantType variantType) {
172+
return Variant.class;
173+
}
167174
}

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/converter/InternalObjectConverter.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import org.apache.flink.cdc.common.types.TinyIntType;
4949
import org.apache.flink.cdc.common.types.VarBinaryType;
5050
import org.apache.flink.cdc.common.types.VarCharType;
51+
import org.apache.flink.cdc.common.types.VariantType;
5152
import org.apache.flink.cdc.common.types.ZonedTimestampType;
5253

5354
import java.util.function.Function;
@@ -159,6 +160,11 @@ public Function<Object, MapData> visit(MapType mapType) {
159160
public Function<Object, RecordData> visit(RowType rowType) {
160161
return o -> CommonConverter.convertToRowData(o, rowType);
161162
}
163+
164+
@Override
165+
public Function<Object, ?> visit(VariantType variantType) {
166+
return o -> CommonConverter.convertToVariant(o, variantType);
167+
}
162168
}
163169

164170
public static Object convertToInternal(Object obj, DataType dataType) {

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/converter/JavaClassConverter.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,9 @@
3838
import org.apache.flink.cdc.common.types.TinyIntType;
3939
import org.apache.flink.cdc.common.types.VarBinaryType;
4040
import org.apache.flink.cdc.common.types.VarCharType;
41+
import org.apache.flink.cdc.common.types.VariantType;
4142
import org.apache.flink.cdc.common.types.ZonedTimestampType;
43+
import org.apache.flink.cdc.common.types.variant.Variant;
4244

4345
import java.math.BigDecimal;
4446
import java.time.Instant;
@@ -164,4 +166,9 @@ public Class<?> visit(MapType mapType) {
164166
public Class<?> visit(RowType rowType) {
165167
return List.class;
166168
}
169+
170+
@Override
171+
public Class<?> visit(VariantType variantType) {
172+
return Variant.class;
173+
}
167174
}

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/converter/JavaObjectConverter.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.apache.flink.cdc.common.types.TinyIntType;
3939
import org.apache.flink.cdc.common.types.VarBinaryType;
4040
import org.apache.flink.cdc.common.types.VarCharType;
41+
import org.apache.flink.cdc.common.types.VariantType;
4142
import org.apache.flink.cdc.common.types.ZonedTimestampType;
4243

4344
import java.math.BigDecimal;
@@ -155,6 +156,11 @@ public Function<Object, List<?>> visit(ArrayType arrayType) {
155156
public Function<Object, List<?>> visit(RowType rowType) {
156157
return o -> CommonConverter.convertToRow(o, rowType);
157158
}
159+
160+
@Override
161+
public Function<Object, ?> visit(VariantType variantType) {
162+
return o -> CommonConverter.convertToVariant(o, variantType);
163+
}
158164
}
159165

160166
public static Object convertToJava(Object obj, DataType dataType) {

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/data/GenericRecordData.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.flink.cdc.common.data;
2020

2121
import org.apache.flink.cdc.common.annotation.PublicEvolving;
22+
import org.apache.flink.cdc.common.types.variant.Variant;
2223

2324
import java.util.Arrays;
2425
import java.util.Objects;
@@ -181,6 +182,11 @@ public TimeData getTime(int pos) {
181182
return (TimeData) this.fields[pos];
182183
}
183184

185+
@Override
186+
public Variant getVariant(int pos) {
187+
return (Variant) this.fields[pos];
188+
}
189+
184190
@Override
185191
public boolean equals(Object o) {
186192
if (this == o) {

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaMergingUtils.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,9 @@
5959
import org.apache.flink.cdc.common.types.TinyIntType;
6060
import org.apache.flink.cdc.common.types.VarBinaryType;
6161
import org.apache.flink.cdc.common.types.VarCharType;
62+
import org.apache.flink.cdc.common.types.VariantType;
6263
import org.apache.flink.cdc.common.types.ZonedTimestampType;
64+
import org.apache.flink.cdc.common.types.variant.Variant;
6365

6466
import org.apache.flink.shaded.guava31.com.google.common.collect.ArrayListMultimap;
6567
import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableList;
@@ -613,7 +615,7 @@ public static int getNumericPrecision(DataType dataType) {
613615
}
614616

615617
@VisibleForTesting
616-
static Object coerceObject(
618+
public static Object coerceObject(
617619
String timezone,
618620
Object originalField,
619621
DataType originalType,
@@ -733,6 +735,10 @@ private static Object coerceToString(Object originalField, DataType originalType
733735
return BinaryStringData.fromString(hexlify((byte[]) originalField));
734736
}
735737

738+
if (originalField instanceof Variant) {
739+
return BinaryStringData.fromString(((Variant) originalField).toJson());
740+
}
741+
736742
return BinaryStringData.fromString(originalField.toString());
737743
}
738744

@@ -1046,6 +1052,7 @@ private static Map<Class<? extends DataType>, List<DataType>> getTypeMergingTree
10461052
mergingTree.put(RowType.class, ImmutableList.of(stringType));
10471053
mergingTree.put(ArrayType.class, ImmutableList.of(stringType));
10481054
mergingTree.put(MapType.class, ImmutableList.of(stringType));
1055+
mergingTree.put(VariantType.class, ImmutableList.of(stringType));
10491056
return mergingTree;
10501057
}
10511058
}

flink-cdc-common/src/test/java/org/apache/flink/cdc/common/converter/InternalClassConverterTest.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,8 @@ void testConvertingFullTypes() {
6060
DataTypes.ROW(
6161
DataTypes.FIELD("f1", DataTypes.STRING()),
6262
DataTypes.FIELD("f2", DataTypes.STRING(), "desc")),
63-
DataTypes.ROW(DataTypes.SMALLINT(), DataTypes.STRING())))
63+
DataTypes.ROW(DataTypes.SMALLINT(), DataTypes.STRING()),
64+
DataTypes.VARIANT()))
6465
.map(InternalClassConverter::toInternalClass)
6566
.map(Class::getCanonicalName)
6667
.containsExactly(
@@ -90,6 +91,7 @@ void testConvertingFullTypes() {
9091
"org.apache.flink.cdc.common.data.ArrayData",
9192
"org.apache.flink.cdc.common.data.MapData",
9293
"org.apache.flink.cdc.common.data.RecordData",
93-
"org.apache.flink.cdc.common.data.RecordData");
94+
"org.apache.flink.cdc.common.data.RecordData",
95+
"org.apache.flink.cdc.common.types.variant.Variant");
9496
}
9597
}

0 commit comments

Comments
 (0)