diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/DefaultTypeDatasetFacet.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/DefaultTypeDatasetFacet.java
index d9475d77a..cb6d08d96 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/DefaultTypeDatasetFacet.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/DefaultTypeDatasetFacet.java
@@ -1,9 +1,11 @@
 package org.apache.flink.connector.kafka.lineage;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 
 import java.util.Objects;
+import java.util.Optional;
 
 /** Default implementation of {@link KafkaDatasetFacet}. */
 @PublicEvolving
@@ -13,14 +15,26 @@ public class DefaultTypeDatasetFacet implements TypeDatasetFacet {
 
     private final TypeInformation typeInformation;
 
+    private final Optional<SerializationSchema> serializationSchema;
+
     public DefaultTypeDatasetFacet(TypeInformation typeInformation) {
         this.typeInformation = typeInformation;
+        this.serializationSchema = Optional.empty();
+    }
+
+    public DefaultTypeDatasetFacet(SerializationSchema serializationSchema) {
+        this.serializationSchema = Optional.of(serializationSchema);
+        this.typeInformation = null;
     }
 
     public TypeInformation getTypeInformation() {
         return typeInformation;
     }
 
+    public Optional<SerializationSchema> getSerializationSchema() {
+        return serializationSchema;
+    }
+
     public boolean equals(Object o) {
         if (this == o) {
             return true;
@@ -29,12 +43,13 @@ public boolean equals(Object o) {
             return false;
         }
         DefaultTypeDatasetFacet that = (DefaultTypeDatasetFacet) o;
-        return Objects.equals(typeInformation, that.typeInformation);
+        return Objects.equals(typeInformation, that.typeInformation)
+                && Objects.equals(serializationSchema, that.serializationSchema);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(typeInformation);
+        return Objects.hash(typeInformation, serializationSchema);
     }
 
     @Override
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/TypeDatasetFacet.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/TypeDatasetFacet.java
index 1e64f5819..f2cc9825c 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/TypeDatasetFacet.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/TypeDatasetFacet.java
@@ -1,11 +1,26 @@
 package org.apache.flink.connector.kafka.lineage;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.streaming.api.lineage.LineageDatasetFacet;
 
+import java.util.Optional;
+
 /** Facet definition to contain type information of source and sink. */
 @PublicEvolving
 public interface TypeDatasetFacet extends LineageDatasetFacet {
     TypeInformation getTypeInformation();
+
+    /**
+     * Sometimes a sink implementing {@link TypeDatasetFacetProvider} is not able to extract the
+     * type. This happens, for example, with {@code AvroSerializationSchema} due to type erasure.
+     * In this case, it makes sense to expose the {@link SerializationSchema} to the lineage
+     * consumer so that it can be used to extract type information.
+     *
+     * @return the serialization schema, if type information could not be extracted
+     */
+    default Optional<SerializationSchema> getSerializationSchema() {
+        return Optional.empty();
+    }
 }
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java
index 1e24137e2..6e90e6c26 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java
@@ -18,6 +18,7 @@
 package org.apache.flink.connector.kafka.sink;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.InvalidTypesException;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
@@ -432,13 +433,17 @@ public Optional<TypeDatasetFacet> getTypeDatasetFacet() {
                             ((ResultTypeQueryable) this.valueSerializationSchema)
                                     .getProducedType()));
         } else {
-            // gets type information from serialize method signature
-            Type type =
-                    TypeExtractor.getParameterType(
-                            SerializationSchema.class, valueSerializationSchema.getClass(), 0);
             try {
+                Type type =
+                        TypeExtractor.getParameterType(
+                                SerializationSchema.class,
+                                valueSerializationSchema.getClass(),
+                                0);
+
                 return Optional.of(
                         new DefaultTypeDatasetFacet(TypeExtractor.createTypeInfo(type)));
+            } catch (InvalidTypesException e) {
+                return Optional.of(new DefaultTypeDatasetFacet(valueSerializationSchema));
             } catch (Exception e) {
                 LOG.info(
                         "Could not extract type information from {}",
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java
index 77ddc87db..30ff58151 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java
@@ -213,6 +213,10 @@ public LineageVertex getLineageVertex() {
         Optional<TypeDatasetFacet> typeDatasetFacet = Optional.empty();
         if (recordSerializer instanceof TypeDatasetFacetProvider) {
             typeDatasetFacet = ((TypeDatasetFacetProvider) recordSerializer).getTypeDatasetFacet();
+        } else {
+            LOG.info(
+                    "recordSerializer does not implement TypeDatasetFacetProvider: {}",
+                    recordSerializer);
         }
 
         if (typeDatasetFacet.isPresent()) {
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java
index febfa533b..9af67f78c 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.connectors.kafka.table;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.InvalidTypesException;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
@@ -211,12 +212,14 @@ public Optional<TypeDatasetFacet> getTypeDatasetFacet() {
                     new DefaultTypeDatasetFacet(
                             ((ResultTypeQueryable) this.valueSerialization).getProducedType()));
         } else {
-            // gets type information from serialize method signature
-            Type type =
-                    TypeExtractor.getParameterType(
-                            SerializationSchema.class, valueSerialization.getClass(), 0);
             try {
+                Type type =
+                        TypeExtractor.getParameterType(
+                                SerializationSchema.class, valueSerialization.getClass(), 0);
+
                 return Optional.of(new DefaultTypeDatasetFacet(TypeExtractor.createTypeInfo(type)));
+            } catch (InvalidTypesException e) {
+                return Optional.of(new DefaultTypeDatasetFacet(valueSerialization));
             } catch (Exception e) {
                 LOG.info(
                         "Could not extract type information from {}",
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilderTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilderTest.java
index 75feb292b..074badc36 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilderTest.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilderTest.java
@@ -400,6 +400,24 @@ public String apply(Object o) {
                 .isEqualTo(BasicTypeInfo.STRING_TYPE_INFO);
     }
 
+    @Test
+    public void testTypeDatasetFacetWithErasureProblem() {
+        SerializationSchema<String> serializationSchema = element -> new byte[0];
+
+        KafkaRecordSerializationSchema<String> schema =
+                KafkaRecordSerializationSchema.builder()
+                        .setTopic("some-topic")
+                        .setValueSerializationSchema(serializationSchema)
+                        .setKeySerializationSchema(new SimpleStringSchema())
+                        .build();
+
+        assertThat(((TypeDatasetFacetProvider) schema).getTypeDatasetFacet())
+                .isPresent()
+                .get()
+                .extracting(TypeDatasetFacet::getSerializationSchema)
+                .isEqualTo(Optional.of(serializationSchema));
+    }
+
     private static void assertOnlyOneSerializerAllowed(
             List<
                     Function<
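For context, a lineage consumer could use the new accessor roughly as follows. This is a minimal sketch outside the scope of the diff: the class name TypeDatasetFacetConsumer and the typeFromSchema helper are hypothetical, and the schema-specific extraction logic is left to the consumer (for example, instanceof checks against known serialization schema implementations).

// Illustrative sketch (not part of this change): how a lineage consumer might
// resolve type information via TypeDatasetFacet#getSerializationSchema.
import java.util.Optional;

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.kafka.lineage.TypeDatasetFacet;

public class TypeDatasetFacetConsumer {

    /** Prefer the extracted TypeInformation; fall back to the exposed schema. */
    public static Optional<TypeInformation<?>> resolveType(TypeDatasetFacet facet) {
        if (facet.getTypeInformation() != null) {
            // The connector was able to extract the type directly.
            return Optional.of(facet.getTypeInformation());
        }
        // Type extraction failed in the connector (e.g. a lambda, or an Avro
        // schema affected by type erasure); derive the type from the schema.
        return facet.getSerializationSchema().flatMap(TypeDatasetFacetConsumer::typeFromSchema);
    }

    // Hypothetical helper: consumer-specific logic, e.g. instanceof checks
    // against known schema implementations that reveal the produced type.
    private static Optional<TypeInformation<?>> typeFromSchema(SerializationSchema schema) {
        return Optional.empty();
    }
}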