Skip to content

Commit e22ed76

Browse files
committed
[FLINK-32609] Support Projection Pushdown
1 parent 5384ab2 commit e22ed76

File tree

12 files changed

+1575
-102
lines changed

12 files changed

+1575
-102
lines changed

flink-connector-kafka/archunit-violations/c0d94764-76a0-4c50-b617-70b1754c4612

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,9 +47,9 @@ Method <org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader
4747
Method <org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.setConsumerClientRack(java.util.Properties, java.lang.String)> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaPartitionSplitReader.java:0)
4848
Method <org.apache.flink.connector.kafka.source.reader.KafkaSourceReader.getNumAliveFetchers()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaSourceReader.java:0)
4949
Method <org.apache.flink.connector.kafka.source.reader.KafkaSourceReader.getOffsetsToCommit()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaSourceReader.java:0)
50+
Method <org.apache.flink.streaming.connectors.kafka.table.Decoder.toDataType(org.apache.flink.table.types.DataType, [I, java.lang.String)> calls method <org.apache.flink.table.types.utils.DataTypeUtils.stripRowPrefix(org.apache.flink.table.types.DataType, java.lang.String)> in (Decoder.java:155)
5051
Method <org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaRecordSerializationSchema.createProjectedRow(org.apache.flink.table.data.RowData, org.apache.flink.types.RowKind, [Lorg.apache.flink.table.data.RowData$FieldGetter;)> has parameter of type <[Lorg.apache.flink.table.data.RowData$FieldGetter;> in (DynamicKafkaRecordSerializationSchema.java:0)
5152
Method <org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.createKeyFormatProjection(org.apache.flink.configuration.ReadableConfig, org.apache.flink.table.types.DataType)> calls method <org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getFieldNames(org.apache.flink.table.types.logical.LogicalType)> in (KafkaConnectorOptionsUtil.java:520)
5253
Method <org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.createValueFormatProjection(org.apache.flink.configuration.ReadableConfig, org.apache.flink.table.types.DataType)> calls method <org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getFieldCount(org.apache.flink.table.types.logical.LogicalType)> in (KafkaConnectorOptionsUtil.java:564)
5354
Method <org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSink.createSerialization(org.apache.flink.table.connector.sink.DynamicTableSink$Context, org.apache.flink.table.connector.format.EncodingFormat, [I, java.lang.String)> calls method <org.apache.flink.table.types.utils.DataTypeUtils.stripRowPrefix(org.apache.flink.table.types.DataType, java.lang.String)> in (KafkaDynamicSink.java:408)
5455
Method <org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSink.getFieldGetters(java.util.List, [I)> has return type <[Lorg.apache.flink.table.data.RowData$FieldGetter;> in (KafkaDynamicSink.java:0)
55-
Method <org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource.createDeserialization(org.apache.flink.table.connector.source.DynamicTableSource$Context, org.apache.flink.table.connector.format.DecodingFormat, [I, java.lang.String)> calls method <org.apache.flink.table.types.utils.DataTypeUtils.stripRowPrefix(org.apache.flink.table.types.DataType, java.lang.String)> in (KafkaDynamicSource.java:574)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.streaming.connectors.kafka.config;
20+
21+
import org.apache.flink.annotation.Internal;
22+
23+
/**
24+
* Projection pushdown mode for {@link
25+
* org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource}.
26+
*/
27+
@Internal
28+
public enum ProjectionPushdownLevel {
29+
30+
/** Turns off projection pushdown entirely. */
31+
NONE,
32+
33+
/** Enables projection pushdown for top-level fields only. */
34+
TOP_LEVEL,
35+
36+
/** Enables projection pushdown for top-level and nested fields. */
37+
NESTED
38+
}
Lines changed: 324 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,324 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.streaming.connectors.kafka.table;
20+
21+
import org.apache.flink.annotation.Internal;
22+
import org.apache.flink.api.common.serialization.DeserializationSchema;
23+
import org.apache.flink.table.connector.Projection;
24+
import org.apache.flink.table.connector.format.DecodingFormat;
25+
import org.apache.flink.table.connector.format.ProjectableDecodingFormat;
26+
import org.apache.flink.table.connector.source.DynamicTableSource.Context;
27+
import org.apache.flink.table.data.GenericRowData;
28+
import org.apache.flink.table.data.RowData;
29+
import org.apache.flink.table.types.DataType;
30+
import org.apache.flink.table.types.utils.DataTypeUtils;
31+
32+
import javax.annotation.Nullable;
33+
34+
import java.io.Serializable;
35+
import java.util.ArrayList;
36+
import java.util.Arrays;
37+
import java.util.HashMap;
38+
import java.util.List;
39+
import java.util.Map;
40+
import java.util.Objects;
41+
import java.util.Optional;
42+
import java.util.stream.Collectors;
43+
44+
/**
45+
* Decoding messages consists of two potential steps:
46+
*
47+
* <ol>
48+
* <li>Deserialization i.e deserializing the {@code byte[]} into a {@link RowData}. This process
49+
* is handled by a {@link DeserializationSchema}.
50+
* <li>Projection i.e. projecting any required fields from the deserialized {@link RowData}
51+
* (returned by the {@link DeserializationSchema} in the first step) to their positions in the
52+
* final produced {@link RowData}. This process is handled by a {@link Projector}.
53+
* </ol>
54+
*
55+
* <p>In order to decode messages correctly, the {@link DeserializationSchema} and the {@link
56+
* Projector} need to work together. For example, the {@link Projector} needs to know the positions
57+
* of the required fields in the {@link RowData} returned by the {@link DeserializationSchema} in
58+
* order to be able to correctly set fields in the final produced {@link RowData}.
59+
*
60+
* <p>That's why we have this {@link Decoder} class. This class ensures that the returned {@link
61+
* DeserializationSchema} and {@link Projector} will work together to decode messages correctly.
62+
*/
63+
@Internal
64+
public class Decoder {
65+
66+
/**
67+
* Can be null. Null is used inside {@link DynamicKafkaDeserializationSchema} to avoid
68+
* deserializing keys if not required.
69+
*/
70+
private final @Nullable DeserializationSchema<RowData> deserializationSchema;
71+
72+
/** Mapping of the physical position in the key to the target position in the RowData. */
73+
private final Projector projector;
74+
75+
private Decoder(
76+
final DeserializationSchema<RowData> deserializationSchema, final Projector projector) {
77+
this.deserializationSchema = deserializationSchema;
78+
this.projector = projector;
79+
}
80+
81+
/**
82+
* @param decodingFormat Optional format for decoding bytes.
83+
* @param tableDataType The data type representing the table schema.
84+
* @param dataTypeProjection Indices indicate the position of the field in the dataType
85+
* (key/value). Values indicate the position of the field in the tableSchema.
86+
* @param prefix Optional field prefix
87+
* @param projectedFields Indices indicate the position of the field in the produced Row. Values
88+
* indicate the position of the field in the table schema.
89+
* @param projectionPushdownIsEnabled if true and the format is a {@link
90+
* ProjectableDecodingFormat}, any {@param projectedFields} will be pushed down into the
91+
* deserializer.
92+
* @return a {@link Decoder} instance.
93+
*/
94+
public static Decoder create(
95+
final Context context,
96+
final @Nullable DecodingFormat<DeserializationSchema<RowData>> decodingFormat,
97+
final DataType tableDataType,
98+
final int[] dataTypeProjection,
99+
final @Nullable String prefix,
100+
final int[][] projectedFields,
101+
final int metadataSize,
102+
final boolean projectionPushdownIsEnabled) {
103+
if (decodingFormat == null) {
104+
return Decoder.noDeserializationOrProjection();
105+
} else {
106+
if (decodingFormat instanceof ProjectableDecodingFormat
107+
&& projectionPushdownIsEnabled) {
108+
return Decoder.projectInsideDeserializer(
109+
context,
110+
(ProjectableDecodingFormat<DeserializationSchema<RowData>>) decodingFormat,
111+
tableDataType,
112+
dataTypeProjection,
113+
prefix,
114+
projectedFields,
115+
metadataSize);
116+
} else {
117+
return Decoder.projectAfterDeserializing(
118+
context,
119+
decodingFormat,
120+
tableDataType,
121+
dataTypeProjection,
122+
prefix,
123+
projectedFields,
124+
metadataSize);
125+
}
126+
}
127+
}
128+
129+
/** @return a {@link DeserializationSchema} or null. */
130+
@Nullable
131+
public DeserializationSchema<RowData> getDeserializationSchema() {
132+
return deserializationSchema;
133+
}
134+
135+
/** @return a {@link Projector}. */
136+
public Projector getProjector() {
137+
return projector;
138+
}
139+
140+
private static Decoder noDeserializationOrProjection() {
141+
return new Decoder(null, new ProjectorImpl(Map.of()));
142+
}
143+
144+
private static DataType toDataType(
145+
final DataType tableDataType,
146+
final int[] dataTypeProjection,
147+
final @Nullable String prefix) {
148+
final DataType temp = Projection.of(dataTypeProjection).project(tableDataType);
149+
return Optional.ofNullable(prefix)
150+
.map(s -> DataTypeUtils.stripRowPrefix(temp, s))
151+
.orElse(temp);
152+
}
153+
154+
private static Map<Integer, Integer> tableToDeserializedTopLevelPos(
155+
final int[] dataTypeProjection) {
156+
final HashMap<Integer, Integer> tableToDeserializedPos = new HashMap<>();
157+
for (int i = 0; i < dataTypeProjection.length; i++) {
158+
tableToDeserializedPos.put(dataTypeProjection[i], i);
159+
}
160+
return tableToDeserializedPos;
161+
}
162+
163+
private static int[] copyArray(final int[] arr) {
164+
return Arrays.copyOf(arr, arr.length);
165+
}
166+
167+
private static void addMetadataProjections(
168+
final int deserializedSize,
169+
final int physicalSize,
170+
final int metadataSize,
171+
final Map<List<Integer>, Integer> deserializedToProducedPos) {
172+
for (int i = 0; i < metadataSize; i++) {
173+
// metadata is always added to the end of the deserialized row by the DecodingFormat
174+
final int deserializedPos = deserializedSize + i;
175+
// we need to always add metadata to the end of the produced row
176+
final int producePos = physicalSize + i;
177+
deserializedToProducedPos.put(List.of(deserializedPos), producePos);
178+
}
179+
}
180+
181+
private static Decoder projectInsideDeserializer(
182+
final Context context,
183+
final ProjectableDecodingFormat<DeserializationSchema<RowData>>
184+
projectableDecodingFormat,
185+
final DataType tableDataType,
186+
final int[] dataTypeProjection,
187+
final @Nullable String prefix,
188+
final int[][] projectedFields,
189+
final int metadataSize) {
190+
final Map<Integer, Integer> tableToDeserializedTopLevelPos =
191+
tableToDeserializedTopLevelPos(dataTypeProjection);
192+
193+
final List<int[]> deserializerProjectedFields = new ArrayList<>();
194+
final Map<List<Integer>, Integer> deserializedToProducedPos = new HashMap<>();
195+
for (int producedPos = 0; producedPos < projectedFields.length; producedPos++) {
196+
final int[] tablePos = projectedFields[producedPos];
197+
final int tableTopLevelPos = tablePos[0];
198+
199+
final Integer dataTypeTopLevelPos =
200+
tableToDeserializedTopLevelPos.get(tableTopLevelPos);
201+
if (dataTypeTopLevelPos != null) {
202+
final int[] dataTypePos = copyArray(tablePos);
203+
dataTypePos[0] = dataTypeTopLevelPos;
204+
205+
deserializerProjectedFields.add(dataTypePos);
206+
207+
final int deserializedPos = deserializerProjectedFields.size() - 1;
208+
deserializedToProducedPos.put(List.of(deserializedPos), producedPos);
209+
}
210+
}
211+
212+
addMetadataProjections(
213+
deserializerProjectedFields.size(),
214+
projectedFields.length,
215+
metadataSize,
216+
deserializedToProducedPos);
217+
218+
return new Decoder(
219+
projectableDecodingFormat.createRuntimeDecoder(
220+
context,
221+
toDataType(tableDataType, dataTypeProjection, prefix),
222+
deserializerProjectedFields.toArray(
223+
new int[deserializerProjectedFields.size()][])),
224+
new ProjectorImpl(deserializedToProducedPos));
225+
}
226+
227+
private static Decoder projectAfterDeserializing(
228+
final Context context,
229+
final DecodingFormat<DeserializationSchema<RowData>> decodingFormat,
230+
final DataType tableDataType,
231+
final int[] dataTypeProjection,
232+
final @Nullable String prefix,
233+
final int[][] projectedFields,
234+
final int metadataSize) {
235+
final DataType dataType = toDataType(tableDataType, dataTypeProjection, prefix);
236+
final Map<Integer, Integer> tableToDeserializedTopLevelPos =
237+
tableToDeserializedTopLevelPos(dataTypeProjection);
238+
239+
final Map<List<Integer>, Integer> deserializedToProducedPos = new HashMap<>();
240+
for (int producedPos = 0; producedPos < projectedFields.length; producedPos++) {
241+
final int[] tablePos = projectedFields[producedPos];
242+
int tableTopLevelPos = tablePos[0];
243+
244+
final Integer deserializedTopLevelPos =
245+
tableToDeserializedTopLevelPos.get(tableTopLevelPos);
246+
if (deserializedTopLevelPos != null) {
247+
final int[] deserializedPos = copyArray(tablePos);
248+
deserializedPos[0] = deserializedTopLevelPos;
249+
250+
deserializedToProducedPos.put(
251+
Arrays.stream(deserializedPos)
252+
.boxed()
253+
.collect(Collectors.toUnmodifiableList()),
254+
producedPos);
255+
}
256+
}
257+
258+
addMetadataProjections(
259+
dataTypeProjection.length,
260+
projectedFields.length,
261+
metadataSize,
262+
deserializedToProducedPos);
263+
264+
return new Decoder(
265+
decodingFormat.createRuntimeDecoder(context, dataType),
266+
new ProjectorImpl(deserializedToProducedPos));
267+
}
268+
269+
/** Projects fields from the deserialized row to their positions in the final produced row. */
270+
@Internal
271+
public interface Projector extends Serializable {
272+
/** Returns true if {@link #project} will not project any fields. */
273+
boolean isEmptyProjection();
274+
275+
/**
276+
* Returns true if deserialized positions are different from the final produced row
277+
* positions.
278+
*/
279+
boolean isProjectionNeeded();
280+
281+
/** Copies fields from the deserialized row to their final positions in the produced row. */
282+
void project(final RowData deserialized, final GenericRowData producedRow);
283+
}
284+
285+
private static class ProjectorImpl implements Projector {
286+
287+
private final Map<List<Integer>, Integer> deserializedToProducedPos;
288+
private final boolean isProjectionNeeded;
289+
290+
ProjectorImpl(final Map<List<Integer>, Integer> deserializedToProducedPos) {
291+
this.deserializedToProducedPos = deserializedToProducedPos;
292+
this.isProjectionNeeded =
293+
!deserializedToProducedPos.entrySet().stream()
294+
.allMatch(
295+
entry -> {
296+
final List<Integer> deserializedPos = entry.getKey();
297+
final List<Integer> producedPos = List.of(entry.getValue());
298+
return Objects.equals(producedPos, deserializedPos);
299+
});
300+
}
301+
302+
@Override
303+
public boolean isEmptyProjection() {
304+
return deserializedToProducedPos.isEmpty();
305+
}
306+
307+
@Override
308+
public boolean isProjectionNeeded() {
309+
return isProjectionNeeded;
310+
}
311+
312+
@Override
313+
public void project(final RowData deserialized, final GenericRowData producedRow) {
314+
this.deserializedToProducedPos.forEach(
315+
(deserializedPos, targetPos) -> {
316+
Object value = deserialized;
317+
for (final Integer i : deserializedPos) {
318+
value = ((GenericRowData) value).getField(i);
319+
}
320+
producedRow.setField(targetPos, value);
321+
});
322+
}
323+
}
324+
}

0 commit comments

Comments
 (0)