Skip to content

Commit 4d35682

Browse files
Flink: Backport apache#11662 Fix range distribution npe when value is null to Flink 1.18 and 1.19 (apache#11745)
1 parent 3b00043 commit 4d35682

22 files changed

+651
-31
lines changed

flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/CompletedStatistics.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,4 +108,21 @@ boolean isEmpty() {
108108
return keyFrequency().isEmpty();
109109
}
110110
}
111+
112+
boolean isValid() {
113+
if (type == StatisticsType.Sketch) {
114+
if (null == keySamples) {
115+
return false;
116+
}
117+
} else {
118+
if (null == keyFrequency()) {
119+
return false;
120+
}
121+
if (keyFrequency().values().contains(null)) {
122+
return false;
123+
}
124+
}
125+
126+
return true;
127+
}
111128
}

flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/CompletedStatisticsSerializer.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,18 @@ class CompletedStatisticsSerializer extends TypeSerializer<CompletedStatistics>
4848
this.keySamplesSerializer = new ListSerializer<>(sortKeySerializer);
4949
}
5050

51+
public void changeSortKeySerializerVersion(int version) {
52+
if (sortKeySerializer instanceof SortKeySerializer) {
53+
((SortKeySerializer) sortKeySerializer).setVersion(version);
54+
}
55+
}
56+
57+
public void changeSortKeySerializerVersionLatest() {
58+
if (sortKeySerializer instanceof SortKeySerializer) {
59+
((SortKeySerializer) sortKeySerializer).restoreToLatestVersion();
60+
}
61+
}
62+
5163
@Override
5264
public boolean isImmutableType() {
5365
return false;

flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -370,7 +370,8 @@ public void resetToCheckpoint(long checkpointId, byte[] checkpointData) {
370370
"Restoring data statistic coordinator {} from checkpoint {}", operatorName, checkpointId);
371371
this.completedStatistics =
372372
StatisticsUtil.deserializeCompletedStatistics(
373-
checkpointData, completedStatisticsSerializer);
373+
checkpointData, (CompletedStatisticsSerializer) completedStatisticsSerializer);
374+
374375
// recompute global statistics in case downstream parallelism changed
375376
this.globalStatistics =
376377
globalStatistics(

flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/SortKeySerializer.java

Lines changed: 58 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -53,9 +53,12 @@ class SortKeySerializer extends TypeSerializer<SortKey> {
5353
private final int size;
5454
private final Types.NestedField[] transformedFields;
5555

56+
private int version;
57+
5658
private transient SortKey sortKey;
5759

58-
SortKeySerializer(Schema schema, SortOrder sortOrder) {
60+
SortKeySerializer(Schema schema, SortOrder sortOrder, int version) {
61+
this.version = version;
5962
this.schema = schema;
6063
this.sortOrder = sortOrder;
6164
this.size = sortOrder.fields().size();
@@ -76,6 +79,10 @@ class SortKeySerializer extends TypeSerializer<SortKey> {
7679
}
7780
}
7881

82+
SortKeySerializer(Schema schema, SortOrder sortOrder) {
83+
this(schema, sortOrder, SortKeySerializerSnapshot.CURRENT_VERSION);
84+
}
85+
7986
private SortKey lazySortKey() {
8087
if (sortKey == null) {
8188
this.sortKey = new SortKey(schema, sortOrder);
@@ -84,6 +91,18 @@ private SortKey lazySortKey() {
8491
return sortKey;
8592
}
8693

94+
public int getLatestVersion() {
95+
return snapshotConfiguration().getCurrentVersion();
96+
}
97+
98+
public void restoreToLatestVersion() {
99+
this.version = snapshotConfiguration().getCurrentVersion();
100+
}
101+
102+
public void setVersion(int version) {
103+
this.version = version;
104+
}
105+
87106
@Override
88107
public boolean isImmutableType() {
89108
return false;
@@ -125,6 +144,16 @@ public void serialize(SortKey record, DataOutputView target) throws IOException
125144
for (int i = 0; i < size; ++i) {
126145
int fieldId = transformedFields[i].fieldId();
127146
Type.TypeID typeId = transformedFields[i].type().typeId();
147+
if (version > 1) {
148+
Object value = record.get(i, Object.class);
149+
if (value == null) {
150+
target.writeBoolean(true);
151+
continue;
152+
} else {
153+
target.writeBoolean(false);
154+
}
155+
}
156+
128157
switch (typeId) {
129158
case BOOLEAN:
130159
target.writeBoolean(record.get(i, Boolean.class));
@@ -193,6 +222,14 @@ public SortKey deserialize(SortKey reuse, DataInputView source) throws IOExcepti
193222
reuse.size(),
194223
size);
195224
for (int i = 0; i < size; ++i) {
225+
if (version > 1) {
226+
boolean isNull = source.readBoolean();
227+
if (isNull) {
228+
reuse.set(i, null);
229+
continue;
230+
}
231+
}
232+
196233
int fieldId = transformedFields[i].fieldId();
197234
Type.TypeID typeId = transformedFields[i].type().typeId();
198235
switch (typeId) {
@@ -277,11 +314,13 @@ public TypeSerializerSnapshot<SortKey> snapshotConfiguration() {
277314
}
278315

279316
public static class SortKeySerializerSnapshot implements TypeSerializerSnapshot<SortKey> {
280-
private static final int CURRENT_VERSION = 1;
317+
private static final int CURRENT_VERSION = 2;
281318

282319
private Schema schema;
283320
private SortOrder sortOrder;
284321

322+
private int version = CURRENT_VERSION;
323+
285324
/** Constructor for read instantiation. */
286325
@SuppressWarnings({"unused", "checkstyle:RedundantModifier"})
287326
public SortKeySerializerSnapshot() {
@@ -311,10 +350,16 @@ public void writeSnapshot(DataOutputView out) throws IOException {
311350
@Override
312351
public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader)
313352
throws IOException {
314-
if (readVersion == 1) {
315-
readV1(in);
316-
} else {
317-
throw new IllegalArgumentException("Unknown read version: " + readVersion);
353+
switch (readVersion) {
354+
case 1:
355+
read(in);
356+
this.version = 1;
357+
break;
358+
case 2:
359+
read(in);
360+
break;
361+
default:
362+
throw new IllegalArgumentException("Unknown read version: " + readVersion);
318363
}
319364
}
320365

@@ -325,9 +370,13 @@ public TypeSerializerSchemaCompatibility<SortKey> resolveSchemaCompatibility(
325370
return TypeSerializerSchemaCompatibility.incompatible();
326371
}
327372

328-
// Sort order should be identical
329373
SortKeySerializerSnapshot newSnapshot =
330374
(SortKeySerializerSnapshot) newSerializer.snapshotConfiguration();
375+
if (newSnapshot.getCurrentVersion() == 1 && this.getCurrentVersion() == 2) {
376+
return TypeSerializerSchemaCompatibility.compatibleAfterMigration();
377+
}
378+
379+
// Sort order should be identical
331380
if (!sortOrder.sameOrder(newSnapshot.sortOrder)) {
332381
return TypeSerializerSchemaCompatibility.incompatible();
333382
}
@@ -351,10 +400,10 @@ public TypeSerializerSchemaCompatibility<SortKey> resolveSchemaCompatibility(
351400
public TypeSerializer<SortKey> restoreSerializer() {
352401
Preconditions.checkState(schema != null, "Invalid schema: null");
353402
Preconditions.checkState(sortOrder != null, "Invalid sort order: null");
354-
return new SortKeySerializer(schema, sortOrder);
403+
return new SortKeySerializer(schema, sortOrder, version);
355404
}
356405

357-
private void readV1(DataInputView in) throws IOException {
406+
private void read(DataInputView in) throws IOException {
358407
String schemaJson = StringUtils.readString(in);
359408
String sortOrderJson = StringUtils.readString(in);
360409
this.schema = SchemaParser.fromJson(schemaJson);

flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/StatisticsUtil.java

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -73,12 +73,29 @@ static byte[] serializeCompletedStatistics(
7373
}
7474

7575
static CompletedStatistics deserializeCompletedStatistics(
76-
byte[] bytes, TypeSerializer<CompletedStatistics> statisticsSerializer) {
76+
byte[] bytes, CompletedStatisticsSerializer statisticsSerializer) {
7777
try {
7878
DataInputDeserializer input = new DataInputDeserializer(bytes);
79-
return statisticsSerializer.deserialize(input);
80-
} catch (IOException e) {
81-
throw new UncheckedIOException("Fail to deserialize aggregated statistics", e);
79+
CompletedStatistics completedStatistics = statisticsSerializer.deserialize(input);
80+
if (!completedStatistics.isValid()) {
81+
throw new RuntimeException("Fail to deserialize aggregated statistics,change to v1");
82+
}
83+
84+
return completedStatistics;
85+
} catch (Exception e) {
86+
try {
87+
// If we restore from a lower version, the new version of SortKeySerializer cannot correctly
88+
// parse the checkpointData, so we need to first switch the version to v1. Once the state
89+
// data is successfully parsed, we need to switch the serialization version to the latest
90+
// version to parse the subsequent data passed from the TM.
91+
statisticsSerializer.changeSortKeySerializerVersion(1);
92+
DataInputDeserializer input = new DataInputDeserializer(bytes);
93+
CompletedStatistics deserialize = statisticsSerializer.deserialize(input);
94+
statisticsSerializer.changeSortKeySerializerVersionLatest();
95+
return deserialize;
96+
} catch (IOException ioException) {
97+
throw new UncheckedIOException("Fail to deserialize aggregated statistics", ioException);
98+
}
8299
}
83100
}
84101

flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkDistributionMode.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import org.apache.iceberg.flink.TestFixtures;
4747
import org.apache.iceberg.flink.sink.shuffle.StatisticsType;
4848
import org.apache.iceberg.flink.source.BoundedTestSource;
49+
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
4950
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
5051
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
5152
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
@@ -252,6 +253,44 @@ public void testRangeDistributionWithoutSortOrderPartitioned() throws Exception
252253
assertThat(snapshots).hasSizeGreaterThanOrEqualTo(numOfCheckpoints);
253254
}
254255

256+
@TestTemplate
257+
public void testRangeDistributionWithNullValue() throws Exception {
258+
assumeThat(partitioned).isTrue();
259+
260+
table
261+
.updateProperties()
262+
.set(TableProperties.WRITE_DISTRIBUTION_MODE, DistributionMode.RANGE.modeName())
263+
.commit();
264+
265+
int numOfCheckpoints = 6;
266+
List<List<Row>> charRows = createCharRows(numOfCheckpoints, 10);
267+
charRows.add(ImmutableList.of(Row.of(1, null)));
268+
DataStream<Row> dataStream =
269+
env.addSource(createRangeDistributionBoundedSource(charRows), ROW_TYPE_INFO);
270+
FlinkSink.Builder builder =
271+
FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
272+
.table(table)
273+
.tableLoader(tableLoader)
274+
.writeParallelism(parallelism);
275+
276+
// sort based on partition columns
277+
builder.append();
278+
env.execute(getClass().getSimpleName());
279+
280+
table.refresh();
281+
// ordered in reverse timeline from the newest snapshot to the oldest snapshot
282+
List<Snapshot> snapshots = Lists.newArrayList(table.snapshots().iterator());
283+
// only keep the snapshots with added data files
284+
snapshots =
285+
snapshots.stream()
286+
.filter(snapshot -> snapshot.addedDataFiles(table.io()).iterator().hasNext())
287+
.collect(Collectors.toList());
288+
289+
// Sometimes we will have more checkpoints than the bounded source if we pass the
290+
// auto checkpoint interval. Thus producing multiple snapshots.
291+
assertThat(snapshots).hasSizeGreaterThanOrEqualTo(numOfCheckpoints);
292+
}
293+
255294
@TestTemplate
256295
public void testRangeDistributionWithSortOrder() throws Exception {
257296
table

flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestCompletedStatisticsSerializer.java

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,15 @@
1919
package org.apache.iceberg.flink.sink.shuffle;
2020

2121
import static org.apache.iceberg.flink.sink.shuffle.Fixtures.CHAR_KEYS;
22+
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
2223

2324
import org.apache.flink.api.common.typeutils.SerializerTestBase;
2425
import org.apache.flink.api.common.typeutils.TypeSerializer;
26+
import org.apache.flink.core.memory.DataInputDeserializer;
27+
import org.apache.flink.core.memory.DataOutputSerializer;
2528
import org.apache.iceberg.SortKey;
2629
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
30+
import org.junit.jupiter.api.Test;
2731

2832
public class TestCompletedStatisticsSerializer extends SerializerTestBase<CompletedStatistics> {
2933

@@ -51,4 +55,49 @@ protected CompletedStatistics[] getTestData() {
5155
CompletedStatistics.fromKeySamples(2L, new SortKey[] {CHAR_KEYS.get("a"), CHAR_KEYS.get("b")})
5256
};
5357
}
58+
59+
@Test
60+
public void testSerializer() throws Exception {
61+
TypeSerializer<CompletedStatistics> completedStatisticsTypeSerializer = createSerializer();
62+
CompletedStatistics[] data = getTestData();
63+
DataOutputSerializer output = new DataOutputSerializer(1024);
64+
completedStatisticsTypeSerializer.serialize(data[0], output);
65+
byte[] serializedBytes = output.getCopyOfBuffer();
66+
67+
DataInputDeserializer input = new DataInputDeserializer(serializedBytes);
68+
CompletedStatistics deserialized = completedStatisticsTypeSerializer.deserialize(input);
69+
assertThat(deserialized).isEqualTo(data[0]);
70+
}
71+
72+
@Test
73+
public void testRestoreOldVersionSerializer() throws Exception {
74+
CompletedStatisticsSerializer completedStatisticsTypeSerializer =
75+
(CompletedStatisticsSerializer) createSerializer();
76+
completedStatisticsTypeSerializer.changeSortKeySerializerVersion(1);
77+
CompletedStatistics[] data = getTestData();
78+
DataOutputSerializer output = new DataOutputSerializer(1024);
79+
completedStatisticsTypeSerializer.serialize(data[0], output);
80+
byte[] serializedBytes = output.getCopyOfBuffer();
81+
82+
completedStatisticsTypeSerializer.changeSortKeySerializerVersionLatest();
83+
CompletedStatistics completedStatistics =
84+
StatisticsUtil.deserializeCompletedStatistics(
85+
serializedBytes, completedStatisticsTypeSerializer);
86+
assertThat(completedStatistics).isEqualTo(data[0]);
87+
}
88+
89+
@Test
90+
public void testRestoreNewSerializer() throws Exception {
91+
CompletedStatisticsSerializer completedStatisticsTypeSerializer =
92+
(CompletedStatisticsSerializer) createSerializer();
93+
CompletedStatistics[] data = getTestData();
94+
DataOutputSerializer output = new DataOutputSerializer(1024);
95+
completedStatisticsTypeSerializer.serialize(data[0], output);
96+
byte[] serializedBytes = output.getCopyOfBuffer();
97+
98+
CompletedStatistics completedStatistics =
99+
StatisticsUtil.deserializeCompletedStatistics(
100+
serializedBytes, completedStatisticsTypeSerializer);
101+
assertThat(completedStatistics).isEqualTo(data[0]);
102+
}
54103
}

0 commit comments

Comments
 (0)