Commit 1192c9b

[FLINK-32609] Support Projection Pushdown

1 parent 5384ab2

17 files changed: +1454 −157 lines

flink-connector-kafka/archunit-violations/c0d94764-76a0-4c50-b617-70b1754c4612

Lines changed: 3 additions & 3 deletions

@@ -48,8 +48,8 @@ Method <org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader
 Method <org.apache.flink.connector.kafka.source.reader.KafkaSourceReader.getNumAliveFetchers()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaSourceReader.java:0)
 Method <org.apache.flink.connector.kafka.source.reader.KafkaSourceReader.getOffsetsToCommit()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaSourceReader.java:0)
 Method <org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaRecordSerializationSchema.createProjectedRow(org.apache.flink.table.data.RowData, org.apache.flink.types.RowKind, [Lorg.apache.flink.table.data.RowData$FieldGetter;)> has parameter of type <[Lorg.apache.flink.table.data.RowData$FieldGetter;> in (DynamicKafkaRecordSerializationSchema.java:0)
-Method <org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.createKeyFormatProjection(org.apache.flink.configuration.ReadableConfig, org.apache.flink.table.types.DataType)> calls method <org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getFieldNames(org.apache.flink.table.types.logical.LogicalType)> in (KafkaConnectorOptionsUtil.java:520)
-Method <org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.createValueFormatProjection(org.apache.flink.configuration.ReadableConfig, org.apache.flink.table.types.DataType)> calls method <org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getFieldCount(org.apache.flink.table.types.logical.LogicalType)> in (KafkaConnectorOptionsUtil.java:564)
+Method <org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.createKeyFormatProjection(org.apache.flink.configuration.ReadableConfig, org.apache.flink.table.types.DataType)> calls method <org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getFieldNames(org.apache.flink.table.types.logical.LogicalType)> in (KafkaConnectorOptionsUtil.java:521)
+Method <org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.createValueFormatProjection(org.apache.flink.configuration.ReadableConfig, org.apache.flink.table.types.DataType)> calls method <org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getFieldCount(org.apache.flink.table.types.logical.LogicalType)> in (KafkaConnectorOptionsUtil.java:565)
 Method <org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSink.createSerialization(org.apache.flink.table.connector.sink.DynamicTableSink$Context, org.apache.flink.table.connector.format.EncodingFormat, [I, java.lang.String)> calls method <org.apache.flink.table.types.utils.DataTypeUtils.stripRowPrefix(org.apache.flink.table.types.DataType, java.lang.String)> in (KafkaDynamicSink.java:408)
 Method <org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSink.getFieldGetters(java.util.List, [I)> has return type <[Lorg.apache.flink.table.data.RowData$FieldGetter;> in (KafkaDynamicSink.java:0)
-Method <org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource.createDeserialization(org.apache.flink.table.connector.source.DynamicTableSource$Context, org.apache.flink.table.connector.format.DecodingFormat, [I, java.lang.String)> calls method <org.apache.flink.table.types.utils.DataTypeUtils.stripRowPrefix(org.apache.flink.table.types.DataType, java.lang.String)> in (KafkaDynamicSource.java:574)
+Method <org.apache.flink.streaming.connectors.kafka.table.ProjectionHandler.toDataType(org.apache.flink.table.types.DataType, [I, java.lang.String)> calls method <org.apache.flink.table.types.utils.DataTypeUtils.stripRowPrefix(org.apache.flink.table.types.DataType, java.lang.String)> in (ProjectionHandler.java:148)
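The file above is the ArchUnit frozen-violations store. The commit refreshes the recorded line numbers for createKeyFormatProjection/createValueFormatProjection (520/564 to 521/565) and replaces the KafkaDynamicSource.createDeserialization entry with the new ProjectionHandler.toDataType, which now carries the stripRowPrefix call.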

flink-connector-kafka/pom.xml

Lines changed: 13 additions & 0 deletions

@@ -44,6 +44,13 @@ under the License.
         </flink.connector.module.config>
     </properties>
 
+    <repositories>
+        <repository>
+            <id>confluent</id>
+            <url>https://packages.confluent.io/maven/</url>
+        </repository>
+    </repositories>
+
     <dependencies>
 
         <!-- Core -->
@@ -326,6 +333,12 @@ under the License.
             <artifactId>flink-csv</artifactId>
            <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>io.confluent</groupId>
+            <artifactId>kafka-avro-serializer</artifactId>
+            <version>7.9.0</version>
+            <scope>test</scope>
+        </dependency>
 
         <!-- ArchUnit test dependencies -->
         <dependency>
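Two build changes support the new Avro test coverage: the confluent repository entry is needed because io.confluent artifacts such as kafka-avro-serializer are published to packages.confluent.io rather than Maven Central, and the serializer is test-scoped, so it does not leak into the connector's runtime classpath.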

flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java

Lines changed: 22 additions & 17 deletions

@@ -37,6 +37,7 @@
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 /** A specific {@link KafkaRecordDeserializationSchema} for {@link KafkaDynamicSource}. */
 @Internal
@@ -58,19 +59,21 @@ class DynamicKafkaDeserializationSchema implements KafkaRecordDeserializationSch
 
     private final boolean upsertMode;
 
+    private final Map<Integer, Integer> valueProjection;
+
     DynamicKafkaDeserializationSchema(
             int physicalArity,
             @Nullable DeserializationSchema<RowData> keyDeserialization,
-            int[] keyProjection,
+            Map<Integer, Integer> keyProjection,
             DeserializationSchema<RowData> valueDeserialization,
-            int[] valueProjection,
+            Map<Integer, Integer> valueProjection,
             boolean hasMetadata,
             MetadataConverter[] metadataConverters,
             TypeInformation<RowData> producedTypeInfo,
             boolean upsertMode) {
         if (upsertMode) {
             Preconditions.checkArgument(
-                    keyDeserialization != null && keyProjection.length > 0,
+                    keyDeserialization != null && !keyProjection.isEmpty(),
                     "Key must be set in upsert mode for deserialization schema.");
         }
         this.keyDeserialization = keyDeserialization;
@@ -86,6 +89,7 @@ class DynamicKafkaDeserializationSchema implements KafkaRecordDeserializationSch
                 upsertMode);
         this.producedTypeInfo = producedTypeInfo;
         this.upsertMode = upsertMode;
+        this.valueProjection = valueProjection;
     }
 
     @Override
@@ -101,7 +105,7 @@ public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<RowData
             throws IOException {
         // shortcut in case no output projection is required,
         // also not for a cartesian product with the keys
-        if (keyDeserialization == null && !hasMetadata) {
+        if (keyDeserialization == null && !hasMetadata && valueProjection.isEmpty()) {
             valueDeserialization.deserialize(record.value(), collector);
             return;
         }
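The widened guard is the behavioral core of this hunk: the decoded value row may be handed to the collector as-is only when there is no key, no metadata, and no projection to apply. A non-empty valueProjection means the produced row's field positions differ from the decoded row's, so the record must go through the OutputProjectionCollector below.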
@@ -176,9 +180,9 @@ private static final class OutputProjectionCollector
 
         private final int physicalArity;
 
-        private final int[] keyProjection;
+        private final Map<Integer, Integer> keyProjection;
 
-        private final int[] valueProjection;
+        private final Map<Integer, Integer> valueProjection;
 
         private final MetadataConverter[] metadataConverters;
 
@@ -192,8 +196,8 @@ private static final class OutputProjectionCollector
 
         OutputProjectionCollector(
                 int physicalArity,
-                int[] keyProjection,
-                int[] valueProjection,
+                Map<Integer, Integer> keyProjection,
+                Map<Integer, Integer> valueProjection,
                 MetadataConverter[] metadataConverters,
                 boolean upsertMode) {
             this.physicalArity = physicalArity;
@@ -206,7 +210,7 @@ private static final class OutputProjectionCollector
         @Override
         public void collect(RowData physicalValueRow) {
             // no key defined
-            if (keyProjection.length == 0) {
+            if (keyProjection.isEmpty()) {
                 emitRow(null, (GenericRowData) physicalValueRow);
                 return;
             }
@@ -241,16 +245,17 @@ private void emitRow(
             final GenericRowData producedRow =
                     new GenericRowData(rowKind, physicalArity + metadataArity);
 
-            for (int keyPos = 0; keyPos < keyProjection.length; keyPos++) {
-                assert physicalKeyRow != null;
-                producedRow.setField(keyProjection[keyPos], physicalKeyRow.getField(keyPos));
-            }
+            keyProjection.forEach(
+                    (keyPos, targetPos) -> {
+                        assert physicalKeyRow != null;
+                        producedRow.setField(targetPos, physicalKeyRow.getField(keyPos));
+                    });
 
             if (physicalValueRow != null) {
-                for (int valuePos = 0; valuePos < valueProjection.length; valuePos++) {
-                    producedRow.setField(
-                            valueProjection[valuePos], physicalValueRow.getField(valuePos));
-                }
+                valueProjection.forEach(
+                        (valuePos, targetPos) ->
+                                producedRow.setField(
+                                        targetPos, physicalValueRow.getField(valuePos)));
             }
 
             for (int metadataPos = 0; metadataPos < metadataArity; metadataPos++) {
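The switch from int[] to Map<Integer, Integer> makes both ends of the projection explicit: each entry maps a field position in the decoded key or value row to its target position in the produced row, which the old array form could only express for dense, in-order source positions. Below is a minimal plain-Java sketch of the re-mapping; the data, positions, and class name are illustrative, not taken from the connector:

import java.util.LinkedHashMap;
import java.util.Map;

public class ProjectionRemapSketch {
    public static void main(String[] args) {
        // Fields as the value decoder emitted them (hypothetical record).
        Object[] physicalValueRow = {"alice", 42L, "2024-01-01"};

        // Decoded position -> position in the produced row.
        Map<Integer, Integer> valueProjection = new LinkedHashMap<>();
        valueProjection.put(0, 2); // decoded field 0 lands at produced position 2
        valueProjection.put(1, 0); // decoded field 1 lands at produced position 0

        // Mirrors OutputProjectionCollector.emitRow: copy each projected field
        // into its target slot; unmapped slots stay null (in the real collector
        // they would be filled from the key row or metadata).
        Object[] producedRow = new Object[3];
        valueProjection.forEach(
                (valuePos, targetPos) -> producedRow[targetPos] = physicalValueRow[valuePos]);

        System.out.println(java.util.Arrays.toString(producedRow)); // [42, null, alice]
    }
}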

flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptionsUtil.java

Lines changed: 14 additions & 0 deletions

@@ -50,6 +50,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.regex.Pattern;
@@ -587,6 +588,19 @@ public static int[] createValueFormatProjection(
         throw new TableException("Unknown value fields strategy:" + strategy);
     }
 
+    public static boolean disableKeyProjectionPushdownIntoDecoder(final ReadableConfig options) {
+        return options.getOptional(KEY_FORMAT)
+                .map(format -> Objects.equals(format, "avro"))
+                .orElse(false);
+    }
+
+    public static boolean disableValueProjectionPushdownIntoDecoder(final ReadableConfig options) {
+        return options.getOptional(FactoryUtil.FORMAT)
+                .or(() -> options.getOptional(VALUE_FORMAT))
+                .map(format -> Objects.equals(format, "avro"))
+                .orElse(false);
+    }
+
     /**
      * Returns a new table context with a default schema registry subject value in the options if
      * the format is a schema registry format (e.g. 'avro-confluent') and the subject is not
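Both helpers gate pushdown on the format name: a plain 'avro' key or value format keeps the projection out of the decoder (Avro decodes a record against its complete schema, so dropping fields inside the decoder is not straightforward), with the value-side helper checking 'format' first and falling back to 'value.format'. A short sketch of calling them, assuming KafkaConnectorOptionsUtil is accessible from the calling package; the wiring shown is illustrative, not the connector's actual call site:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil;

public class PushdownGateSketch {
    public static void main(String[] args) {
        // Options as they would arrive from DDL: WITH ('format' = 'avro', ...).
        Configuration options = new Configuration();
        options.setString("format", "avro");

        // true -> decode the full value row and project afterwards;
        // false -> the decoder may be handed an already-projected data type.
        System.out.println(
                KafkaConnectorOptionsUtil.disableValueProjectionPushdownIntoDecoder(options));
    }
}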
