Skip to content

Commit b502a88

Browse files
committed
[FLINK-38483][checkpoint] Fix the bug that a job cannot be recovered from an unaligned checkpoint due to the "Cannot get old subtasks from a descriptor that represents no state." exception
1 parent 4a3e60d commit b502a88

File tree

3 files changed

+69
-24
lines changed

3 files changed

+69
-24
lines changed

flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/InflightDataRescalingDescriptor.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@
3333
*/
3434
public class InflightDataRescalingDescriptor implements Serializable {
3535

36+
private static final int[] EMPTY_INT_ARRAY = new int[0];
37+
3638
public static final InflightDataRescalingDescriptor NO_RESCALE = new NoRescalingDescriptor();
3739

3840
private static final long serialVersionUID = -3396674344669796295L;
@@ -115,23 +117,21 @@ public static class InflightDataGateOrPartitionRescalingDescriptor implements Se
115117

116118
public static final InflightDataGateOrPartitionRescalingDescriptor NO_STATE =
117119
new InflightDataGateOrPartitionRescalingDescriptor(
118-
new int[0],
119-
RescaleMappings.identity(0, 0),
120+
EMPTY_INT_ARRAY,
121+
RescaleMappings.SYMMETRIC_IDENTITY,
120122
Collections.emptySet(),
121123
MappingType.IDENTITY) {
122124

123125
private static final long serialVersionUID = 1L;
124126

125127
@Override
126128
public int[] getOldSubtaskInstances() {
127-
throw new UnsupportedOperationException(
128-
"Cannot get old subtasks from a descriptor that represents no state.");
129+
return EMPTY_INT_ARRAY;
129130
}
130131

131132
@Override
132133
public RescaleMappings getRescaleMappings() {
133-
throw new UnsupportedOperationException(
134-
"Cannot get rescale mappings from a descriptor that represents no state.");
134+
return RescaleMappings.SYMMETRIC_IDENTITY;
135135
}
136136
};
137137

@@ -228,7 +228,7 @@ public NoRescalingDescriptor() {
228228

229229
@Override
230230
public int[] getOldSubtaskIndexes(int gateOrPartitionIndex) {
231-
return new int[0];
231+
return EMPTY_INT_ARRAY;
232232
}
233233

234234
@Override

flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/InflightDataRescalingDescriptorTest.java

Lines changed: 9 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -27,31 +27,25 @@
2727
import java.util.Collections;
2828

2929
import static org.assertj.core.api.Assertions.assertThat;
30-
import static org.assertj.core.api.Assertions.assertThatThrownBy;
3130

3231
/** Tests for {@link InflightDataRescalingDescriptor}. */
3332
class InflightDataRescalingDescriptorTest {
3433

3534
@Test
36-
void testNoStateDescriptorThrowsOnGetOldSubtaskInstances() {
35+
void testNoStateDescriptorReturnsEmptyOldSubtaskInstances() {
3736
InflightDataGateOrPartitionRescalingDescriptor noStateDescriptor =
3837
InflightDataGateOrPartitionRescalingDescriptor.NO_STATE;
3938

40-
assertThatThrownBy(noStateDescriptor::getOldSubtaskInstances)
41-
.isInstanceOf(UnsupportedOperationException.class)
42-
.hasMessageContaining(
43-
"Cannot get old subtasks from a descriptor that represents no state");
39+
assertThat(noStateDescriptor.getOldSubtaskInstances()).isEqualTo(new int[0]);
4440
}
4541

4642
@Test
47-
void testNoStateDescriptorThrowsOnGetRescaleMappings() {
43+
void testNoStateDescriptorReturnsSymmetricIdentity() {
4844
InflightDataGateOrPartitionRescalingDescriptor noStateDescriptor =
4945
InflightDataGateOrPartitionRescalingDescriptor.NO_STATE;
5046

51-
assertThatThrownBy(noStateDescriptor::getRescaleMappings)
52-
.isInstanceOf(UnsupportedOperationException.class)
53-
.hasMessageContaining(
54-
"Cannot get rescale mappings from a descriptor that represents no state");
47+
assertThat(noStateDescriptor.getRescaleMappings())
48+
.isEqualTo(RescaleMappings.SYMMETRIC_IDENTITY);
5549
}
5650

5751
@Test
@@ -108,11 +102,10 @@ void testInflightDataRescalingDescriptorWithNoStateDescriptor() {
108102
InflightDataRescalingDescriptor rescalingDescriptor =
109103
new InflightDataRescalingDescriptor(descriptors);
110104

111-
// First gate/partition has NO_STATE
112-
assertThatThrownBy(() -> rescalingDescriptor.getOldSubtaskIndexes(0))
113-
.isInstanceOf(UnsupportedOperationException.class);
114-
assertThatThrownBy(() -> rescalingDescriptor.getChannelMapping(0))
115-
.isInstanceOf(UnsupportedOperationException.class);
105+
// First gate/partition has NO_STATE - should return empty array and SYMMETRIC_IDENTITY
106+
assertThat(rescalingDescriptor.getOldSubtaskIndexes(0)).isEqualTo(new int[0]);
107+
assertThat(rescalingDescriptor.getChannelMapping(0))
108+
.isEqualTo(RescaleMappings.SYMMETRIC_IDENTITY);
116109

117110
// Second gate/partition has normal state
118111
assertThat(rescalingDescriptor.getOldSubtaskIndexes(1)).isEqualTo(new int[] {0, 1});

flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointRescaleWithMixedExchangesITCase.java

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.flink.configuration.Configuration;
2828
import org.apache.flink.configuration.ExternalizedCheckpointRetention;
2929
import org.apache.flink.configuration.MemorySize;
30+
import org.apache.flink.configuration.RestartStrategyOptions;
3031
import org.apache.flink.configuration.StateRecoveryOptions;
3132
import org.apache.flink.configuration.TaskManagerOptions;
3233
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
@@ -58,6 +59,8 @@
5859
import java.util.Collections;
5960
import java.util.Random;
6061

62+
import static org.apache.flink.configuration.RestartStrategyOptions.RestartStrategyType.NO_RESTART_STRATEGY;
63+
6164
/**
6265
* Integration test for rescaling jobs with mixed (UC-supported and UC-unsupported) exchanges from
6366
* an unaligned checkpoint.
@@ -81,7 +84,8 @@ public static Collection<ExecuteJobViaEnv> parameter() {
8184
UnalignedCheckpointRescaleWithMixedExchangesITCase::createMultiOutputDAG,
8285
UnalignedCheckpointRescaleWithMixedExchangesITCase::createMultiInputDAG,
8386
UnalignedCheckpointRescaleWithMixedExchangesITCase::createRescalePartitionerDAG,
84-
UnalignedCheckpointRescaleWithMixedExchangesITCase::createMixedComplexityDAG);
87+
UnalignedCheckpointRescaleWithMixedExchangesITCase::createMixedComplexityDAG,
88+
UnalignedCheckpointRescaleWithMixedExchangesITCase::createPartEmptyHashExchangeDAG);
8589
}
8690

8791
@Before
@@ -138,6 +142,7 @@ private StreamExecutionEnvironment getUnalignedCheckpointEnv(@Nullable String re
138142
conf.set(CheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(1));
139143
// Disable aligned timeout to ensure it works with unaligned checkpoint directly
140144
conf.set(CheckpointingOptions.ALIGNED_CHECKPOINT_TIMEOUT, Duration.ofSeconds(0));
145+
conf.set(RestartStrategyOptions.RESTART_STRATEGY, NO_RESTART_STRATEGY.getMainValue());
141146
conf.set(
142147
CheckpointingOptions.EXTERNALIZED_CHECKPOINT_RETENTION,
143148
ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION);
@@ -337,6 +342,53 @@ private static JobClient createMixedComplexityDAG(StreamExecutionEnvironment env
337342
return env.executeAsync();
338343
}
339344

345+
/**
346+
* Creates a DAG where the downstream MapAfterKeyBy task receives input from two hash exchanges:
347+
* one with actual data and one that is empty due to filtering. This tests unaligned checkpoint
348+
* rescaling with mixed empty and non-empty hash partitions.
349+
*/
350+
private static JobClient createPartEmptyHashExchangeDAG(StreamExecutionEnvironment env)
351+
throws Exception {
352+
int source1Parallelism = getRandomParallelism();
353+
DataGeneratorSource<Long> source1 =
354+
new DataGeneratorSource<>(
355+
index -> index,
356+
Long.MAX_VALUE,
357+
RateLimiterStrategy.perSecond(5000),
358+
Types.LONG);
359+
DataStream<Long> sourceStream1 =
360+
env.fromSource(source1, WatermarkStrategy.noWatermarks(), "Source 1")
361+
.setParallelism(source1Parallelism);
362+
363+
int source2Parallelism = getRandomParallelism();
364+
DataGeneratorSource<Long> source2 =
365+
new DataGeneratorSource<>(
366+
index -> index,
367+
Long.MAX_VALUE,
368+
RateLimiterStrategy.perSecond(5000),
369+
Types.LONG);
370+
371+
// Filter all records to simulate empty state exchange
372+
DataStream<Long> sourceStream2 =
373+
env.fromSource(source2, WatermarkStrategy.noWatermarks(), "Source 2")
374+
.setParallelism(source2Parallelism)
375+
.filter(value -> false)
376+
.setParallelism(source2Parallelism);
377+
378+
sourceStream1
379+
.union(sourceStream2)
380+
.keyBy((KeySelector<Long, Long>) value -> value)
381+
.map(
382+
x -> {
383+
Thread.sleep(5);
384+
return x;
385+
})
386+
.name("MapAfterKeyBy")
387+
.setParallelism(getRandomParallelism());
388+
389+
return env.executeAsync();
390+
}
391+
340392
private static int getRandomParallelism() {
341393
return RANDOM.nextInt(MAX_SLOTS) + 1;
342394
}

0 commit comments

Comments
 (0)