
[FLINK-34466][LINEAGE] Support dataset type facet for Avro #171


Open: wants to merge 1 commit into base branch main
Changes from all commits
@@ -1,9 +1,11 @@
package org.apache.flink.connector.kafka.lineage;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import java.util.Objects;
import java.util.Optional;

/** Default implementation of {@link KafkaDatasetFacet}. */
@PublicEvolving
@@ -13,14 +15,26 @@ public class DefaultTypeDatasetFacet implements TypeDatasetFacet {

private final TypeInformation typeInformation;

private final Optional<SerializationSchema> serializationSchema;

public DefaultTypeDatasetFacet(TypeInformation typeInformation) {
this.typeInformation = typeInformation;
this.serializationSchema = Optional.empty();
}

public DefaultTypeDatasetFacet(SerializationSchema serializationSchema) {
this.serializationSchema = Optional.of(serializationSchema);
this.typeInformation = null;
}

public TypeInformation getTypeInformation() {
return typeInformation;
}

public Optional<SerializationSchema> getSerializationSchema() {
return serializationSchema;
}

public boolean equals(Object o) {
if (this == o) {
return true;
@@ -29,12 +43,13 @@ public boolean equals(Object o) {
return false;
}
DefaultTypeDatasetFacet that = (DefaultTypeDatasetFacet) o;
- return Objects.equals(typeInformation, that.typeInformation);
return Objects.equals(typeInformation, that.typeInformation)
&& Objects.equals(serializationSchema, that.serializationSchema);
}

@Override
public int hashCode() {
- return Objects.hash(typeInformation);
return Objects.hash(typeInformation, serializationSchema);
}

@Override
@@ -1,11 +1,26 @@
package org.apache.flink.connector.kafka.lineage;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.lineage.LineageDatasetFacet;

import java.util.Optional;

/** Facet definition to contain type information of source and sink. */
@PublicEvolving
public interface TypeDatasetFacet extends LineageDatasetFacet {
TypeInformation getTypeInformation();

/**
* Sometimes a sink implementing {@link TypeDatasetFacetProvider} is not able to extract the
* type. This happens for AvroSerializationSchema due to type erasure. In this case, it makes
* sense to expose the SerializationSchema to the lineage consumer so that it can use it to
* extract type information.
*
* @return the serialization schema to extract type information from, if available
*/
default Optional<SerializationSchema> getSerializationSchema() {
Review comment from @davidradl (May 19, 2025):
Shouldn't this interface live in Flink? And shouldn't the logic to fall back to the serialization schema also be in Flink?

I assume a File connector using the Avro format could hit this issue as well.

return Optional.empty();
}
}
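
To illustrate how the new getSerializationSchema() fallback is meant to be consumed, here is a minimal sketch of a hypothetical lineage consumer. TypeFacetConsumer, resolveTypeInfo, and fromSchema are illustrative names only; they are not part of this PR or of Flink. The sketch assumes nothing beyond the TypeDatasetFacet interface shown above.

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.connector.kafka.lineage.TypeDatasetFacet;

import java.util.Optional;

/** Hypothetical lineage consumer showing how the fallback could be used. */
public class TypeFacetConsumer {

    /** Prefers the extracted TypeInformation, then falls back to the exposed schema. */
    public Optional<TypeInformation<?>> resolveTypeInfo(TypeDatasetFacet facet) {
        // Happy path: the connector extracted the type directly.
        TypeInformation<?> typeInformation = facet.getTypeInformation();
        if (typeInformation != null) {
            return Optional.of(typeInformation);
        }
        // Fallback path added by this PR: inspect the serialization schema itself.
        return facet.getSerializationSchema().flatMap(this::fromSchema);
    }

    private Optional<TypeInformation<?>> fromSchema(SerializationSchema<?> schema) {
        // An Avro-aware consumer could recognize AvroSerializationSchema here and derive
        // type information from its Avro schema; as a generic illustration, this sketch
        // only handles schemas that happen to expose their produced type.
        if (schema instanceof ResultTypeQueryable) {
            return Optional.of(((ResultTypeQueryable<?>) schema).getProducedType());
        }
        return Optional.empty();
    }
}

In practice, an OpenLineage-style consumer would branch on the concrete schema class (for example the Avro serialization schema mentioned in the Javadoc) rather than on ResultTypeQueryable; the point is only that the facet now carries enough information for the consumer to do so.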
@@ -18,6 +18,7 @@
package org.apache.flink.connector.kafka.sink;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.functions.InvalidTypesException;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TypeExtractor;
@@ -432,13 +433,17 @@ public Optional<TypeDatasetFacet> getTypeDatasetFacet() {
((ResultTypeQueryable<?>) this.valueSerializationSchema)
.getProducedType()));
} else {
- // gets type information from serialize method signature
- Type type =
-         TypeExtractor.getParameterType(
-                 SerializationSchema.class, valueSerializationSchema.getClass(), 0);
try {
Type type =
TypeExtractor.getParameterType(
SerializationSchema.class,
valueSerializationSchema.getClass(),
0);

return Optional.of(
new DefaultTypeDatasetFacet(TypeExtractor.createTypeInfo(type)));
} catch (InvalidTypesException e) {
return Optional.of(new DefaultTypeDatasetFacet(valueSerializationSchema));
} catch (Exception e) {
LOG.info(
"Could not extract type information from {}",
@@ -213,6 +213,10 @@ public LineageVertex getLineageVertex() {
Optional<TypeDatasetFacet> typeDatasetFacet = Optional.empty();
if (recordSerializer instanceof TypeDatasetFacetProvider) {
typeDatasetFacet = ((TypeDatasetFacetProvider) recordSerializer).getTypeDatasetFacet();
} else {
LOG.info(
"recordSerializer does not implement TypeDatasetFacetProvider: {}",
recordSerializer);
}

if (typeDatasetFacet.isPresent()) {
@@ -18,6 +18,7 @@
package org.apache.flink.streaming.connectors.kafka.table;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.functions.InvalidTypesException;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TypeExtractor;
@@ -211,12 +212,14 @@ public Optional<TypeDatasetFacet> getTypeDatasetFacet() {
new DefaultTypeDatasetFacet(
((ResultTypeQueryable<?>) this.valueSerialization).getProducedType()));
} else {
- // gets type information from serialize method signature
- Type type =
-         TypeExtractor.getParameterType(
-                 SerializationSchema.class, valueSerialization.getClass(), 0);
try {
Type type =
TypeExtractor.getParameterType(
SerializationSchema.class, valueSerialization.getClass(), 0);

return Optional.of(new DefaultTypeDatasetFacet(TypeExtractor.createTypeInfo(type)));
} catch (InvalidTypesException e) {
return Optional.of(new DefaultTypeDatasetFacet(valueSerialization));
} catch (Exception e) {
LOG.info(
"Could not extract type information from {}",
@@ -400,6 +400,24 @@ public String apply(Object o) {
.isEqualTo(BasicTypeInfo.STRING_TYPE_INFO);
}

@Test
public void testTypeDatasetFacetWithErasureProblem() {
SerializationSchema serializationSchema = element -> new byte[0];

KafkaRecordSerializationSchema<String> schema =
KafkaRecordSerializationSchema.builder()
.setTopic("some-topic")
.setValueSerializationSchema(serializationSchema)
.setKeySerializationSchema(new SimpleStringSchema())
.build();

assertThat(((TypeDatasetFacetProvider) schema).getTypeDatasetFacet())
.isPresent()
.get()
.extracting(TypeDatasetFacet::getSerializationSchema)
.isEqualTo(Optional.of(serializationSchema));
}

private static void assertOnlyOneSerializerAllowed(
List<
Function<