Skip to content

Commit dcde1ba

Browse files
committed
[FLINK-38483][checkpoint] Fix job failing to recover from an unaligned checkpoint with the exception "Cannot get old subtasks from a descriptor that represents no state."
1 parent b8613ec commit dcde1ba

File tree

3 files changed

+65
-22
lines changed

3 files changed

+65
-22
lines changed

flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/InflightDataRescalingDescriptor.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -116,22 +116,20 @@ public static class InflightDataGateOrPartitionRescalingDescriptor implements Se
116116
public static final InflightDataGateOrPartitionRescalingDescriptor NO_STATE =
117117
new InflightDataGateOrPartitionRescalingDescriptor(
118118
new int[0],
119-
RescaleMappings.identity(0, 0),
119+
RescaleMappings.SYMMETRIC_IDENTITY,
120120
Collections.emptySet(),
121121
MappingType.IDENTITY) {
122122

123123
private static final long serialVersionUID = 1L;
124124

125125
@Override
126126
public int[] getOldSubtaskInstances() {
127-
throw new UnsupportedOperationException(
128-
"Cannot get old subtasks from a descriptor that represents no state.");
127+
return new int[0];
129128
}
130129

131130
@Override
132131
public RescaleMappings getRescaleMappings() {
133-
throw new UnsupportedOperationException(
134-
"Cannot get rescale mappings from a descriptor that represents no state.");
132+
return RescaleMappings.SYMMETRIC_IDENTITY;
135133
}
136134
};
137135

flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/InflightDataRescalingDescriptorTest.java

Lines changed: 9 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -27,31 +27,25 @@
2727
import java.util.Collections;
2828

2929
import static org.assertj.core.api.Assertions.assertThat;
30-
import static org.assertj.core.api.Assertions.assertThatThrownBy;
3130

3231
/** Tests for {@link InflightDataRescalingDescriptor}. */
3332
class InflightDataRescalingDescriptorTest {
3433

3534
@Test
36-
void testNoStateDescriptorThrowsOnGetOldSubtaskInstances() {
35+
void testNoStateDescriptorReturnsEmptyOldSubtaskInstances() {
3736
InflightDataGateOrPartitionRescalingDescriptor noStateDescriptor =
3837
InflightDataGateOrPartitionRescalingDescriptor.NO_STATE;
3938

40-
assertThatThrownBy(noStateDescriptor::getOldSubtaskInstances)
41-
.isInstanceOf(UnsupportedOperationException.class)
42-
.hasMessageContaining(
43-
"Cannot get old subtasks from a descriptor that represents no state");
39+
assertThat(noStateDescriptor.getOldSubtaskInstances()).isEqualTo(new int[0]);
4440
}
4541

4642
@Test
47-
void testNoStateDescriptorThrowsOnGetRescaleMappings() {
43+
void testNoStateDescriptorReturnsSymmetricIdentity() {
4844
InflightDataGateOrPartitionRescalingDescriptor noStateDescriptor =
4945
InflightDataGateOrPartitionRescalingDescriptor.NO_STATE;
5046

51-
assertThatThrownBy(noStateDescriptor::getRescaleMappings)
52-
.isInstanceOf(UnsupportedOperationException.class)
53-
.hasMessageContaining(
54-
"Cannot get rescale mappings from a descriptor that represents no state");
47+
assertThat(noStateDescriptor.getRescaleMappings())
48+
.isEqualTo(RescaleMappings.SYMMETRIC_IDENTITY);
5549
}
5650

5751
@Test
@@ -108,11 +102,10 @@ void testInflightDataRescalingDescriptorWithNoStateDescriptor() {
108102
InflightDataRescalingDescriptor rescalingDescriptor =
109103
new InflightDataRescalingDescriptor(descriptors);
110104

111-
// First gate/partition has NO_STATE
112-
assertThatThrownBy(() -> rescalingDescriptor.getOldSubtaskIndexes(0))
113-
.isInstanceOf(UnsupportedOperationException.class);
114-
assertThatThrownBy(() -> rescalingDescriptor.getChannelMapping(0))
115-
.isInstanceOf(UnsupportedOperationException.class);
105+
// First gate/partition has NO_STATE - should return empty array and SYMMETRIC_IDENTITY
106+
assertThat(rescalingDescriptor.getOldSubtaskIndexes(0)).isEqualTo(new int[0]);
107+
assertThat(rescalingDescriptor.getChannelMapping(0))
108+
.isEqualTo(RescaleMappings.SYMMETRIC_IDENTITY);
116109

117110
// Second gate/partition has normal state
118111
assertThat(rescalingDescriptor.getOldSubtaskIndexes(1)).isEqualTo(new int[] {0, 1});

flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointRescaleWithMixedExchangesITCase.java

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.flink.configuration.Configuration;
2828
import org.apache.flink.configuration.ExternalizedCheckpointRetention;
2929
import org.apache.flink.configuration.MemorySize;
30+
import org.apache.flink.configuration.RestartStrategyOptions;
3031
import org.apache.flink.configuration.StateRecoveryOptions;
3132
import org.apache.flink.configuration.TaskManagerOptions;
3233
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
@@ -57,6 +58,8 @@
5758
import java.util.List;
5859
import java.util.Random;
5960

61+
import static org.apache.flink.configuration.RestartStrategyOptions.RestartStrategyType.NO_RESTART_STRATEGY;
62+
6063
/**
6164
* Integration test for rescaling jobs with mixed (UC-supported and UC-unsupported) exchanges from
6265
* an unaligned checkpoint.
@@ -80,7 +83,8 @@ public static Collection<ExecuteJobViaEnv> parameter() {
8083
UnalignedCheckpointRescaleWithMixedExchangesITCase::createMultiOutputDAG,
8184
UnalignedCheckpointRescaleWithMixedExchangesITCase::createMultiInputDAG,
8285
UnalignedCheckpointRescaleWithMixedExchangesITCase::createRescalePartitionerDAG,
83-
UnalignedCheckpointRescaleWithMixedExchangesITCase::createMixedComplexityDAG);
86+
UnalignedCheckpointRescaleWithMixedExchangesITCase::createMixedComplexityDAG,
87+
UnalignedCheckpointRescaleWithMixedExchangesITCase::createPartEmptyHashExchangeDAG);
8488
}
8589

8690
@Before
@@ -137,6 +141,7 @@ private StreamExecutionEnvironment getUnalignedCheckpointEnv(@Nullable String re
137141
conf.set(CheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(1));
138142
// Disable aligned timeout to ensure it works with unaligned checkpoint directly
139143
conf.set(CheckpointingOptions.ALIGNED_CHECKPOINT_TIMEOUT, Duration.ofSeconds(0));
144+
conf.set(RestartStrategyOptions.RESTART_STRATEGY, NO_RESTART_STRATEGY.getMainValue());
140145
conf.set(
141146
CheckpointingOptions.EXTERNALIZED_CHECKPOINT_RETENTION,
142147
ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION);
@@ -336,6 +341,53 @@ private static JobClient createMixedComplexityDAG(StreamExecutionEnvironment env
336341
return env.executeAsync();
337342
}
338343

344+
/**
345+
* Creates a DAG where the downstream MapAfterKeyBy task receives input from two hash exchanges:
346+
* one with actual data and one that is empty due to filtering. This tests unaligned checkpoint
347+
* rescaling with mixed empty and non-empty hash partitions.
348+
*/
349+
private static JobClient createPartEmptyHashExchangeDAG(StreamExecutionEnvironment env)
350+
throws Exception {
351+
int source1Parallelism = getRandomParallelism();
352+
DataGeneratorSource<Long> source1 =
353+
new DataGeneratorSource<>(
354+
index -> index,
355+
Long.MAX_VALUE,
356+
RateLimiterStrategy.perSecond(5000),
357+
Types.LONG);
358+
DataStream<Long> sourceStream1 =
359+
env.fromSource(source1, WatermarkStrategy.noWatermarks(), "Source 1")
360+
.setParallelism(source1Parallelism);
361+
362+
int source2Parallelism = getRandomParallelism();
363+
DataGeneratorSource<Long> source2 =
364+
new DataGeneratorSource<>(
365+
index -> index,
366+
Long.MAX_VALUE,
367+
RateLimiterStrategy.perSecond(5000),
368+
Types.LONG);
369+
370+
// Filter all records to simulate empty state exchange
371+
DataStream<Long> sourceStream2 =
372+
env.fromSource(source2, WatermarkStrategy.noWatermarks(), "Source 2")
373+
.setParallelism(source2Parallelism)
374+
.filter(value -> false)
375+
.setParallelism(source2Parallelism);
376+
377+
sourceStream1
378+
.union(sourceStream2)
379+
.keyBy((KeySelector<Long, Long>) value -> value)
380+
.map(
381+
x -> {
382+
Thread.sleep(5);
383+
return x;
384+
})
385+
.name("MapAfterKeyBy")
386+
.setParallelism(getRandomParallelism());
387+
388+
return env.executeAsync();
389+
}
390+
339391
private static int getRandomParallelism() {
340392
return RANDOM.nextInt(MAX_SLOTS) + 1;
341393
}

0 commit comments

Comments
 (0)