diff --git a/flink-connector-kafka/archunit-violations/c0d94764-76a0-4c50-b617-70b1754c4612 b/flink-connector-kafka/archunit-violations/c0d94764-76a0-4c50-b617-70b1754c4612 index a4256dac4..51a60d6f8 100644 --- a/flink-connector-kafka/archunit-violations/c0d94764-76a0-4c50-b617-70b1754c4612 +++ b/flink-connector-kafka/archunit-violations/c0d94764-76a0-4c50-b617-70b1754c4612 @@ -23,8 +23,6 @@ Method calls method in (DynamicKafkaSourceReader.java:500) Method is annotated with in (ExactlyOnceKafkaWriter.java:0) Method is annotated with in (ExactlyOnceKafkaWriter.java:0) -Method is annotated with in (KafkaCommitter.java:0) -Method is annotated with in (KafkaCommitter.java:0) Method calls method in (KafkaSink.java:178) Method calls method in (KafkaSink.java:181) Method calls method in (KafkaSink.java:177) @@ -34,6 +32,8 @@ Method is annotated with in (KafkaSink.java:0) Method calls method in (KafkaSinkBuilder.java:154) Method is annotated with in (KafkaWriter.java:0) +Method is annotated with in (KafkaCommitter.java:0) +Method is annotated with in (KafkaCommitter.java:0) Method is annotated with in (ProducerPoolImpl.java:0) Method is annotated with in (KafkaSource.java:0) Method is annotated with in (KafkaSource.java:0) @@ -47,9 +47,9 @@ Method is annotated with in (KafkaPartitionSplitReader.java:0) Method is annotated with in (KafkaSourceReader.java:0) Method is annotated with in (KafkaSourceReader.java:0) +Method calls method in (Decoder.java:151) Method has parameter of type <[Lorg.apache.flink.table.data.RowData$FieldGetter;> in (DynamicKafkaRecordSerializationSchema.java:0) Method calls method in (KafkaConnectorOptionsUtil.java:520) Method calls method in (KafkaConnectorOptionsUtil.java:564) Method calls method in (KafkaDynamicSink.java:408) Method has return type <[Lorg.apache.flink.table.data.RowData$FieldGetter;> in (KafkaDynamicSink.java:0) -Method calls method in (KafkaDynamicSource.java:574) diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/config/FormatProjectionPushdownLevel.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/config/FormatProjectionPushdownLevel.java new file mode 100644 index 000000000..807ce49e2 --- /dev/null +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/config/FormatProjectionPushdownLevel.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka.config; + +import org.apache.flink.annotation.Internal; + +/** + * Projection pushdown mode for {@link + * org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource}. 
+ */ +@Internal +public enum FormatProjectionPushdownLevel { + + /** The format does not support any kind of projection pushdown. */ + NONE, + + /** The format supports projection pushdown for top-level fields only. */ + TOP_LEVEL, + + /** The format supports projection pushdown for top-level and nested fields. */ + ALL +} diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/Decoder.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/Decoder.java new file mode 100644 index 000000000..b19d99b45 --- /dev/null +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/Decoder.java @@ -0,0 +1,348 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka.table; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.table.connector.Projection; +import org.apache.flink.table.connector.format.DecodingFormat; +import org.apache.flink.table.connector.format.ProjectableDecodingFormat; +import org.apache.flink.table.connector.source.DynamicTableSource.Context; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.utils.DataTypeUtils; + +import javax.annotation.Nullable; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.stream.Collectors; + +/** + * Decoding messages consists of two potential steps: + * + *
    + *
  1. Deserialization, i.e. deserializing the {@code byte[]} into a {@link RowData}. This process + * is handled by a {@link DeserializationSchema}.
  2. Projection, i.e. projecting any required fields from the deserialized {@link RowData} + * (returned by the {@link DeserializationSchema} in the first step) to their positions in the + * final produced {@link RowData}. This process is handled by a {@link Projector}.
+ * + *
In order to decode messages correctly, the {@link DeserializationSchema} and the {@link + * Projector} need to work together. For example, the {@link Projector} needs to know the positions + * of the required fields in the {@link RowData} returned by the {@link DeserializationSchema} so + * that it can correctly set those fields in the final produced {@link RowData}. + * + *
That's why we have this {@link Decoder} class. This class ensures that the returned {@link + * DeserializationSchema} and {@link Projector} will work together to decode messages correctly. + */ +@Internal +public class Decoder { + + /** + * Can be null. Null is used inside {@link DynamicKafkaDeserializationSchema} to avoid + * deserializing keys if not required. + */ + private final @Nullable DeserializationSchema deserializationSchema; + + /** Mapping of the physical position in the key to the target position in the RowData. */ + private final Projector projector; + + private Decoder( + final DeserializationSchema deserializationSchema, final Projector projector) { + this.deserializationSchema = deserializationSchema; + this.projector = projector; + } + + /** + * @param decodingFormat Optional format for decoding bytes. + * @param tableDataType The data type representing the table schema. + * @param dataTypeProjection Indices indicate the position of the field in the dataType + * (key/value). Values indicate the position of the field in the tableSchema. + * @param prefix Optional field prefix + * @param projectedFields Indices indicate the position of the field in the produced Row. Values + * indicate the position of the field in the table schema. + * @param pushProjectionsIntoDecodingFormat if this is true and the format is a {@link + * ProjectableDecodingFormat}, any {@param projectedFields} will be pushed down into the + * {@link ProjectableDecodingFormat}. Otherwise, projections will be applied after + * deserialization. + * @return a {@link Decoder} instance. + */ + public static Decoder create( + final Context context, + final @Nullable DecodingFormat> decodingFormat, + final DataType tableDataType, + final int[] dataTypeProjection, + final @Nullable String prefix, + final int[][] projectedFields, + final List metadataKeys, + final boolean pushProjectionsIntoDecodingFormat) { + if (decodingFormat == null) { + return Decoder.noDeserializationOrProjection(); + } else { + if (decodingFormat instanceof ProjectableDecodingFormat + && pushProjectionsIntoDecodingFormat) { + return Decoder.projectInsideDeserializer( + context, + (ProjectableDecodingFormat>) decodingFormat, + tableDataType, + dataTypeProjection, + prefix, + projectedFields, + metadataKeys); + } else { + return Decoder.projectAfterDeserializing( + context, + decodingFormat, + tableDataType, + dataTypeProjection, + prefix, + projectedFields, + metadataKeys); + } + } + } + + /** @return a {@link DeserializationSchema} or null. */ + @Nullable + public DeserializationSchema getDeserializationSchema() { + return deserializationSchema; + } + + /** @return a {@link Projector}. 
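+ * Never null: when no {@link DecodingFormat} was supplied, the returned projector simply projects + * no fields (see {@link Projector#isEmptyProjection()}).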
*/ + public Projector getProjector() { + return projector; + } + + private static Decoder noDeserializationOrProjection() { + return new Decoder(null, new ProjectorImpl(Collections.emptyMap())); + } + + private static DataType toDataType( + final DataType tableDataType, + final int[] dataTypeProjection, + final @Nullable String prefix) { + final DataType temp = Projection.of(dataTypeProjection).project(tableDataType); + return Optional.ofNullable(prefix) + .map(s -> DataTypeUtils.stripRowPrefix(temp, s)) + .orElse(temp); + } + + private static Map tableToDeserializedTopLevelPos( + final int[] dataTypeProjection) { + final HashMap tableToDeserializedPos = new HashMap<>(); + for (int i = 0; i < dataTypeProjection.length; i++) { + tableToDeserializedPos.put(dataTypeProjection[i], i); + } + return tableToDeserializedPos; + } + + private static int[] copyArray(final int[] arr) { + return Arrays.copyOf(arr, arr.length); + } + + private static void addMetadataProjections( + final DecodingFormat decodingFormat, + final int deserializedSize, + final int physicalSize, + final List requestedMetadataKeys, + final Map, Integer> deserializedToProducedPos) { + + if (!requestedMetadataKeys.isEmpty()) { + decodingFormat.applyReadableMetadata(requestedMetadataKeys); + + // project only requested metadata keys + for (int i = 0; i < requestedMetadataKeys.size(); i++) { + // metadata is always added to the end of the deserialized row by the DecodingFormat + final int deserializedPos = deserializedSize + i; + // we need to always add metadata to the end of the produced row + final int producePos = physicalSize + i; + deserializedToProducedPos.put( + Collections.singletonList(deserializedPos), producePos); + } + } + } + + /** + * This method generates a {@link Decoder} which pushes projections down directly into the + * {@link ProjectableDecodingFormat} which takes care of projecting the fields during the + * deserialization process itself. 
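+ * In that case the returned {@link Projector} only needs to place the already-projected fields (plus + * any requested format metadata, which is appended at the end of the deserialized row) into their + * final positions in the produced row.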
+ */ + private static Decoder projectInsideDeserializer( + final Context context, + final ProjectableDecodingFormat> + projectableDecodingFormat, + final DataType tableDataType, + final int[] dataTypeProjection, + final @Nullable String prefix, + final int[][] projectedFields, + final List metadataKeys) { + final Map tableToDeserializedTopLevelPos = + tableToDeserializedTopLevelPos(dataTypeProjection); + + final List deserializerProjectedFields = new ArrayList<>(); + final Map, Integer> deserializedToProducedPos = new HashMap<>(); + for (int producedPos = 0; producedPos < projectedFields.length; producedPos++) { + final int[] tablePos = projectedFields[producedPos]; + final int tableTopLevelPos = tablePos[0]; + + final Integer dataTypeTopLevelPos = + tableToDeserializedTopLevelPos.get(tableTopLevelPos); + if (dataTypeTopLevelPos != null) { + final int[] dataTypePos = copyArray(tablePos); + dataTypePos[0] = dataTypeTopLevelPos; + + deserializerProjectedFields.add(dataTypePos); + + final int deserializedPos = deserializerProjectedFields.size() - 1; + deserializedToProducedPos.put( + Collections.singletonList(deserializedPos), producedPos); + } + } + + addMetadataProjections( + projectableDecodingFormat, + deserializerProjectedFields.size(), + projectedFields.length, + metadataKeys, + deserializedToProducedPos); + + return new Decoder( + projectableDecodingFormat.createRuntimeDecoder( + context, + toDataType(tableDataType, dataTypeProjection, prefix), + deserializerProjectedFields.toArray( + new int[deserializerProjectedFields.size()][])), + new ProjectorImpl(deserializedToProducedPos)); + } + + /** + * This method generates a {@link Decoder} which deserializes the data fully using the {@link + * DecodingFormat} and then applies any projections afterward. + */ + private static Decoder projectAfterDeserializing( + final Context context, + final DecodingFormat> decodingFormat, + final DataType tableDataType, + final int[] dataTypeProjection, + final @Nullable String prefix, + final int[][] projectedFields, + final List metadataKeys) { + final DataType dataType = toDataType(tableDataType, dataTypeProjection, prefix); + final Map tableToDeserializedTopLevelPos = + tableToDeserializedTopLevelPos(dataTypeProjection); + + final Map, Integer> deserializedToProducedPos = new HashMap<>(); + for (int producedPos = 0; producedPos < projectedFields.length; producedPos++) { + final int[] tablePos = projectedFields[producedPos]; + int tableTopLevelPos = tablePos[0]; + + final Integer deserializedTopLevelPos = + tableToDeserializedTopLevelPos.get(tableTopLevelPos); + if (deserializedTopLevelPos != null) { + final int[] deserializedPos = copyArray(tablePos); + deserializedPos[0] = deserializedTopLevelPos; + + deserializedToProducedPos.put( + Collections.unmodifiableList( + Arrays.stream(deserializedPos) + .boxed() + .collect(Collectors.toList())), + producedPos); + } + } + + addMetadataProjections( + decodingFormat, + dataTypeProjection.length, + projectedFields.length, + metadataKeys, + deserializedToProducedPos); + + return new Decoder( + decodingFormat.createRuntimeDecoder(context, dataType), + new ProjectorImpl(deserializedToProducedPos)); + } + + /** Projects fields from the deserialized row to their positions in the final produced row. */ + @Internal + public interface Projector extends Serializable { + /** Returns true if {@link #project} will not project any fields. 
*/ + boolean isEmptyProjection(); + + /** + * Returns true if deserialized positions are different from the final produced row + * positions. + */ + boolean isProjectionNeeded(); + + /** Copies fields from the deserialized row to their final positions in the produced row. */ + void project(final RowData deserialized, final GenericRowData producedRow); + } + + private static class ProjectorImpl implements Projector { + + private final Map, Integer> deserializedToProducedPos; + private final boolean isProjectionNeeded; + + ProjectorImpl(final Map, Integer> deserializedToProducedPos) { + this.deserializedToProducedPos = deserializedToProducedPos; + this.isProjectionNeeded = + !deserializedToProducedPos.entrySet().stream() + .allMatch( + entry -> { + final List deserializedPos = entry.getKey(); + final List producedPos = + Collections.singletonList(entry.getValue()); + return Objects.equals(producedPos, deserializedPos); + }); + } + + @Override + public boolean isEmptyProjection() { + return deserializedToProducedPos.isEmpty(); + } + + @Override + public boolean isProjectionNeeded() { + return isProjectionNeeded; + } + + @Override + public void project(final RowData deserialized, final GenericRowData producedRow) { + this.deserializedToProducedPos.forEach( + (deserializedPos, targetPos) -> { + Object value = deserialized; + for (final Integer i : deserializedPos) { + value = ((GenericRowData) value).getField(i); + } + producedRow.setField(targetPos, value); + }); + } + } +} diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java index 2fa67bd4e..4805d2531 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java @@ -50,6 +50,8 @@ class DynamicKafkaDeserializationSchema implements KafkaRecordDeserializationSch private final boolean hasMetadata; + private final boolean hasValueProjection; + private final BufferingCollector keyCollector; private final OutputProjectionCollector outputCollector; @@ -61,27 +63,28 @@ class DynamicKafkaDeserializationSchema implements KafkaRecordDeserializationSch DynamicKafkaDeserializationSchema( int physicalArity, @Nullable DeserializationSchema keyDeserialization, - int[] keyProjection, + Decoder.Projector keyProjector, DeserializationSchema valueDeserialization, - int[] valueProjection, + Decoder.Projector valueProjector, boolean hasMetadata, MetadataConverter[] metadataConverters, TypeInformation producedTypeInfo, boolean upsertMode) { if (upsertMode) { Preconditions.checkArgument( - keyDeserialization != null && keyProjection.length > 0, + keyDeserialization != null && !keyProjector.isEmptyProjection(), "Key must be set in upsert mode for deserialization schema."); } this.keyDeserialization = keyDeserialization; this.valueDeserialization = valueDeserialization; this.hasMetadata = hasMetadata; + this.hasValueProjection = valueProjector.isProjectionNeeded(); this.keyCollector = new BufferingCollector(); this.outputCollector = new OutputProjectionCollector( physicalArity, - keyProjection, - valueProjection, + keyProjector, + valueProjector, metadataConverters, upsertMode); this.producedTypeInfo = producedTypeInfo; @@ -101,7 +104,7 @@ public void 
deserialize(ConsumerRecord record, Collector SCAN_PARALLELISM = FactoryUtil.SOURCE_PARALLELISM; public static final ConfigOption SINK_PARALLELISM = FactoryUtil.SINK_PARALLELISM; + private static final String FORMAT_PROJECTION_PUSHDOWN_LEVEL = + "format-projection-pushdown-level"; + private static final String PROJECTION_PUSHDOWN_LEVEL_DESCRIPTION = + "Controls what level of projections can be pushed down into the ProjectableDecodingFormat. Valid values are:\n" + + " - \"NONE\" i.e. no projections are pushed down into the ProjectableDecodingFormat. This is the default.\n" + + " - \"TOP_LEVEL\" i.e. top-level projections are pushed down into the ProjectableDecodingFormat.\n" + + " - \"ALL\" i.e. top-level and nested projections are pushed down into the ProjectableDecodingFormat.\n" + + "This config should be set carefully with respect to the Format. " + + "In particular, users should ensure the Format properly support the desired level of projection pushdown " + + "as otherwise it can lead to runtime exceptions or even incorrect results.\n" + + "For example:\n" + + " - avro format does not support projection pushdown properly.\n" + + " - csv format only supports top-level projection pushdown.\n" + + " - json format supports top-level and nested projection pushdown."; + + public static final ConfigOption KEY_PROJECTION_PUSHDOWN_LEVEL = + ConfigOptions.key("key." + FORMAT_PROJECTION_PUSHDOWN_LEVEL) + .enumType(FormatProjectionPushdownLevel.class) + .defaultValue(FormatProjectionPushdownLevel.NONE) + .withDescription(PROJECTION_PUSHDOWN_LEVEL_DESCRIPTION); + + public static final ConfigOption + VALUE_PROJECTION_PUSHDOWN_LEVEL = + ConfigOptions.key("value." + FORMAT_PROJECTION_PUSHDOWN_LEVEL) + .enumType(FormatProjectionPushdownLevel.class) + .defaultValue(FormatProjectionPushdownLevel.NONE) + .withDescription(PROJECTION_PUSHDOWN_LEVEL_DESCRIPTION); + // -------------------------------------------------------------------------------------------- // Kafka specific options // -------------------------------------------------------------------------------------------- diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java index 79cd483c4..43c8f332f 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java @@ -32,16 +32,17 @@ import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.config.BoundedMode; +import org.apache.flink.streaming.connectors.kafka.config.FormatProjectionPushdownLevel; import org.apache.flink.streaming.connectors.kafka.config.StartupMode; import org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.MetadataConverter; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.connector.ChangelogMode; -import org.apache.flink.table.connector.Projection; import org.apache.flink.table.connector.ProviderContext; import org.apache.flink.table.connector.format.DecodingFormat; import org.apache.flink.table.connector.source.DataStreamScanProvider; import org.apache.flink.table.connector.source.DynamicTableSource; import 
org.apache.flink.table.connector.source.ScanTableSource; +import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown; import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata; import org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown; import org.apache.flink.table.data.GenericMapData; @@ -49,7 +50,7 @@ import org.apache.flink.table.data.StringData; import org.apache.flink.table.data.TimestampData; import org.apache.flink.table.types.DataType; -import org.apache.flink.table.types.utils.DataTypeUtils; +import org.apache.flink.table.types.logical.RowType; import org.apache.flink.util.Preconditions; import org.apache.kafka.clients.consumer.ConsumerConfig; @@ -73,13 +74,15 @@ import java.util.Properties; import java.util.regex.Pattern; import java.util.stream.Collectors; -import java.util.stream.IntStream; import java.util.stream.Stream; /** A version-agnostic Kafka {@link ScanTableSource}. */ @Internal public class KafkaDynamicSource - implements ScanTableSource, SupportsReadingMetadata, SupportsWatermarkPushDown { + implements ScanTableSource, + SupportsReadingMetadata, + SupportsWatermarkPushDown, + SupportsProjectionPushDown { private static final String KAFKA_TRANSFORMATION = "kafka"; @@ -90,12 +93,18 @@ public class KafkaDynamicSource /** Data type that describes the final output of the source. */ protected DataType producedDataType; - /** Metadata that is appended at the end of a physical source row. */ + /** Value format metadata that is appended at the end of a physical source row. */ + protected List valueFormatMetadataKeys; + + /** Connector metadata that is appended at the end of a physical source row. */ protected List metadataKeys; /** Watermark strategy that is used to generate per-partition watermark. */ protected @Nullable WatermarkStrategy watermarkStrategy; + /** Field index paths of all fields that must be present in the physically produced data. */ + protected int[][] projectedFields; + // -------------------------------------------------------------------------------------------- // Format attributes // -------------------------------------------------------------------------------------------- @@ -111,15 +120,19 @@ public class KafkaDynamicSource /** Format for decoding values from Kafka. */ protected final DecodingFormat> valueDecodingFormat; - /** Indices that determine the key fields and the target position in the produced row. */ + /** Indices that determine the key fields and their position in the table schema. */ protected final int[] keyProjection; - /** Indices that determine the value fields and the target position in the produced row. */ + /** Indices that determine the value fields and their position in the table schema. */ protected final int[] valueProjection; /** Prefix that needs to be removed from fields when constructing the physical data type. 
*/ protected final @Nullable String keyPrefix; + protected final FormatProjectionPushdownLevel keyFormatProjectionPushdownLevel; + + protected final FormatProjectionPushdownLevel valueFormatProjectionPushdownLevel; + // -------------------------------------------------------------------------------------------- // Kafka-specific attributes // -------------------------------------------------------------------------------------------- @@ -191,7 +204,9 @@ public KafkaDynamicSource( long boundedTimestampMillis, boolean upsertMode, String tableIdentifier, - @Nullable Integer parallelism) { + @Nullable Integer parallelism, + FormatProjectionPushdownLevel keyFormatProjectionPushdownLevel, + FormatProjectionPushdownLevel valueFormatProjectionPushdownLevel) { // Format attributes this.physicalDataType = Preconditions.checkNotNull( @@ -205,10 +220,14 @@ public KafkaDynamicSource( this.valueProjection = Preconditions.checkNotNull(valueProjection, "Value projection must not be null."); this.keyPrefix = keyPrefix; + this.keyFormatProjectionPushdownLevel = keyFormatProjectionPushdownLevel; + this.valueFormatProjectionPushdownLevel = valueFormatProjectionPushdownLevel; // Mutable attributes this.producedDataType = physicalDataType; + this.valueFormatMetadataKeys = Collections.emptyList(); this.metadataKeys = Collections.emptyList(); this.watermarkStrategy = null; + this.projectedFields = tableSchemaProjectedFields(physicalDataType); // Kafka-specific attributes Preconditions.checkArgument( (topics != null && topicPattern == null) @@ -241,17 +260,33 @@ public ChangelogMode getChangelogMode() { @Override public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) { - final DeserializationSchema keyDeserialization = - createDeserialization(context, keyDecodingFormat, keyProjection, keyPrefix); + final Decoder keyDecoder = + Decoder.create( + context, + keyDecodingFormat, + physicalDataType, + keyProjection, + keyPrefix, + projectedFields, + Collections.emptyList(), + pushProjectionsIntoDecodingFormat(keyFormatProjectionPushdownLevel)); - final DeserializationSchema valueDeserialization = - createDeserialization(context, valueDecodingFormat, valueProjection, null); + final Decoder valueDecoder = + Decoder.create( + context, + valueDecodingFormat, + physicalDataType, + valueProjection, + null, + projectedFields, + valueFormatMetadataKeys, + pushProjectionsIntoDecodingFormat(valueFormatProjectionPushdownLevel)); final TypeInformation producedTypeInfo = context.createTypeInformation(producedDataType); final KafkaSource kafkaSource = - createKafkaSource(keyDeserialization, valueDeserialization, producedTypeInfo); + createKafkaSource(keyDecoder, valueDecoder, producedTypeInfo); return new DataStreamScanProvider() { @Override @@ -301,31 +336,23 @@ public Map listReadableMetadata() { @Override public void applyReadableMetadata(List metadataKeys, DataType producedDataType) { - // separate connector and format metadata - final List formatMetadataKeys = - metadataKeys.stream() - .filter(k -> k.startsWith(VALUE_METADATA_PREFIX)) - .collect(Collectors.toList()); - final List connectorMetadataKeys = new ArrayList<>(metadataKeys); - connectorMetadataKeys.removeAll(formatMetadataKeys); - - // push down format metadata - final Map formatMetadata = valueDecodingFormat.listReadableMetadata(); - if (formatMetadata.size() > 0) { - final List requestedFormatMetadataKeys = - formatMetadataKeys.stream() - .map(k -> k.substring(VALUE_METADATA_PREFIX.length())) - .collect(Collectors.toList()); - 
valueDecodingFormat.applyReadableMetadata(requestedFormatMetadataKeys); + this.valueFormatMetadataKeys = new ArrayList<>(); + this.metadataKeys = new ArrayList<>(); + for (final String key : metadataKeys) { + if (key.startsWith(VALUE_METADATA_PREFIX)) { + final String formatMetadataKey = key.substring(VALUE_METADATA_PREFIX.length()); + this.valueFormatMetadataKeys.add(formatMetadataKey); + } else { + this.metadataKeys.add(key); + } } - - this.metadataKeys = connectorMetadataKeys; this.producedDataType = producedDataType; } @Override public boolean supportsMetadataProjection() { - return false; + throw new IllegalStateException( + "This should never be called as KafkaDynamicSource implements the SupportsProjectionPushdown interface."); } @Override @@ -333,6 +360,18 @@ public void applyWatermark(WatermarkStrategy watermarkStrategy) { this.watermarkStrategy = watermarkStrategy; } + @Override + public boolean supportsNestedProjection() { + return (keyDecodingFormat == null + || keyFormatProjectionPushdownLevel == FormatProjectionPushdownLevel.ALL) + && valueFormatProjectionPushdownLevel == FormatProjectionPushdownLevel.ALL; + } + + @Override + public void applyProjection(final int[][] projectedFields, final DataType producedDataType) { + this.projectedFields = projectedFields; + } + @Override public DynamicTableSource copy() { final KafkaDynamicSource copy = @@ -354,10 +393,14 @@ public DynamicTableSource copy() { boundedTimestampMillis, upsertMode, tableIdentifier, - parallelism); + parallelism, + keyFormatProjectionPushdownLevel, + valueFormatProjectionPushdownLevel); copy.producedDataType = producedDataType; + copy.valueFormatMetadataKeys = valueFormatMetadataKeys; copy.metadataKeys = metadataKeys; copy.watermarkStrategy = watermarkStrategy; + copy.projectedFields = projectedFields; return copy; } @@ -376,6 +419,7 @@ public boolean equals(Object o) { } final KafkaDynamicSource that = (KafkaDynamicSource) o; return Objects.equals(producedDataType, that.producedDataType) + && Objects.equals(valueFormatMetadataKeys, that.valueFormatMetadataKeys) && Objects.equals(metadataKeys, that.metadataKeys) && Objects.equals(physicalDataType, that.physicalDataType) && Objects.equals(keyDecodingFormat, that.keyDecodingFormat) @@ -395,13 +439,20 @@ public boolean equals(Object o) { && Objects.equals(upsertMode, that.upsertMode) && Objects.equals(tableIdentifier, that.tableIdentifier) && Objects.equals(watermarkStrategy, that.watermarkStrategy) - && Objects.equals(parallelism, that.parallelism); + && Objects.equals(parallelism, that.parallelism) + && Arrays.deepEquals(projectedFields, that.projectedFields) + && Objects.equals( + keyFormatProjectionPushdownLevel, that.keyFormatProjectionPushdownLevel) + && Objects.equals( + valueFormatProjectionPushdownLevel, + that.valueFormatProjectionPushdownLevel); } @Override public int hashCode() { return Objects.hash( producedDataType, + valueFormatMetadataKeys, metadataKeys, physicalDataType, keyDecodingFormat, @@ -421,19 +472,43 @@ public int hashCode() { upsertMode, tableIdentifier, watermarkStrategy, - parallelism); + parallelism, + Arrays.deepHashCode(projectedFields), + keyFormatProjectionPushdownLevel, + valueFormatProjectionPushdownLevel); } // -------------------------------------------------------------------------------------------- + private static int[][] tableSchemaProjectedFields(final DataType tableDataType) { + final RowType rowType = (RowType) tableDataType.getLogicalType(); + final int tableSchemaSize = rowType.getFieldCount(); + final int[][] 
projectedFields = new int[tableSchemaSize][]; + for (int i = 0; i < tableSchemaSize; i++) { + projectedFields[i] = new int[] {i}; + } + return projectedFields; + } + + private boolean pushProjectionsIntoDecodingFormat( + final FormatProjectionPushdownLevel formatProjectionPushdownLevel) { + switch (formatProjectionPushdownLevel) { + case NONE: + return false; + case TOP_LEVEL: + case ALL: + return true; + default: + throw new IllegalArgumentException( + "Unsupported projection pushdown level: " + formatProjectionPushdownLevel); + } + } + protected KafkaSource createKafkaSource( - DeserializationSchema keyDeserialization, - DeserializationSchema valueDeserialization, - TypeInformation producedTypeInfo) { + Decoder keyDecoder, Decoder valueDecoder, TypeInformation producedTypeInfo) { final KafkaRecordDeserializationSchema kafkaDeserializer = - createKafkaDeserializationSchema( - keyDeserialization, valueDeserialization, producedTypeInfo); + createKafkaDeserializationSchema(keyDecoder, valueDecoder, producedTypeInfo); final KafkaSourceBuilder kafkaSourceBuilder = KafkaSource.builder(); @@ -519,9 +594,7 @@ private OffsetResetStrategy getResetStrategy(String offsetResetConfig) { } private KafkaRecordDeserializationSchema createKafkaDeserializationSchema( - DeserializationSchema keyDeserialization, - DeserializationSchema valueDeserialization, - TypeInformation producedTypeInfo) { + Decoder keyDecoder, Decoder valueDecoder, TypeInformation producedTypeInfo) { final MetadataConverter[] metadataConverters = metadataKeys.stream() .map( @@ -540,42 +613,18 @@ private KafkaRecordDeserializationSchema createKafkaDeserializationSche final int adjustedPhysicalArity = DataType.getFieldDataTypes(producedDataType).size() - metadataKeys.size(); - // adjust value format projection to include value format's metadata columns at the end - final int[] adjustedValueProjection = - IntStream.concat( - IntStream.of(valueProjection), - IntStream.range( - keyProjection.length + valueProjection.length, - adjustedPhysicalArity)) - .toArray(); - return new DynamicKafkaDeserializationSchema( adjustedPhysicalArity, - keyDeserialization, - keyProjection, - valueDeserialization, - adjustedValueProjection, + keyDecoder.getDeserializationSchema(), + keyDecoder.getProjector(), + valueDecoder.getDeserializationSchema(), + valueDecoder.getProjector(), hasMetadata, metadataConverters, producedTypeInfo, upsertMode); } - private @Nullable DeserializationSchema createDeserialization( - DynamicTableSource.Context context, - @Nullable DecodingFormat> format, - int[] projection, - @Nullable String prefix) { - if (format == null) { - return null; - } - DataType physicalFormatDataType = Projection.of(projection).project(this.physicalDataType); - if (prefix != null) { - physicalFormatDataType = DataTypeUtils.stripRowPrefix(physicalFormatDataType, prefix); - } - return format.createRuntimeDecoder(context, physicalFormatDataType); - } - // -------------------------------------------------------------------------------------------- // Metadata handling // -------------------------------------------------------------------------------------------- diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java index 1446637eb..ac61bd6ab 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java 
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java @@ -30,6 +30,7 @@ import org.apache.flink.connector.kafka.sink.TransactionNamingStrategy; import org.apache.flink.connector.kafka.source.KafkaSourceOptions; import org.apache.flink.streaming.connectors.kafka.config.BoundedMode; +import org.apache.flink.streaming.connectors.kafka.config.FormatProjectionPushdownLevel; import org.apache.flink.streaming.connectors.kafka.config.StartupMode; import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.BoundedOptions; import org.apache.flink.table.api.ValidationException; @@ -70,6 +71,7 @@ import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FIELDS; import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FIELDS_PREFIX; import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FORMAT; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_PROJECTION_PUSHDOWN_LEVEL; import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS; import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.PROPS_GROUP_ID; import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_BOUNDED_MODE; @@ -88,6 +90,7 @@ import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TRANSACTION_NAMING_STRATEGY; import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.VALUE_FIELDS_INCLUDE; import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.VALUE_FORMAT; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.VALUE_PROJECTION_PUSHDOWN_LEVEL; import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.PROPERTIES_PREFIX; import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.StartupOptions; import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.autoCompleteSchemaRegistrySubject; @@ -138,6 +141,8 @@ public Set> optionalOptions() { options.add(KEY_FORMAT); options.add(KEY_FIELDS); options.add(KEY_FIELDS_PREFIX); + options.add(KEY_PROJECTION_PUSHDOWN_LEVEL); + options.add(VALUE_PROJECTION_PUSHDOWN_LEVEL); options.add(VALUE_FORMAT); options.add(VALUE_FIELDS_INCLUDE); options.add(TOPIC); @@ -224,6 +229,12 @@ public DynamicTableSource createDynamicTableSource(Context context) { final Integer parallelism = tableOptions.getOptional(SCAN_PARALLELISM).orElse(null); + final FormatProjectionPushdownLevel keyFormatProjectionPushdownLevel = + tableOptions.get(KEY_PROJECTION_PUSHDOWN_LEVEL); + + final FormatProjectionPushdownLevel valueFormatProjectionPushdownLevel = + tableOptions.get(VALUE_PROJECTION_PUSHDOWN_LEVEL); + return createKafkaTableSource( physicalDataType, keyDecodingFormat.orElse(null), @@ -241,7 +252,9 @@ public DynamicTableSource createDynamicTableSource(Context context) { boundedOptions.specificOffsets, boundedOptions.boundedTimestampMillis, context.getObjectIdentifier().asSummaryString(), - parallelism); + parallelism, + keyFormatProjectionPushdownLevel, + valueFormatProjectionPushdownLevel); } @Override @@ -408,7 +421,9 @@ protected KafkaDynamicSource createKafkaTableSource( Map specificEndOffsets, long endTimestampMillis, String tableIdentifier, - Integer parallelism) { + Integer 
parallelism, + FormatProjectionPushdownLevel keyFormatProjectionPushdownLevel, + FormatProjectionPushdownLevel valueFormatProjectionPushdownLevel) { return new KafkaDynamicSource( physicalDataType, keyDecodingFormat, @@ -427,7 +442,9 @@ protected KafkaDynamicSource createKafkaTableSource( endTimestampMillis, false, tableIdentifier, - parallelism); + parallelism, + keyFormatProjectionPushdownLevel, + valueFormatProjectionPushdownLevel); } protected KafkaDynamicSink createKafkaTableSink( diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java index c421c4ef0..879c7e23d 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java @@ -58,6 +58,7 @@ import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FIELDS; import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FIELDS_PREFIX; import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FORMAT; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_PROJECTION_PUSHDOWN_LEVEL; import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS; import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_BOUNDED_MODE; import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_BOUNDED_SPECIFIC_OFFSETS; @@ -72,6 +73,7 @@ import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TRANSACTION_NAMING_STRATEGY; import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.VALUE_FIELDS_INCLUDE; import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.VALUE_FORMAT; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.VALUE_PROJECTION_PUSHDOWN_LEVEL; import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.PROPERTIES_PREFIX; import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.autoCompleteSchemaRegistrySubject; import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.createKeyFormatProjection; @@ -175,7 +177,9 @@ public DynamicTableSource createDynamicTableSource(Context context) { boundedOptions.boundedTimestampMillis, true, context.getObjectIdentifier().asSummaryString(), - parallelism); + parallelism, + tableOptions.get(KEY_PROJECTION_PUSHDOWN_LEVEL), + tableOptions.get(VALUE_PROJECTION_PUSHDOWN_LEVEL)); } @Override diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/DecoderTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/DecoderTest.java new file mode 100644 index 000000000..35a1a9635 --- /dev/null +++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/DecoderTest.java @@ -0,0 +1,796 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka.table; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.kafka.testutils.SimpleCollector; +import org.apache.flink.formats.common.TimestampFormat; +import org.apache.flink.formats.json.JsonFormatFactory; +import org.apache.flink.formats.json.debezium.DebeziumJsonDecodingFormat; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.format.DecodingFormat; +import org.apache.flink.table.connector.format.ProjectableDecodingFormat; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.LogicalType; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.function.Supplier; +import java.util.stream.Stream; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link Decoder}. 
*/ +class DecoderTest { + + private static ProjectableDecodingFormat> jsonDecodingFormat() { + return (ProjectableDecodingFormat>) + new JsonFormatFactory().createDecodingFormat(null, new Configuration()); + } + + private static ProjectableDecodingFormat> + debeziumJsonDecodingFormat() { + return new DebeziumJsonDecodingFormat(false, false, TimestampFormat.ISO_8601); + } + + private static class NonProjectable implements DecodingFormat { + + private final ProjectableDecodingFormat underlying; + + NonProjectable(final ProjectableDecodingFormat underlying) { + this.underlying = underlying; + } + + @Override + public T createRuntimeDecoder( + DynamicTableSource.Context context, DataType physicalDataType) { + return underlying.createRuntimeDecoder(context, physicalDataType); + } + + @Override + public ChangelogMode getChangelogMode() { + return underlying.getChangelogMode(); + } + + @Override + public Map listReadableMetadata() { + return underlying.listReadableMetadata(); + } + + @Override + public void applyReadableMetadata(List metadataKeys) { + underlying.applyReadableMetadata(metadataKeys); + } + } + + private static class RecordingFormat implements DecodingFormat { + + private final DecodingFormat underlying; + + private List appliedMetadataKeys = null; + + RecordingFormat(final DecodingFormat underlying) { + this.underlying = underlying; + } + + @Override + public T createRuntimeDecoder( + DynamicTableSource.Context context, DataType physicalDataType) { + return underlying.createRuntimeDecoder(context, physicalDataType); + } + + @Override + public ChangelogMode getChangelogMode() { + return underlying.getChangelogMode(); + } + + @Override + public Map listReadableMetadata() { + return underlying.listReadableMetadata(); + } + + @Override + public void applyReadableMetadata(List metadataKeys) { + this.appliedMetadataKeys = metadataKeys; + underlying.applyReadableMetadata(metadataKeys); + } + } + + private static Stream jsonTestCases() { + return Stream.of( + Arguments.of( + "Projectable DecodingFormat with pushdown enabled", + (Supplier>>) + DecoderTest::jsonDecodingFormat, + true), + Arguments.of( + "Projectable DecodingFormat with pushdown disabled", + (Supplier>>) + DecoderTest::jsonDecodingFormat, + false), + Arguments.of( + "Non-projectable DecodingFormat", + (Supplier>>) + () -> new NonProjectable<>(jsonDecodingFormat()), + false)); + } + + // debezium-json format exposes some metadata fields (unlike json format) + private static Stream debeziumJsonTestCases() { + return Stream.of( + Arguments.of( + "Projectable DecodingFormat with pushdown enabled", + (Supplier>>) + DecoderTest::debeziumJsonDecodingFormat, + true), + Arguments.of( + "Projectable DecodingFormat with pushdown disabled", + (Supplier>>) + DecoderTest::debeziumJsonDecodingFormat, + false), + Arguments.of( + "Non-projectable DecodingFormat", + (Supplier>>) + () -> new NonProjectable<>(debeziumJsonDecodingFormat()), + false)); + } + + private static final DataType tableDataType = + DataTypes.ROW( + DataTypes.FIELD("a", DataTypes.STRING()), // key field + DataTypes.FIELD("b", DataTypes.STRING()), // value field + DataTypes.FIELD("c", DataTypes.STRING()), // key field + DataTypes.FIELD("d", DataTypes.STRING()), // value field + DataTypes.FIELD("e", DataTypes.STRING()), // key field + DataTypes.FIELD("f", DataTypes.STRING()) // value field + ); + + private static class MockContext implements DynamicTableSource.Context { + @Override + public TypeInformation createTypeInformation(DataType producedDataType) { + return 
InternalTypeInfo.of(producedDataType.getLogicalType()); + } + + @Override + public TypeInformation createTypeInformation(LogicalType producedLogicalType) { + return InternalTypeInfo.of(producedLogicalType); + } + + @Override + public DynamicTableSource.DataStructureConverter createDataStructureConverter( + DataType producedDataType) { + throw new UnsupportedOperationException(); + } + } + + private static void produceRow( + final Decoder decoder, final String input, final GenericRowData producedRow) { + // decode + final RowData deserialized; + try { + final DeserializationSchema deserializationSchema = + decoder.getDeserializationSchema(); + deserializationSchema.open(null); + final SimpleCollector collector = new SimpleCollector<>(); + deserializationSchema.deserialize(input.getBytes(), collector); + deserialized = collector.getList().get(0); + } catch (Exception e) { + throw new RuntimeException(e); + } + + // project + decoder.getProjector().project(deserialized, producedRow); + } + + @Test + void testNoDecoder() { + final Decoder decoder = + Decoder.create( + new MockContext(), + null, + tableDataType, + new int[] {}, + null, + new int[][] {}, + Collections.emptyList(), + false); + + assertThat(decoder.getDeserializationSchema()).isNull(); + assertThat(decoder.getProjector().isProjectionNeeded()).isFalse(); + } + + @ParameterizedTest(name = "{0}") + @MethodSource("jsonTestCases") + void testProjectToReorderedTableSchema( + final String name, + final Supplier>> formatSupplier, + final Boolean isPushdownEnabled) { + final DecodingFormat> format = formatSupplier.get(); + + final int[] dataTypeProjection = new int[] {0, 2, 4}; + // Reverse order of table schema + final int[][] projectedFields = + new int[][] { + new int[] {5}, + new int[] {4}, + new int[] {3}, + new int[] {2}, + new int[] {1}, + new int[] {0}, + }; + + final Decoder decoder = + Decoder.create( + new MockContext(), + format, + tableDataType, + dataTypeProjection, + null, + projectedFields, + Collections.emptyList(), + isPushdownEnabled); + + final String input = "{\"a\":\"a\", \"c\":\"c\", \"e\":\"e\"}"; + + final int expectedArity = 6; + + final GenericRowData producedRow = new GenericRowData(expectedArity); + produceRow(decoder, input, producedRow); + + final GenericRowData expected = new GenericRowData(expectedArity); + expected.setField(1, "e"); + expected.setField(3, "c"); + expected.setField(5, "a"); + + assertThat(producedRow.toString()).isEqualTo(expected.toString()); + } + + @ParameterizedTest(name = "{0}") + @MethodSource("jsonTestCases") + void testProjectToSubsetOfTableSchema( + final String name, + final Supplier>> formatSupplier, + final Boolean isPushdownEnabled) { + final DecodingFormat> format = formatSupplier.get(); + + final int[] dataTypeProjection = new int[] {0, 2, 4}; + final int[][] projectedFields = + new int[][] { + new int[] {0}, new int[] {1}, new int[] {2}, + }; + + final Decoder decoder = + Decoder.create( + new MockContext(), + format, + tableDataType, + dataTypeProjection, + null, + projectedFields, + Collections.emptyList(), + isPushdownEnabled); + + final String input = "{\"a\":\"a\", \"c\":\"c\", \"e\":\"e\"}"; + + final int expectedArity = 3; + + final GenericRowData producedRow = new GenericRowData(expectedArity); + produceRow(decoder, input, producedRow); + + final GenericRowData expected = new GenericRowData(expectedArity); + expected.setField(0, "a"); + expected.setField(2, "c"); + + assertThat(producedRow.toString()).isEqualTo(expected.toString()); + } + + @ParameterizedTest(name = 
"{0}") + @MethodSource("jsonTestCases") + void testProjectToReorderedSubsetOfTableSchema( + final String name, + final Supplier>> formatSupplier, + final Boolean isPushdownEnabled) { + final DecodingFormat> format = formatSupplier.get(); + + final int[] dataTypeProjection = new int[] {0, 2, 4}; + final int[][] projectedFields = + new int[][] { + new int[] {2}, new int[] {0}, new int[] {1}, + }; + + final Decoder decoder = + Decoder.create( + new MockContext(), + format, + tableDataType, + dataTypeProjection, + null, + projectedFields, + Collections.emptyList(), + isPushdownEnabled); + + final String input = "{\"a\":\"a\", \"c\":\"c\", \"e\":\"e\"}"; + + final int expectedArity = 3; + + final GenericRowData producedRow = new GenericRowData(expectedArity); + produceRow(decoder, input, producedRow); + + final GenericRowData expected = new GenericRowData(expectedArity); + expected.setField(0, "c"); + expected.setField(1, "a"); + + assertThat(producedRow.toString()).isEqualTo(expected.toString()); + } + + @ParameterizedTest(name = "{0}") + @MethodSource("jsonTestCases") + void testPrefix( + final String name, + final Supplier>> formatSupplier, + final Boolean isPushdownEnabled) { + final DecodingFormat> format = formatSupplier.get(); + + final String prefix = "k_"; + final DataType tableDataType = + DataTypes.ROW( + DataTypes.FIELD(prefix + "a", DataTypes.STRING()), // key field + DataTypes.FIELD(prefix + "b", DataTypes.STRING()), // key field + DataTypes.FIELD("a", DataTypes.STRING()), // value field + DataTypes.FIELD("b", DataTypes.STRING()) // value field + ); + + final int[] dataTypeProjection = new int[] {0, 1}; + final String input = "{\"a\":\"a\", \"b\":\"b\"}"; + + final int[][] projectedFields = new int[][] {new int[] {1}}; + + final Decoder decoder = + Decoder.create( + new MockContext(), + format, + tableDataType, + dataTypeProjection, + prefix, + projectedFields, + Collections.emptyList(), + isPushdownEnabled); + + final int expectedArity = 6; + + final GenericRowData producedRow = new GenericRowData(expectedArity); + produceRow(decoder, input, producedRow); + + final GenericRowData expected = new GenericRowData(expectedArity); + expected.setField(0, "b"); + + assertThat(producedRow.toString()).isEqualTo(expected.toString()); + } + + @ParameterizedTest(name = "{0}") + @MethodSource("jsonTestCases") + void testProjectNestedFields( + final String name, + final Supplier>> formatSupplier, + final Boolean isPushdownEnabled) { + final DecodingFormat> format = formatSupplier.get(); + + final DataType tableDataType = + DataTypes.ROW( + DataTypes.FIELD( + "a0", + DataTypes.ROW( + DataTypes.FIELD("a1", DataTypes.STRING()), + DataTypes.FIELD("b1", DataTypes.STRING()))), + DataTypes.FIELD( + "b0", + DataTypes.ROW( + DataTypes.FIELD("a1", DataTypes.STRING()), + DataTypes.FIELD("b1", DataTypes.STRING())))); + + final int[] dataTypeProjection = new int[] {0, 1}; + final String input = + "{" + + "\"a0\":{\"a1\":\"a0_a1\", \"b1\":\"a0_b1\"}, " + + "\"b0\":{\"a1\":\"b0_a1\", \"b1\":\"b0_b1\"}" + + "}"; + + final int[][] projectedFields = + new int[][] { + new int[] {0, 1}, // a0.b1 + new int[] {1, 0} // b0.a1 + }; + + final Decoder decoder = + Decoder.create( + new MockContext(), + format, + tableDataType, + dataTypeProjection, + null, + projectedFields, + Collections.emptyList(), + isPushdownEnabled); + + final int expectedArity = 2; + + final GenericRowData producedRow = new GenericRowData(expectedArity); + produceRow(decoder, input, producedRow); + + final GenericRowData expected = new 
GenericRowData(expectedArity); + expected.setField(0, "a0_b1"); + expected.setField(1, "b0_a1"); + + assertThat(producedRow.toString()).isEqualTo(expected.toString()); + } + + @ParameterizedTest(name = "{0}") + @MethodSource("jsonTestCases") + void testProjectDeeplyNestedFields( + final String name, + final Supplier>> formatSupplier, + final Boolean isPushdownEnabled) { + final DecodingFormat> format = formatSupplier.get(); + + final DataType tableDataType = + DataTypes.ROW( + DataTypes.FIELD( + "a0", + DataTypes.ROW( + DataTypes.FIELD( + "a1", + DataTypes.ROW( + DataTypes.FIELD("a2", DataTypes.STRING()), + DataTypes.FIELD("b2", DataTypes.STRING()))), + DataTypes.FIELD( + "b1", + DataTypes.ROW( + DataTypes.FIELD("a2", DataTypes.STRING()), + DataTypes.FIELD( + "b2", DataTypes.STRING()))))), + DataTypes.FIELD( + "b0", + DataTypes.ROW( + DataTypes.FIELD( + "a1", + DataTypes.ROW( + DataTypes.FIELD("a2", DataTypes.STRING()), + DataTypes.FIELD("b2", DataTypes.STRING()))), + DataTypes.FIELD( + "b1", + DataTypes.ROW( + DataTypes.FIELD("a2", DataTypes.STRING()), + DataTypes.FIELD( + "b2", DataTypes.STRING())))))); + + final int[] dataTypeProjection = new int[] {0, 1}; + final String input = + "{\n" + + " \"a0\": {\n" + + " \"a1\": {\n" + + " \"a2\": \"a0_a1_a2\",\n" + + " \"b2\": \"a0_a1_b2\"\n" + + " },\n" + + " \"b1\": {\n" + + " \"a2\": \"a0_b1_a2\",\n" + + " \"b2\": \"a0_b1_b2\"\n" + + " }\n" + + " },\n" + + " \"b0\": {\n" + + " \"a1\": {\n" + + " \"a2\": \"b0_a1_a2\",\n" + + " \"b2\": \"b0_a1_b2\"\n" + + " },\n" + + " \"b1\": {\n" + + " \"a2\": \"b0_b1_a2\",\n" + + " \"b2\": \"b0_b1_b2\"\n" + + " }\n" + + " }\n" + + "}"; + + final int[][] projectedFields = + new int[][] { + new int[] {0, 1, 1}, // a0.b1.b2 + new int[] {1, 0, 0} // b0.a1.a2 + }; + + final Decoder decoder = + Decoder.create( + new MockContext(), + format, + tableDataType, + dataTypeProjection, + null, + projectedFields, + Collections.emptyList(), + isPushdownEnabled); + + final int expectedArity = 2; + + final GenericRowData producedRow = new GenericRowData(expectedArity); + produceRow(decoder, input, producedRow); + + final GenericRowData expected = new GenericRowData(expectedArity); + expected.setField(0, "a0_b1_b2"); + expected.setField(1, "b0_a1_a2"); + + assertThat(producedRow.toString()).isEqualTo(expected.toString()); + } + + @ParameterizedTest(name = "{0}") + @MethodSource("debeziumJsonTestCases") + void testProjectOneMetadataField( + final String name, + final Supplier>> formatSupplier, + final Boolean isPushdownEnabled) { + final RecordingFormat> recordingFormat = + new RecordingFormat<>(formatSupplier.get()); + + final DataType tableDataType = DataTypes.ROW(DataTypes.FIELD("a", DataTypes.STRING())); + + final int[] dataTypeProjection = new int[] {0}; + final String input = + "{" + + "\"before\":null," + + "\"after\":{\"a\":\"a\"}," + + "\"op\":\"c\"," + + "\"ts_ms\":0" + + "}"; + + final int[][] projectedFields = new int[][] {new int[] {0}}; + + final List metadataKeys = Collections.singletonList("ingestion-timestamp"); + + final Decoder decoder = + Decoder.create( + new MockContext(), + recordingFormat, + tableDataType, + dataTypeProjection, + null, + projectedFields, + metadataKeys, + isPushdownEnabled); + + assertThat(recordingFormat.appliedMetadataKeys).isEqualTo(metadataKeys); + + final int expectedArity = 2; + + final GenericRowData producedRow = new GenericRowData(expectedArity); + produceRow(decoder, input, producedRow); + + final GenericRowData expected = new GenericRowData(expectedArity); + 
expected.setField(0, "a"); + expected.setField(1, TimestampData.fromEpochMillis(0)); + + assertThat(producedRow.toString()).isEqualTo(expected.toString()); + } + + @ParameterizedTest(name = "{0}") + @MethodSource("debeziumJsonTestCases") + void testProjectTwoMetadataFields( + final String name, + final Supplier>> formatSupplier, + final Boolean isPushdownEnabled) { + final RecordingFormat> recordingFormat = + new RecordingFormat<>(formatSupplier.get()); + + final DataType tableDataType = DataTypes.ROW(DataTypes.FIELD("a", DataTypes.STRING())); + + final int[] dataTypeProjection = new int[] {0}; + final String input = + "{" + + "\"before\":null," + + "\"after\":{\"a\":\"a\"}," + + "\"source\":{\"version\":\"1.1.1.Final\",\"connector\":\"mysql\",\"name\":\"dbserver1\",\"ts_ms\":0,\"snapshot\":\"true\",\"db\":\"inventory\",\"table\":\"products\",\"server_id\":0,\"gtid\":null,\"file\":\"mysql-bin.000003\",\"pos\":154,\"row\":0,\"thread\":null,\"query\":null}," + + "\"op\":\"c\"," + + "\"ts_ms\":0" + + "}"; + + final int[][] projectedFields = new int[][] {new int[] {0}}; + + final List metadataKeys = + Collections.unmodifiableList( + Arrays.asList("ingestion-timestamp", "source.database")); + + final Decoder decoder = + Decoder.create( + new MockContext(), + recordingFormat, + tableDataType, + dataTypeProjection, + null, + projectedFields, + metadataKeys, + isPushdownEnabled); + + assertThat(recordingFormat.appliedMetadataKeys).isEqualTo(metadataKeys); + + final int expectedArity = 3; + + final GenericRowData producedRow = new GenericRowData(expectedArity); + produceRow(decoder, input, producedRow); + + final GenericRowData expected = new GenericRowData(expectedArity); + expected.setField(0, "a"); + expected.setField(1, TimestampData.fromEpochMillis(0)); + expected.setField(2, "inventory"); + + assertThat(producedRow.toString()).isEqualTo(expected.toString()); + } + + @ParameterizedTest(name = "{0}") + @MethodSource("jsonTestCases") + void testPrefixProjectToReorderedSubsetOfTableSchema( + final String name, + final Supplier>> formatSupplier, + final Boolean isPushdownEnabled) { + final DecodingFormat> format = formatSupplier.get(); + + // key fields are prefix + // fields are in different order in the key/value as compared to the table schema + // projected fields both reorders and subsets the table schema + + final String keyPrefix = "key_"; + final String valuePrefix = null; + + final DataType tableDataType = + DataTypes.ROW( + DataTypes.FIELD(keyPrefix + "a", DataTypes.STRING()), + DataTypes.FIELD("a", DataTypes.STRING()), + DataTypes.FIELD(keyPrefix + "b", DataTypes.INT()), + DataTypes.FIELD(keyPrefix + "c", DataTypes.STRING()), + DataTypes.FIELD("c", DataTypes.STRING())); + + final int[] keyProjection = new int[] {3, 0, 2}; + final String keyInput = "{\"c\":\"k_c\", \"a\":\"k_a\", \"b\":1}"; + + final int[] valueProjection = new int[] {1, 4}; + final String valueInput = "{\"a\":\"v_a\", \"c\":\"v_c\"}"; + + final int[][] projectedFields = new int[][] {new int[] {2}, new int[] {1}}; + + final Decoder keyDecoder = + Decoder.create( + new MockContext(), + format, + tableDataType, + keyProjection, + keyPrefix, + projectedFields, + Collections.emptyList(), + isPushdownEnabled); + + final Decoder valueDecoder = + Decoder.create( + new MockContext(), + format, + tableDataType, + valueProjection, + valuePrefix, + projectedFields, + Collections.emptyList(), + isPushdownEnabled); + + final int expectedArity = 2; + + final GenericRowData producedRow = new GenericRowData(expectedArity); + 
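+ // Both decoders write into the same reused row: the key decoder is expected to fill only
+ // the projected positions that resolve to key fields (key_b -> position 0), and the value
+ // decoder only those that resolve to value fields (a -> position 1), so together they
+ // yield the complete projected row asserted below.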
produceRow(keyDecoder, keyInput, producedRow); + produceRow(valueDecoder, valueInput, producedRow); + + final GenericRowData expected = new GenericRowData(expectedArity); + expected.setField(0, 1); + expected.setField(1, "v_a"); + + assertThat(producedRow.toString()).isEqualTo(expected.toString()); + } + + @ParameterizedTest(name = "{0}") + @MethodSource("jsonTestCases") + void testPrefixProjectToReorderedSubsetOfNestedTableSchema( + final String name, + final Supplier>> formatSupplier, + final Boolean isPushdownEnabled) { + final DecodingFormat> keyFormat = formatSupplier.get(); + final DecodingFormat> valueFormat = formatSupplier.get(); + + // key fields are prefix + // fields are in different order in the key/value as compared to the table schema + // projected fields both reorders, subsets, and un-nests the table schema + + final String keyPrefix = "key_"; + final String valuePrefix = null; + + final DataType tableDataType = + DataTypes.ROW( + DataTypes.FIELD(keyPrefix + "a", DataTypes.STRING()), + DataTypes.FIELD("a", DataTypes.STRING()), + DataTypes.FIELD(keyPrefix + "b", DataTypes.INT()), + DataTypes.FIELD(keyPrefix + "c", DataTypes.STRING()), + DataTypes.FIELD("c", DataTypes.STRING()), + DataTypes.FIELD( + "d", + DataTypes.ROW( + DataTypes.FIELD("e", DataTypes.STRING()), + DataTypes.FIELD("f", DataTypes.STRING().nullable())))); + + final int[] keyProjection = new int[] {3, 0, 2}; + final String keyInput = "{\"c\":\"k_c\", \"a\":\"k_a\", \"b\":1}"; + + final int[] valueProjection = new int[] {1, 4, 5}; + final String valueInput = "{\"a\":\"v_a\", \"c\":\"v_c\", \"d\":{\"e\":\"v_e\"}}"; + + final int[][] projectedFields = + new int[][] { + new int[] {2}, // k_b + new int[] {1}, // a + new int[] {5, 0} // d.e + }; + + final Decoder keyDecoder = + Decoder.create( + new MockContext(), + keyFormat, + tableDataType, + keyProjection, + keyPrefix, + projectedFields, + Collections.emptyList(), + isPushdownEnabled); + + final Decoder valueDecoder = + Decoder.create( + new MockContext(), + valueFormat, + tableDataType, + valueProjection, + valuePrefix, + projectedFields, + Collections.emptyList(), + isPushdownEnabled); + + final int expectedArity = 3; + + final GenericRowData producedRow = new GenericRowData(expectedArity); + produceRow(keyDecoder, keyInput, producedRow); + produceRow(valueDecoder, valueInput, producedRow); + + final GenericRowData expected = new GenericRowData(expectedArity); + expected.setField(0, 1); + expected.setField(1, "v_a"); + expected.setField(2, "v_e"); + + assertThat(producedRow.toString()).isEqualTo(expected.toString()); + } +} diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java index 4417346c9..44d0a3f1f 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java @@ -43,6 +43,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.transformations.SourceTransformation; import org.apache.flink.streaming.connectors.kafka.config.BoundedMode; +import org.apache.flink.streaming.connectors.kafka.config.FormatProjectionPushdownLevel; import org.apache.flink.streaming.connectors.kafka.config.StartupMode; import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner; import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.ScanStartupMode; @@ -85,16 +86,19 @@ import javax.annotation.Nullable; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Properties; import java.util.Set; import java.util.function.Consumer; +import java.util.function.Supplier; import java.util.regex.Pattern; import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches; @@ -394,11 +398,92 @@ public void testTableSourceWithKeyValueAndMetadata() { 0, null); expectedKafkaSource.producedDataType = SCHEMA_WITH_METADATA.toSourceRowDataType(); + expectedKafkaSource.valueFormatMetadataKeys = Collections.singletonList("metadata_2"); expectedKafkaSource.metadataKeys = Collections.singletonList("timestamp"); assertThat(actualSource).isEqualTo(expectedKafkaSource); } + private static class NotIdempotentDecodingFormat + implements DecodingFormat> { + private final List metadataKeys = new ArrayList<>(); + + @Override + public Map listReadableMetadata() { + return Collections.singletonMap("a", DataTypes.STRING()); + } + + @Override + public void applyReadableMetadata(List metadataKeys) { + // this is deliberately not idempotent for testing purposes + this.metadataKeys.addAll(metadataKeys); + } + + @Override + public DeserializationSchema createRuntimeDecoder( + DynamicTableSource.Context context, DataType physicalDataType) { + throw new UnsupportedOperationException(); + } + + @Override + public ChangelogMode getChangelogMode() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + NotIdempotentDecodingFormat that = (NotIdempotentDecodingFormat) o; + return Objects.equals(metadataKeys, that.metadataKeys); + } + + @Override + public int hashCode() { + return Objects.hashCode(metadataKeys); + } + } + + @Test + public void testApplyReadableMetadataIsIdempotent() { + final KafkaDynamicSource kafkaSource = + new KafkaDynamicSource( + SCHEMA_WITH_METADATA.toPhysicalRowDataType(), + null, + new NotIdempotentDecodingFormat(), + new int[] {}, + new int[] {}, + null, + TOPIC_LIST, + null, + new Properties(), + StartupMode.EARLIEST, + Collections.emptyMap(), + 0L, + BoundedMode.GROUP_OFFSETS, + Collections.emptyMap(), + 0L, + false, + "abc", + 1, + FormatProjectionPushdownLevel.NONE, + FormatProjectionPushdownLevel.NONE); + + final Supplier runnable = + () -> { + kafkaSource.applyReadableMetadata( + Arrays.asList("timestamp", "value.a"), + SCHEMA_WITH_METADATA.toSourceRowDataType()); + return kafkaSource.hashCode(); + }; + + assertThat(runnable.get()).isEqualTo(runnable.get()); + } + @Test public void testTableSourceCommitOnCheckpointDisabled() { final Map modifiedOptions = @@ -1356,7 +1441,9 @@ private static KafkaDynamicSource createExpectedScanSource( 0, false, FactoryMocks.IDENTIFIER.asSummaryString(), - parallelism); + parallelism, + FormatProjectionPushdownLevel.NONE, + FormatProjectionPushdownLevel.NONE); } private static KafkaDynamicSink createExpectedSink( diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java 
b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java index e90c21916..39e6eaaaf 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java @@ -27,14 +27,22 @@ import org.apache.flink.connector.kafka.sink.TransactionNamingStrategy; import org.apache.flink.core.execution.JobClient; import org.apache.flink.core.execution.SavepointFormatType; +import org.apache.flink.formats.json.JsonParseException; import org.apache.flink.runtime.messages.FlinkJobTerminatedWithoutCancellationException; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction; +import org.apache.flink.streaming.connectors.kafka.config.FormatProjectionPushdownLevel; +import org.apache.flink.table.api.Table; import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.api.config.TableConfigOptions; +import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.binary.BinaryRowData; +import org.apache.flink.table.data.writer.BinaryRowWriter; +import org.apache.flink.table.data.writer.BinaryWriter; import org.apache.flink.table.utils.EncodingUtils; import org.apache.flink.test.util.SuccessException; import org.apache.flink.types.Row; @@ -47,6 +55,7 @@ import org.assertj.core.api.Assertions; import org.assertj.core.api.ThrowingConsumer; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; @@ -61,6 +70,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Random; import java.util.UUID; import java.util.concurrent.ExecutionException; @@ -78,6 +88,7 @@ import static org.apache.flink.util.CollectionUtil.entry; import static org.apache.flink.util.CollectionUtil.map; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.assertj.core.api.Assertions.fail; import static org.assertj.core.api.HamcrestCondition.matching; @@ -180,7 +191,7 @@ void testKafkaSourceSink(final String format) throws Exception { "+I(2019-12-12 00:00:05.000,2019-12-12,00:00:03,2019-12-12 00:00:04.004,3,50.00)", "+I(2019-12-12 00:00:10.000,2019-12-12,00:00:05,2019-12-12 00:00:06.006,2,5.33)"); - assertThat(TestingSinkFunction.rows).isEqualTo(expected); + assertThat(TestingSinkFunction.getStringRows()).isEqualTo(expected); // ------------- cleanup ------------------- @@ -660,8 +671,9 @@ void testKafkaTableWithMultipleTopics(final String format) throws Exception { } } List expected = Arrays.asList("+I(Dollar)", "+I(Dummy)", "+I(Euro)", "+I(Yen)"); - TestingSinkFunction.rows.sort(Comparator.naturalOrder()); - assertThat(TestingSinkFunction.rows).isEqualTo(expected); + List rows = TestingSinkFunction.getStringRows(); + rows.sort(Comparator.naturalOrder()); + assertThat(rows).isEqualTo(expected); // ------------- cleanup ------------------- topics.forEach(super::deleteTestTopic); @@ -789,13 +801,17 @@ void 
testKafkaSourceSinkWithKeyAndPartialValue(final String format) throws Excep + " 'properties.bootstrap.servers' = '%s',\n" + " 'properties.group.id' = '%s',\n" + " 'scan.startup.mode' = 'earliest-offset',\n" - + " 'key.format' = '%s',\n" + + " %s,\n" + " 'key.fields' = 'k_event_id; k_user_id',\n" + " 'key.fields-prefix' = 'k_',\n" - + " 'value.format' = '%s',\n" + + " %s,\n" + " 'value.fields-include' = 'EXCEPT_KEY'\n" + ")", - topic, bootstraps, groupId, format, format); + topic, + bootstraps, + groupId, + keyFormatOptions(format), + valueFormatOptions(format)); tEnv.executeSql(createTable); @@ -872,12 +888,16 @@ void testKafkaSourceSinkWithKeyAndFullValue(final String format) throws Exceptio + " 'properties.bootstrap.servers' = '%s',\n" + " 'properties.group.id' = '%s',\n" + " 'scan.startup.mode' = 'earliest-offset',\n" - + " 'key.format' = '%s',\n" + + " %s,\n" + " 'key.fields' = 'event_id; user_id',\n" - + " 'value.format' = '%s',\n" + + " %s,\n" + " 'value.fields-include' = 'ALL'\n" + ")", - topic, bootstraps, groupId, format, format); + topic, + bootstraps, + groupId, + keyFormatOptions(format), + valueFormatOptions(format)); tEnv.executeSql(createTable); @@ -959,9 +979,9 @@ void testKafkaTemporalJoinChangelog(final String format) throws Exception { + " 'scan.startup.mode' = 'earliest-offset',\n" + " 'properties.bootstrap.servers' = '%s',\n" + " 'properties.group.id' = '%s',\n" - + " 'format' = '%s'\n" + + " %s\n" + ")", - orderTopic, bootstraps, groupId, format); + orderTopic, bootstraps, groupId, formatOptions(format)); tEnv.executeSql(orderTableDDL); String orderInitialValues = "INSERT INTO ordersTable\n" @@ -991,9 +1011,9 @@ void testKafkaTemporalJoinChangelog(final String format) throws Exception { + " 'scan.startup.mode' = 'earliest-offset',\n" + " 'properties.bootstrap.servers' = '%s',\n" + " 'properties.group.id' = '%s',\n" - + " 'value.format' = 'debezium-json'\n" + + " %s\n" + ")", - productTopic, bootstraps, groupId); + productTopic, bootstraps, groupId, valueFormatOptions("debezium-json")); tEnv.executeSql(productTableDDL); // use raw format to initial the changelog data @@ -1091,9 +1111,13 @@ void testPerPartitionWatermarkKafka(final String format) throws Exception { + " 'properties.group.id' = '%s',\n" + " 'scan.startup.mode' = 'earliest-offset',\n" + " 'sink.partitioner' = '%s',\n" - + " 'format' = '%s'\n" + + " %s\n" + ")", - topic, bootstraps, groupId, TestPartitioner.class.getName(), format); + topic, + bootstraps, + groupId, + TestPartitioner.class.getName(), + formatOptions(format)); tEnv.executeSql(createTable); @@ -1183,9 +1207,13 @@ void testPerPartitionWatermarkWithIdleSource(final String format) throws Excepti + " 'properties.group.id' = '%s',\n" + " 'scan.startup.mode' = 'earliest-offset',\n" + " 'sink.partitioner' = '%s',\n" - + " 'format' = '%s'\n" + + " %s\n" + ")", - topic, bootstraps, groupId, TestPartitioner.class.getName(), format); + topic, + bootstraps, + groupId, + TestPartitioner.class.getName(), + formatOptions(format)); tEnv.executeSql(createTable); @@ -1258,9 +1286,13 @@ void testLatestOffsetStrategyResume(final String format) throws Exception { + " 'properties.group.id' = '%s',\n" + " 'scan.startup.mode' = 'latest-offset',\n" + " 'sink.partitioner' = '%s',\n" - + " 'format' = '%s'\n" + + " %s\n" + ")", - topic, bootstraps, groupId, TestPartitioner.class.getName(), format); + topic, + bootstraps, + groupId, + TestPartitioner.class.getName(), + formatOptions(format)); tEnv.executeSql(createTable); @@ -1439,7 +1471,7 @@ private TableResult 
startFromGroupOffset( + " 'properties.auto.offset.reset' = '%s',\n" + " 'properties.enable.auto.commit' = 'true',\n" + " 'properties.auto.commit.interval.ms' = '1000',\n" - + " 'format' = '%s'\n" + + " %s\n" + ")"; tEnv.executeSql( String.format( @@ -1449,7 +1481,7 @@ private TableResult startFromGroupOffset( bootstraps, groupId, resetStrategy, - format)); + formatOptions(format))); String initialValues = "INSERT INTO " @@ -1531,6 +1563,392 @@ private void testStartFromGroupOffsetsWithNoneResetStrategy(final String format) } } + private void projectionPushdownSetupData(final String format, final String topic) + throws Exception { + createTestTopic(topic, 1, 1); + + String groupId = getStandardProps().getProperty("group.id"); + String bootstraps = getBootstrapServers(); + + final String createTable = + String.format( + "CREATE TABLE kafka (\n" + + " `a` STRING,\n" + + " `b` STRING,\n" + + " `topic` STRING NOT NULL METADATA VIRTUAL,\n" + + " `c` STRING,\n" + + " `partition` INT NOT NULL METADATA VIRTUAL,\n" + + " `d` STRING\n" + + ") WITH (\n" + + " 'connector' = 'kafka',\n" + + " 'topic' = '%s',\n" + + " 'properties.bootstrap.servers' = '%s',\n" + + " 'properties.group.id' = '%s',\n" + + " 'scan.startup.mode' = 'earliest-offset',\n" + + " %s,\n" + + " 'key.fields' = 'a; b',\n" + + " %s,\n" + + " 'value.fields-include' = 'EXCEPT_KEY'\n" + + ")", + topic, + bootstraps, + groupId, + keyFormatOptions(format), + valueFormatOptions(format)); + tEnv.executeSql(createTable); + + final String initialValues = "INSERT INTO kafka (a, b, c, d) SELECT 'a', 'b', 'c', 'd'"; + tEnv.executeSql(initialValues).await(); + } + + @ParameterizedTest(name = "format: {0}") + @MethodSource("formats") + public void testProjectionPushdownSelectAllFields(final String format) throws Exception { + final String topic = "testProjectionPushdown_" + format + "_" + UUID.randomUUID(); + projectionPushdownSetupData(format, topic); + + assertQueryResult( + "SELECT * FROM kafka", + "== Optimized Execution Plan ==\n" + + "Calc(select=[a, b, topic, c, partition, d])\n" + + "+- TableSourceScan(table=[[default_catalog, default_database, kafka, metadata=[topic, partition]]], fields=[a, b, c, d, topic, partition])\n", + Collections.singletonList(String.format("+I(a,b,%s,c,%d,d)", topic, 0))); + + cleanupTopic(topic); + } + + @ParameterizedTest(name = "format: {0}") + @MethodSource("formats") + public void testProjectionPushdownSelectSpecificPhysicalFields(final String format) + throws Exception { + final String topic = "testProjectionPushdown_" + format + "_" + UUID.randomUUID(); + projectionPushdownSetupData(format, topic); + + assertQueryResult( + "SELECT a, c FROM kafka", + "== Optimized Execution Plan ==\n" + + "TableSourceScan(table=[[default_catalog, default_database, kafka, project=[a, c], metadata=[]]], fields=[a, c])\n", + Collections.singletonList("+I(a,c)")); + + cleanupTopic(topic); + } + + @ParameterizedTest(name = "format: {0}") + @MethodSource("formats") + public void testProjectionPushdownSelectSpecificPhysicalFieldsInDifferentOrderFromTableSchema( + final String format) throws Exception { + final String topic = "testProjectionPushdown_" + format + "_" + UUID.randomUUID(); + projectionPushdownSetupData(format, topic); + + assertQueryResult( + "SELECT c, a FROM kafka", + "== Optimized Execution Plan ==\n" + + "TableSourceScan(table=[[default_catalog, default_database, kafka, project=[c, a], metadata=[]]], fields=[c, a])\n", + Collections.singletonList("+I(c,a)")); + + cleanupTopic(topic); + } + + @ParameterizedTest(name = 
"format: {0}") + @MethodSource("formats") + public void + testProjectionPushdownSelectSpecificPhysicalAndMetadataFieldsInDifferentOrderFromTableSchema( + final String format) throws Exception { + final String topic = "testProjectionPushdown_" + format + "_" + UUID.randomUUID(); + projectionPushdownSetupData(format, topic); + + assertQueryResult( + "SELECT c, `partition`, a, topic FROM kafka", + "== Optimized Execution Plan ==\n" + + "Calc(select=[c, partition, a, topic])\n" + + "+- TableSourceScan(table=[[default_catalog, default_database, kafka, project=[c, a], metadata=[topic, partition]]], fields=[c, a, topic, partition])\n", + Collections.singletonList(String.format("+I(c,%s,a,%s)", 0, topic))); + + cleanupTopic(topic); + } + + @ParameterizedTest(name = "format: {0}") + @MethodSource("formats") + public void testProjectionPushdownSelectSameColumnTwice(final String format) throws Exception { + final String topic = "testProjectionPushdown_" + format + "_" + UUID.randomUUID(); + projectionPushdownSetupData(format, topic); + + assertQueryResult( + "SELECT b AS b1, b AS b2 FROM kafka", + "== Optimized Execution Plan ==\n" + + "Calc(select=[b AS b1, b AS b2])\n" + + "+- TableSourceScan(table=[[default_catalog, default_database, kafka, project=[b], metadata=[]]], fields=[b])\n", + Collections.singletonList("+I(b,b)")); + + cleanupTopic(topic); + } + + @ParameterizedTest(name = "format: {0}") + @MethodSource("formats") + public void testProjectionPushdownNestColumns(final String format) throws Exception { + final String topic = "testProjectionPushdown_" + format + "_" + UUID.randomUUID(); + projectionPushdownSetupData(format, topic); + + final String query = "SELECT ROW(b, a) AS nested FROM kafka"; + + final String expectedPlanEndsWith = + "== Optimized Execution Plan ==\n" + + "Calc(select=[ROW(b, a) AS nested])\n" + + "+- TableSourceScan(table=[[default_catalog, default_database, kafka, project=[b, a], metadata=[]]], fields=[b, a])\n"; + + final BinaryRowData nested = new BinaryRowData(2); + final BinaryWriter binaryWriter = new BinaryRowWriter(nested); + binaryWriter.writeString(0, StringData.fromString("b")); + binaryWriter.writeString(1, StringData.fromString("a")); + binaryWriter.complete(); + final GenericRowData row = new GenericRowData(1); + row.setField(0, nested); + final List expected = Collections.singletonList(row); + + final Table table = tEnv.sqlQuery(query); + + assertThat(table.explain()).endsWith(expectedPlanEndsWith); + + final DataStream result = + tEnv.toRetractStream(table, RowData.class).map(tuple -> tuple.f1); + final TestingSinkFunction sink = new TestingSinkFunction(expected.size()); + result.addSink(sink).setParallelism(1); + try { + env.execute("Job_2"); + } catch (Throwable e) { + if (!isCausedByJobFinished(e)) { + throw e; + } + } + assertThat(TestingSinkFunction.getRows()).isEqualTo(expected); + + cleanupTopic(topic); + } + + private void nestedProjectionPushdownSetupData(final String topic) throws Exception { + // Only JSON format supports nested projection pushdown currently + final String format = "json"; + + createTestTopic(topic, 1, 1); + + String groupId = getStandardProps().getProperty("group.id"); + String bootstraps = getBootstrapServers(); + + final String createTable = + String.format( + "CREATE TABLE kafka (\n" + + " `a` ROW(a1 STRING, a2 STRING),\n" + + " `b` ROW(b1 STRING, b2 STRING),\n" + + " `topic` STRING NOT NULL METADATA VIRTUAL,\n" + + " `c` ROW(c1 STRING, c2 STRING),\n" + + " `partition` INT NOT NULL METADATA VIRTUAL,\n" + + " `d` ROW(d1 
ROW(d2 STRING, d3 STRING))\n" + + ") WITH (\n" + + " 'connector' = 'kafka',\n" + + " 'topic' = '%s',\n" + + " 'properties.bootstrap.servers' = '%s',\n" + + " 'properties.group.id' = '%s',\n" + + " 'scan.startup.mode' = 'earliest-offset',\n" + + " %s,\n" + + " 'key.fields' = 'a; b',\n" + + " %s,\n" + + " 'value.fields-include' = 'EXCEPT_KEY'\n" + + ")", + topic, + bootstraps, + groupId, + keyFormatOptions(format), + valueFormatOptions(format)); + tEnv.executeSql(createTable); + + tEnv.executeSql( + "INSERT INTO kafka \n" + + "SELECT \n" + + " ROW('a1', 'a2'), \n" + + " ROW('b1', 'b2'), \n" + + " ROW('c1', 'c2'), \n" + + " ROW(ROW('d2', 'd3'))") + .await(); + } + + @Test + public void testNestedProjectionPushdownSelectAllFields() throws Exception { + final String topic = "testNestedProjectionPushdown_" + "_" + UUID.randomUUID(); + nestedProjectionPushdownSetupData(topic); + + assertQueryResult( + "SELECT * FROM kafka", + "== Optimized Execution Plan ==\n" + + "Calc(select=[a, b, topic, c, partition, d])\n" + + "+- TableSourceScan(table=[[default_catalog, default_database, kafka, metadata=[topic, partition]]], fields=[a, b, c, d, topic, partition])\n", + Collections.singletonList( + String.format( + "+I(+I(a1,a2),+I(b1,b2),%s,+I(c1,c2),0,+I(+I(d2,d3)))", topic))); + + cleanupTopic(topic); + } + + @Test + public void testNestedProjectionPushdownSelectSpecificNestedPhysicalFields() throws Exception { + final String topic = "testNestedProjectionPushdown_" + "_" + UUID.randomUUID(); + nestedProjectionPushdownSetupData(topic); + + assertQueryResult( + "SELECT a.a1, d.d1.d3 FROM kafka", + "== Optimized Execution Plan ==\n" + + "TableSourceScan(table=[[default_catalog, default_database, kafka, project=[a_a1, d_d1_d3], metadata=[]]], fields=[a_a1, d_d1_d3])\n", + Collections.singletonList("+I(a1,d3)")); + + cleanupTopic(topic); + } + + @Test + public void + testNestedProjectionPushdownSelectSpecificNestedPhysicalFieldsInDifferentOrderFromTableSchema() + throws Exception { + final String topic = "testNestedProjectionPushdown_" + "_" + UUID.randomUUID(); + nestedProjectionPushdownSetupData(topic); + + assertQueryResult( + "SELECT d.d1.d3, a.a1 FROM kafka", + "== Optimized Execution Plan ==\n" + + "TableSourceScan(table=[[default_catalog, default_database, kafka, project=[d_d1_d3, a_a1], metadata=[]]], fields=[d_d1_d3, a_a1])\n", + Collections.singletonList("+I(d3,a1)")); + + cleanupTopic(topic); + } + + @Test + public void + testNestedProjectionPushdownSelectSpecificNestedPhysicalAndMetadataFieldsInDifferentOrderFromTableSchema() + throws Exception { + final String topic = "testNestedProjectionPushdown_" + "_" + UUID.randomUUID(); + nestedProjectionPushdownSetupData(topic); + + assertQueryResult( + "SELECT d.d1.d3, `partition`, a.a1, topic FROM kafka", + "== Optimized Execution Plan ==\n" + + "Calc(select=[d_d1_d3 AS d3, partition, a_a1 AS a1, topic])\n" + + "+- TableSourceScan(table=[[default_catalog, default_database, kafka, project=[d_d1_d3, a_a1], metadata=[topic, partition]]], fields=[d_d1_d3, a_a1, topic, partition])\n", + Collections.singletonList(String.format("+I(d3,%s,a1,%s)", 0, topic))); + + cleanupTopic(topic); + } + + @Test + public void + testNestedProjectionPushdownSelectSpecificNestedAndNonNestedPhysicalAndMetadataFieldsInDifferentOrderFromTableSchema() + throws Exception { + final String topic = "testNestedProjectionPushdown_" + "_" + UUID.randomUUID(); + nestedProjectionPushdownSetupData(topic); + + assertQueryResult( + "SELECT d.d1.d3, d, `partition`, a.a1, topic FROM kafka", + "== 
Optimized Execution Plan ==\n" + + "Calc(select=[d.d1.d3 AS d3, d, partition, a_a1 AS a1, topic])\n" + + "+- TableSourceScan(table=[[default_catalog, default_database, kafka, project=[d, a_a1], metadata=[topic, partition]]], fields=[d, a_a1, topic, partition])\n", + Collections.singletonList( + String.format("+I(d3,+I(+I(d2,d3)),%s,a1,%s)", 0, topic))); + + cleanupTopic(topic); + } + + @Test + public void testProjectionPushdownWithJsonFormatAndBreakingSchemaChange() throws Exception { + // Only self-contained formats like JSON can use projection pushdown to + // avoid deserializing fields that aren't needed for a query and thus avoid + // errors from breaking schema changes. + final String format = "json"; + + // Setup data + + final String topic = "testProjectionPushdown_" + format + "_" + UUID.randomUUID(); + createTestTopic(topic, 1, 1); + + String groupId = getStandardProps().getProperty("group.id"); + String bootstraps = getBootstrapServers(); + + final String originalCreateTable = + String.format( + "CREATE TABLE kafka (\n" + + " `a` INT,\n" // `a` is originally an INT field + + " `b` STRING\n" + + ") WITH (\n" + + " 'connector' = 'kafka',\n" + + " 'topic' = '%s',\n" + + " 'properties.bootstrap.servers' = '%s',\n" + + " 'properties.group.id' = '%s',\n" + + " 'scan.startup.mode' = 'earliest-offset',\n" + + " %s,\n" + + " 'key.fields' = 'a; b',\n" + + " %s,\n" + + " 'value.fields-include' = 'EXCEPT_KEY'\n" + + ")", + topic, + bootstraps, + groupId, + keyFormatOptions(format), + valueFormatOptions(format)); + tEnv.executeSql(originalCreateTable); + final int a1 = 1; + final String b1 = "b1"; + tEnv.executeSql( + String.format( + "INSERT INTO kafka SELECT * FROM (VALUES (%d, '%s'))", a1, b1)) + .await(); + tEnv.executeSql("DROP TABLE kafka").await(); + + tEnv.executeSql( + String.format( + "CREATE TABLE kafka (\n" + + " `a` STRING,\n" // `a` is now a STRING field + + " `b` STRING\n" + + ") WITH (\n" + + " 'connector' = 'kafka',\n" + + " 'topic' = '%s',\n" + + " 'properties.bootstrap.servers' = '%s',\n" + + " 'properties.group.id' = '%s',\n" + + " 'scan.startup.mode' = 'earliest-offset',\n" + + " %s,\n" + + " 'key.fields' = 'a; b',\n" + + " %s,\n" + + " 'value.fields-include' = 'EXCEPT_KEY'\n" + + ")", + topic, + bootstraps, + groupId, + keyFormatOptions(format), + valueFormatOptions(format))); + final String a2 = "a2"; + final String b2 = "b2"; + tEnv.executeSql( + String.format( + "INSERT INTO kafka SELECT * FROM (VALUES ('%s', '%s'))", a2, b2)) + .await(); + tEnv.executeSql("DROP TABLE kafka").await(); + + // Read data using the original create table statement where `a` is still typed as an INT + tEnv.executeSql(originalCreateTable); + + // If you try to read all the fields, you naturally get a deserialization exception + // because of the breaking schema change to field `a` + assertThatThrownBy(() -> tEnv.executeSql("SELECT * FROM kafka").await()) + .hasRootCauseInstanceOf(JsonParseException.class) + .hasRootCauseMessage("Fail to deserialize at field: a."); + + // If you try to read just the fields that didn't have any breaking schema changes, + // the query will work because projection pushdown ensures that fields that aren't needed + // aren't deserialized + assertQueryResult( + "SELECT b FROM kafka", + "== Optimized Execution Plan ==\n" + + "TableSourceScan(table=[[default_catalog, default_database, kafka, project=[b], metadata=[]]], fields=[b])\n", + Arrays.asList(String.format("+I(%s)", b1), String.format("+I(%s)", b2))); + + // clean up + cleanupTopic(topic); + } + // 
-------------------------------------------------------------------------------------------- // Utilities // -------------------------------------------------------------------------------------------- @@ -1548,14 +1966,62 @@ public int partition( } } + private static FormatProjectionPushdownLevel projectionPushdownLevel(final String format) { + if (Objects.equals(format, "avro")) { + return FormatProjectionPushdownLevel.NONE; + } else if (Objects.equals(format, "csv")) { + return FormatProjectionPushdownLevel.TOP_LEVEL; + } else if (Objects.equals(format, "json")) { + return FormatProjectionPushdownLevel.ALL; + } else if (Objects.equals(format, "debezium-json")) { + return FormatProjectionPushdownLevel.TOP_LEVEL; + } else { + throw new UnsupportedOperationException( + String.format( + "The level of support the %s format has for projection pushdown is unknown", + format)); + } + } + private String formatOptions(final String format) { - return String.format("'format' = '%s'", format); + final FormatProjectionPushdownLevel formatProjectionPushdownLevel = + projectionPushdownLevel(format); + + return String.format( + "'format' = '%s',\n" + "'%s' = '%s',\n" + "'%s' = '%s'", + format, + KafkaConnectorOptions.KEY_PROJECTION_PUSHDOWN_LEVEL.key(), + formatProjectionPushdownLevel, + KafkaConnectorOptions.VALUE_PROJECTION_PUSHDOWN_LEVEL.key(), + formatProjectionPushdownLevel); + } + + private static String keyFormatOptions(final String format) { + final FormatProjectionPushdownLevel formatProjectionPushdownLevel = + projectionPushdownLevel(format); + + return String.format( + "'key.format' = '%s',\n" + "'%s' = '%s'", + format, + KafkaConnectorOptions.KEY_PROJECTION_PUSHDOWN_LEVEL.key(), + formatProjectionPushdownLevel); + } + + private static String valueFormatOptions(final String format) { + final FormatProjectionPushdownLevel formatProjectionPushdownLevel = + projectionPushdownLevel(format); + + return String.format( + "'value.format' = '%s',\n" + "'%s' = '%s'", + format, + KafkaConnectorOptions.VALUE_PROJECTION_PUSHDOWN_LEVEL.key(), + formatProjectionPushdownLevel); } private static final class TestingSinkFunction implements SinkFunction { private static final long serialVersionUID = 455430015321124493L; - private static List rows = new ArrayList<>(); + private static final List rows = new ArrayList<>(); private final int expectedSize; @@ -1566,12 +2032,20 @@ private TestingSinkFunction(int expectedSize) { @Override public void invoke(RowData value, Context context) { - rows.add(value.toString()); + rows.add(value); if (rows.size() >= expectedSize) { // job finish throw new SuccessException(); } } + + private static List getRows() { + return rows; + } + + private static List getStringRows() { + return getRows().stream().map(RowData::toString).collect(Collectors.toList()); + } } private static boolean isCausedByJobFinished(Throwable e) { @@ -1612,4 +2086,25 @@ private static void ignoreExceptions( assertThat(ex).satisfiesAnyOf(ignoreIf); } } + + private void assertQueryResult( + final String query, final String expectedPlanEndsWith, final List expected) + throws Exception { + final Table table = tEnv.sqlQuery(query); + + assertThat(table.explain()).endsWith(expectedPlanEndsWith); + + final DataStream result = + tEnv.toRetractStream(table, RowData.class).map(tuple -> tuple.f1); + final TestingSinkFunction sink = new TestingSinkFunction(expected.size()); + result.addSink(sink).setParallelism(1); + try { + env.execute("Job_2"); + } catch (Throwable e) { + if (!isCausedByJobFinished(e)) { + throw e; + 
} + } + assertThat(TestingSinkFunction.getStringRows()).isEqualTo(expected); + } } diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactoryTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactoryTest.java index 5bc42f3ba..2c251449e 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactoryTest.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactoryTest.java @@ -39,6 +39,7 @@ import org.apache.flink.streaming.api.operators.StreamOperatorFactory; import org.apache.flink.streaming.api.transformations.SourceTransformation; import org.apache.flink.streaming.connectors.kafka.config.BoundedMode; +import org.apache.flink.streaming.connectors.kafka.config.FormatProjectionPushdownLevel; import org.apache.flink.streaming.connectors.kafka.config.StartupMode; import org.apache.flink.streaming.connectors.kafka.testutils.MockPartitionOffsetsRetriever; import org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperatorFactory; @@ -957,7 +958,9 @@ private KafkaDynamicSource createExpectedScanSource( 0, true, FactoryMocks.IDENTIFIER.asSummaryString(), - parallelism); + parallelism, + FormatProjectionPushdownLevel.NONE, + FormatProjectionPushdownLevel.NONE); } private static KafkaDynamicSink createExpectedSink(