Skip to content

Conversation

1996fanrui
Copy link
Member

@1996fanrui 1996fanrui commented Oct 6, 2025

What is the purpose of the change

Job cannot be recovered from unaligned checkpoint , exception: Cannot get old subtasks from a descriptor that represents no state.

[FLINK-38483][checkpoint] Fix the bug that Job cannot be recovered from unaligned checkpoint due to Cannot get old subtasks from a descriptor that represents no state. exception

Root Cause Analysis

Technical Background

The issue stems from the NO_STATE descriptor implementation in InflightDataRescalingDescriptor. When processing unaligned checkpoints during rescaling, the system encounters gates or partitions that have no inflight data. The NO_STATE descriptor was designed to represent these empty states.

The Problem

The original NO_STATE descriptor implementation threw UnsupportedOperationException for both getOldSubtaskInstances() and getRescaleMappings() methods:

@Override
public int[] getOldSubtaskInstances() {
    throw new UnsupportedOperationException(
        "Cannot get old subtasks from a descriptor that represents no state.");
}

@Override
public RescaleMappings getRescaleMappings() {
    throw new UnsupportedOperationException(
        "Cannot get rescale mappings from a descriptor that represents no state.");
}

This design choice was problematic because: Rescaling Logic Expects Values. Such as: jobs have mixed hash exchanges where some partitions are empty (due to filtering, low data volume, or scaling effects) while others contain data.

Reproduction Case

The issue can be consistently reproduced using: org.apache.flink.test.checkpointing.UnalignedCheckpointRescaleWithMixedExchangesITCase.createPartEmptyHashExchangeDAG.

This test creates a DAG where the downstream MapAfterKeyBy task receives input from two hash exchanges: one with actual data and one that is empty due to filtering.

Solution Implementation

The Fix

The solution replaces the exception-throwing behavior with safe default values:

@Override
public int[] getOldSubtaskInstances() {
    return new int[0];  // Return empty array instead of throwing
}

@Override
public RescaleMappings getRescaleMappings() {
    return RescaleMappings.SYMMETRIC_IDENTITY;  // Return identity mapping
}

Why This Solution Is Risk-Free

  1. Semantic Correctness:

    • Empty array for getOldSubtaskInstances() correctly represents "no old subtasks"
    • SYMMETRIC_IDENTITY mapping correctly represents "no rescaling needed"
  2. No State Guarantee: The NO_STATE descriptor is only used when there is genuinely no inflight data to process. Therefore:

    • No data will be lost
    • No incorrect mappings will be applied
    • The empty values accurately reflect the absence of state
  3. Existing Pattern: This approach maintains consistency with other parts of the codebase

Consistency with Existing Code

The solution aligns with the existing NoRescalingDescriptor implementation, which already uses the same pattern:

// NoRescalingDescriptor.getOldSubtaskIndexes()
@Override
public int[] getOldSubtaskIndexes(int gateOrPartitionIndex) {
    return new int[0];  // Returns empty array
}

// NoRescalingDescriptor.getChannelMapping()
@Override
public RescaleMappings getChannelMapping(int gateOrPartitionIndex) {
    return RescaleMappings.SYMMETRIC_IDENTITY;  // Returns identity mapping
}

This consistency ensures that:

  • The codebase follows a uniform pattern for handling "no state" scenarios
  • Maintenance is simplified
  • Behavior is predictable across similar components

Verifying this change

  • Introducing UnalignedCheckpointRescaleWithMixedExchangesITCase.createPartEmptyHashExchangeDAG.
  • Updating InflightDataRescalingDescriptorTest

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper:no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? no

@flinkbot
Copy link
Collaborator

flinkbot commented Oct 6, 2025

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@gaborgsomogyi
Copy link
Contributor

@flinkbot run azure

@gaborgsomogyi
Copy link
Contributor

I'm just prepping 2.1.1 release and seems like it would be good to punt in before cut. I'm going to take a look tomorrow morning...

public int[] getOldSubtaskInstances() {
throw new UnsupportedOperationException(
"Cannot get old subtasks from a descriptor that represents no state.");
return new int[0];
Copy link
Contributor

@gaborgsomogyi gaborgsomogyi Oct 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can eliminate some GC:

private static final int[] EMPTY_INT_ARRAY = new int[0];

@Override
public int[] getOldSubtaskInstances() {
    return EMPTY_INT_ARRAY;
}

…om unaligned checkpoint due to `Cannot get old subtasks from a descriptor that represents no state.` exception
Copy link
Member Author

@1996fanrui 1996fanrui left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm just prepping 2.1.1 release and seems like it would be good to punt in before cut. I'm going to take a look tomorrow morning...

Thanks for driving release, and it make sense to include this.

I have address your comment.

Copy link
Contributor

@gaborgsomogyi gaborgsomogyi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM.

@gaborgsomogyi
Copy link
Contributor

Hmmm, still some unrelated issues in azure pipe

@gaborgsomogyi
Copy link
Contributor

@flinkbot run azure

@gaborgsomogyi
Copy link
Contributor

WordCount is definitely not using unaligned checkpoint so most probably we just need to kick the pipe trough the flaky tests

@1996fanrui
Copy link
Member Author

2 issues about CI:

  1. Ci is unstable on master and other release branch
  2. @flinkbot run azure does not work, I always have to re-push to trigger the CI

@gaborgsomogyi
Copy link
Contributor

After green CI good to go🚢

@weiqingy
Copy link

weiqingy commented Oct 7, 2025

Hey @1996fanrui @gaborgsomogyi, do you know the latest on the CI failures? I’m trying to understand how to avoid failures that don’t seem related to the PR, and how to tell if they’re caused by flaky tests. In my case, the CI failed but I don’t see any test failures: https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=70091&view=ms.vss-test-web.build-test-results-tab”

@1996fanrui
Copy link
Member Author

It may be caused by test environment. I checked the CI list[1], no CI is green since 20251003.3.

I saw lot of panic: runtime error: invalid memory address or nil pointer dereference from the recent failed CIs, but do not know the root cause.

[1] https://dev.azure.com/apache-flink/apache-flink/_build?definitionId=2&_a=summary

@weiqingy
Copy link

weiqingy commented Oct 7, 2025

Thanks for the info @1996fanrui !

Copy link
Contributor

@davidradl davidradl left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

approve assuming the CI errors are unrelated.

@github-actions github-actions bot added the community-reviewed PR has been reviewed by the community. label Oct 8, 2025
@gaborgsomogyi
Copy link
Contributor

FYI, I've a try to fix it on master: #27095

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
community-reviewed PR has been reviewed by the community.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants