Skip to content

Commit 3dc9030

Browse files
fqshopifyfqaiser94
authored andcommitted
Support Projection Pushdown
1 parent 681e686 commit 3dc9030

File tree

12 files changed

+1833
-106
lines changed

12 files changed

+1833
-106
lines changed

flink-connector-kafka/archunit-violations/c0d94764-76a0-4c50-b617-70b1754c4612

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,9 +47,9 @@ Method <org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader
4747
Method <org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.setConsumerClientRack(java.util.Properties, java.lang.String)> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaPartitionSplitReader.java:0)
4848
Method <org.apache.flink.connector.kafka.source.reader.KafkaSourceReader.getNumAliveFetchers()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaSourceReader.java:0)
4949
Method <org.apache.flink.connector.kafka.source.reader.KafkaSourceReader.getOffsetsToCommit()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaSourceReader.java:0)
50+
Method <org.apache.flink.streaming.connectors.kafka.table.Decoder.toDataType(org.apache.flink.table.types.DataType, [I, java.lang.String)> calls method <org.apache.flink.table.types.utils.DataTypeUtils.stripRowPrefix(org.apache.flink.table.types.DataType, java.lang.String)> in (Decoder.java:155)
5051
Method <org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaRecordSerializationSchema.createProjectedRow(org.apache.flink.table.data.RowData, org.apache.flink.types.RowKind, [Lorg.apache.flink.table.data.RowData$FieldGetter;)> has parameter of type <[Lorg.apache.flink.table.data.RowData$FieldGetter;> in (DynamicKafkaRecordSerializationSchema.java:0)
5152
Method <org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.createKeyFormatProjection(org.apache.flink.configuration.ReadableConfig, org.apache.flink.table.types.DataType)> calls method <org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getFieldNames(org.apache.flink.table.types.logical.LogicalType)> in (KafkaConnectorOptionsUtil.java:520)
5253
Method <org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.createValueFormatProjection(org.apache.flink.configuration.ReadableConfig, org.apache.flink.table.types.DataType)> calls method <org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getFieldCount(org.apache.flink.table.types.logical.LogicalType)> in (KafkaConnectorOptionsUtil.java:564)
5354
Method <org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSink.createSerialization(org.apache.flink.table.connector.sink.DynamicTableSink$Context, org.apache.flink.table.connector.format.EncodingFormat, [I, java.lang.String)> calls method <org.apache.flink.table.types.utils.DataTypeUtils.stripRowPrefix(org.apache.flink.table.types.DataType, java.lang.String)> in (KafkaDynamicSink.java:408)
5455
Method <org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSink.getFieldGetters(java.util.List, [I)> has return type <[Lorg.apache.flink.table.data.RowData$FieldGetter;> in (KafkaDynamicSink.java:0)
55-
Method <org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource.createDeserialization(org.apache.flink.table.connector.source.DynamicTableSource$Context, org.apache.flink.table.connector.format.DecodingFormat, [I, java.lang.String)> calls method <org.apache.flink.table.types.utils.DataTypeUtils.stripRowPrefix(org.apache.flink.table.types.DataType, java.lang.String)> in (KafkaDynamicSource.java:574)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.streaming.connectors.kafka.config;
20+
21+
import org.apache.flink.annotation.Internal;
22+
23+
/**
24+
* Projection pushdown mode for {@link
25+
* org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource}.
26+
*/
27+
@Internal
28+
public enum FormatProjectionPushdownLevel {
29+
30+
/** The format does not support any kind of projection pushdown. */
31+
NONE,
32+
33+
/** The format supports projection pushdown for top-level fields only. */
34+
TOP_LEVEL,
35+
36+
/** The format supports projection pushdown for top-level and nested fields. */
37+
ALL
38+
}
Lines changed: 334 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,334 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.streaming.connectors.kafka.table;
20+
21+
import org.apache.flink.annotation.Internal;
22+
import org.apache.flink.api.common.serialization.DeserializationSchema;
23+
import org.apache.flink.table.connector.Projection;
24+
import org.apache.flink.table.connector.format.DecodingFormat;
25+
import org.apache.flink.table.connector.format.ProjectableDecodingFormat;
26+
import org.apache.flink.table.connector.source.DynamicTableSource.Context;
27+
import org.apache.flink.table.data.GenericRowData;
28+
import org.apache.flink.table.data.RowData;
29+
import org.apache.flink.table.types.DataType;
30+
import org.apache.flink.table.types.utils.DataTypeUtils;
31+
32+
import javax.annotation.Nullable;
33+
34+
import java.io.Serializable;
35+
import java.util.ArrayList;
36+
import java.util.Arrays;
37+
import java.util.HashMap;
38+
import java.util.List;
39+
import java.util.Map;
40+
import java.util.Objects;
41+
import java.util.Optional;
42+
import java.util.stream.Collectors;
43+
44+
/**
45+
* Decoding messages consists of two potential steps:
46+
*
47+
* <ol>
48+
* <li>Deserialization i.e deserializing the {@code byte[]} into a {@link RowData}. This process
49+
* is handled by a {@link DeserializationSchema}.
50+
* <li>Projection i.e. projecting any required fields from the deserialized {@link RowData}
51+
* (returned by the {@link DeserializationSchema} in the first step) to their positions in the
52+
* final produced {@link RowData}. This process is handled by a {@link Projector}.
53+
* </ol>
54+
*
55+
* <p>In order to decode messages correctly, the {@link DeserializationSchema} and the {@link
56+
* Projector} need to work together. For example, the {@link Projector} needs to know the positions
57+
* of the required fields in the {@link RowData} returned by the {@link DeserializationSchema} in
58+
* order to be able to correctly set fields in the final produced {@link RowData}.
59+
*
60+
* <p>That's why we have this {@link Decoder} class. This class ensures that the returned {@link
61+
* DeserializationSchema} and {@link Projector} will work together to decode messages correctly.
62+
*/
63+
@Internal
64+
public class Decoder {
65+
66+
/**
67+
* Can be null. Null is used inside {@link DynamicKafkaDeserializationSchema} to avoid
68+
* deserializing keys if not required.
69+
*/
70+
private final @Nullable DeserializationSchema<RowData> deserializationSchema;
71+
72+
/** Mapping of the physical position in the key to the target position in the RowData. */
73+
private final Projector projector;
74+
75+
private Decoder(
76+
final DeserializationSchema<RowData> deserializationSchema, final Projector projector) {
77+
this.deserializationSchema = deserializationSchema;
78+
this.projector = projector;
79+
}
80+
81+
/**
82+
* @param decodingFormat Optional format for decoding bytes.
83+
* @param tableDataType The data type representing the table schema.
84+
* @param dataTypeProjection Indices indicate the position of the field in the dataType
85+
* (key/value). Values indicate the position of the field in the tableSchema.
86+
* @param prefix Optional field prefix
87+
* @param projectedFields Indices indicate the position of the field in the produced Row. Values
88+
* indicate the position of the field in the table schema.
89+
* @param pushProjectionsIntoDecodingFormat if this is true and the format is a {@link
90+
* ProjectableDecodingFormat}, any {@param projectedFields} will be pushed down into the
91+
* {@link ProjectableDecodingFormat}. Otherwise, projections will be applied after
92+
* deserialization.
93+
* @return a {@link Decoder} instance.
94+
*/
95+
public static Decoder create(
96+
final Context context,
97+
final @Nullable DecodingFormat<DeserializationSchema<RowData>> decodingFormat,
98+
final DataType tableDataType,
99+
final int[] dataTypeProjection,
100+
final @Nullable String prefix,
101+
final int[][] projectedFields,
102+
final int metadataSize,
103+
final boolean pushProjectionsIntoDecodingFormat) {
104+
if (decodingFormat == null) {
105+
return Decoder.noDeserializationOrProjection();
106+
} else {
107+
if (decodingFormat instanceof ProjectableDecodingFormat
108+
&& pushProjectionsIntoDecodingFormat) {
109+
return Decoder.projectInsideDeserializer(
110+
context,
111+
(ProjectableDecodingFormat<DeserializationSchema<RowData>>) decodingFormat,
112+
tableDataType,
113+
dataTypeProjection,
114+
prefix,
115+
projectedFields,
116+
metadataSize);
117+
} else {
118+
return Decoder.projectAfterDeserializing(
119+
context,
120+
decodingFormat,
121+
tableDataType,
122+
dataTypeProjection,
123+
prefix,
124+
projectedFields,
125+
metadataSize);
126+
}
127+
}
128+
}
129+
130+
/** @return a {@link DeserializationSchema} or null. */
131+
@Nullable
132+
public DeserializationSchema<RowData> getDeserializationSchema() {
133+
return deserializationSchema;
134+
}
135+
136+
/** @return a {@link Projector}. */
137+
public Projector getProjector() {
138+
return projector;
139+
}
140+
141+
private static Decoder noDeserializationOrProjection() {
142+
return new Decoder(null, new ProjectorImpl(Map.of()));
143+
}
144+
145+
private static DataType toDataType(
146+
final DataType tableDataType,
147+
final int[] dataTypeProjection,
148+
final @Nullable String prefix) {
149+
final DataType temp = Projection.of(dataTypeProjection).project(tableDataType);
150+
return Optional.ofNullable(prefix)
151+
.map(s -> DataTypeUtils.stripRowPrefix(temp, s))
152+
.orElse(temp);
153+
}
154+
155+
private static Map<Integer, Integer> tableToDeserializedTopLevelPos(
156+
final int[] dataTypeProjection) {
157+
final HashMap<Integer, Integer> tableToDeserializedPos = new HashMap<>();
158+
for (int i = 0; i < dataTypeProjection.length; i++) {
159+
tableToDeserializedPos.put(dataTypeProjection[i], i);
160+
}
161+
return tableToDeserializedPos;
162+
}
163+
164+
private static int[] copyArray(final int[] arr) {
165+
return Arrays.copyOf(arr, arr.length);
166+
}
167+
168+
private static void addMetadataProjections(
169+
final int deserializedSize,
170+
final int physicalSize,
171+
final int metadataSize,
172+
final Map<List<Integer>, Integer> deserializedToProducedPos) {
173+
for (int i = 0; i < metadataSize; i++) {
174+
// metadata is always added to the end of the deserialized row by the DecodingFormat
175+
final int deserializedPos = deserializedSize + i;
176+
// we need to always add metadata to the end of the produced row
177+
final int producePos = physicalSize + i;
178+
deserializedToProducedPos.put(List.of(deserializedPos), producePos);
179+
}
180+
}
181+
182+
/**
183+
* This method generates a {@link Decoder} which pushes projections down directly into the
184+
* {@link ProjectableDecodingFormat} which takes care of projecting the fields during the
185+
* deserialization process itself.
186+
*/
187+
private static Decoder projectInsideDeserializer(
188+
final Context context,
189+
final ProjectableDecodingFormat<DeserializationSchema<RowData>>
190+
projectableDecodingFormat,
191+
final DataType tableDataType,
192+
final int[] dataTypeProjection,
193+
final @Nullable String prefix,
194+
final int[][] projectedFields,
195+
final int metadataSize) {
196+
final Map<Integer, Integer> tableToDeserializedTopLevelPos =
197+
tableToDeserializedTopLevelPos(dataTypeProjection);
198+
199+
final List<int[]> deserializerProjectedFields = new ArrayList<>();
200+
final Map<List<Integer>, Integer> deserializedToProducedPos = new HashMap<>();
201+
for (int producedPos = 0; producedPos < projectedFields.length; producedPos++) {
202+
final int[] tablePos = projectedFields[producedPos];
203+
final int tableTopLevelPos = tablePos[0];
204+
205+
final Integer dataTypeTopLevelPos =
206+
tableToDeserializedTopLevelPos.get(tableTopLevelPos);
207+
if (dataTypeTopLevelPos != null) {
208+
final int[] dataTypePos = copyArray(tablePos);
209+
dataTypePos[0] = dataTypeTopLevelPos;
210+
211+
deserializerProjectedFields.add(dataTypePos);
212+
213+
final int deserializedPos = deserializerProjectedFields.size() - 1;
214+
deserializedToProducedPos.put(List.of(deserializedPos), producedPos);
215+
}
216+
}
217+
218+
addMetadataProjections(
219+
deserializerProjectedFields.size(),
220+
projectedFields.length,
221+
metadataSize,
222+
deserializedToProducedPos);
223+
224+
return new Decoder(
225+
projectableDecodingFormat.createRuntimeDecoder(
226+
context,
227+
toDataType(tableDataType, dataTypeProjection, prefix),
228+
deserializerProjectedFields.toArray(
229+
new int[deserializerProjectedFields.size()][])),
230+
new ProjectorImpl(deserializedToProducedPos));
231+
}
232+
233+
/**
234+
* This method generates a {@link Decoder} which deserializes the data fully using the {@link
235+
* DecodingFormat} and then applies any projections afterward.
236+
*/
237+
private static Decoder projectAfterDeserializing(
238+
final Context context,
239+
final DecodingFormat<DeserializationSchema<RowData>> decodingFormat,
240+
final DataType tableDataType,
241+
final int[] dataTypeProjection,
242+
final @Nullable String prefix,
243+
final int[][] projectedFields,
244+
final int metadataSize) {
245+
final DataType dataType = toDataType(tableDataType, dataTypeProjection, prefix);
246+
final Map<Integer, Integer> tableToDeserializedTopLevelPos =
247+
tableToDeserializedTopLevelPos(dataTypeProjection);
248+
249+
final Map<List<Integer>, Integer> deserializedToProducedPos = new HashMap<>();
250+
for (int producedPos = 0; producedPos < projectedFields.length; producedPos++) {
251+
final int[] tablePos = projectedFields[producedPos];
252+
int tableTopLevelPos = tablePos[0];
253+
254+
final Integer deserializedTopLevelPos =
255+
tableToDeserializedTopLevelPos.get(tableTopLevelPos);
256+
if (deserializedTopLevelPos != null) {
257+
final int[] deserializedPos = copyArray(tablePos);
258+
deserializedPos[0] = deserializedTopLevelPos;
259+
260+
deserializedToProducedPos.put(
261+
Arrays.stream(deserializedPos)
262+
.boxed()
263+
.collect(Collectors.toUnmodifiableList()),
264+
producedPos);
265+
}
266+
}
267+
268+
addMetadataProjections(
269+
dataTypeProjection.length,
270+
projectedFields.length,
271+
metadataSize,
272+
deserializedToProducedPos);
273+
274+
return new Decoder(
275+
decodingFormat.createRuntimeDecoder(context, dataType),
276+
new ProjectorImpl(deserializedToProducedPos));
277+
}
278+
279+
/** Projects fields from the deserialized row to their positions in the final produced row. */
280+
@Internal
281+
public interface Projector extends Serializable {
282+
/** Returns true if {@link #project} will not project any fields. */
283+
boolean isEmptyProjection();
284+
285+
/**
286+
* Returns true if deserialized positions are different from the final produced row
287+
* positions.
288+
*/
289+
boolean isProjectionNeeded();
290+
291+
/** Copies fields from the deserialized row to their final positions in the produced row. */
292+
void project(final RowData deserialized, final GenericRowData producedRow);
293+
}
294+
295+
private static class ProjectorImpl implements Projector {
296+
297+
private final Map<List<Integer>, Integer> deserializedToProducedPos;
298+
private final boolean isProjectionNeeded;
299+
300+
ProjectorImpl(final Map<List<Integer>, Integer> deserializedToProducedPos) {
301+
this.deserializedToProducedPos = deserializedToProducedPos;
302+
this.isProjectionNeeded =
303+
!deserializedToProducedPos.entrySet().stream()
304+
.allMatch(
305+
entry -> {
306+
final List<Integer> deserializedPos = entry.getKey();
307+
final List<Integer> producedPos = List.of(entry.getValue());
308+
return Objects.equals(producedPos, deserializedPos);
309+
});
310+
}
311+
312+
@Override
313+
public boolean isEmptyProjection() {
314+
return deserializedToProducedPos.isEmpty();
315+
}
316+
317+
@Override
318+
public boolean isProjectionNeeded() {
319+
return isProjectionNeeded;
320+
}
321+
322+
@Override
323+
public void project(final RowData deserialized, final GenericRowData producedRow) {
324+
this.deserializedToProducedPos.forEach(
325+
(deserializedPos, targetPos) -> {
326+
Object value = deserialized;
327+
for (final Integer i : deserializedPos) {
328+
value = ((GenericRowData) value).getField(i);
329+
}
330+
producedRow.setField(targetPos, value);
331+
});
332+
}
333+
}
334+
}

0 commit comments

Comments
 (0)