Skip to content

Commit c07a902

Browse files
committed
[hotfix] Introduce OperatorSnapshotFinalizer factory method
1 parent c80a1f3 commit c07a902

File tree

5 files changed

+17
-9
lines changed

5 files changed

+17
-9
lines changed

flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/SnapshotUtils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ public static <OUT, OP extends StreamOperator<OUT>> TaggedOperatorSubtaskState s
7272
operator.snapshotState(checkpointId, timestamp, options, storage);
7373

7474
OperatorSubtaskState state =
75-
new OperatorSnapshotFinalizer(snapshotInProgress).getJobManagerOwnedState();
75+
OperatorSnapshotFinalizer.create(snapshotInProgress).getJobManagerOwnedState();
7676

7777
operator.notifyCheckpointComplete(checkpointId);
7878
return new TaggedOperatorSubtaskState(index, state);

flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFinalizer.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,11 @@
2727
import org.apache.flink.runtime.state.SnapshotResult;
2828
import org.apache.flink.util.concurrent.FutureUtils;
2929

30-
import javax.annotation.Nonnull;
31-
3230
import java.util.concurrent.ExecutionException;
3331

3432
import static org.apache.flink.runtime.checkpoint.StateObjectCollection.emptyIfNull;
3533
import static org.apache.flink.runtime.checkpoint.StateObjectCollection.singletonOrEmpty;
34+
import static org.apache.flink.util.Preconditions.checkNotNull;
3635

3736
/**
3837
* This class finalizes {@link OperatorSnapshotFutures}. Each object is created with a {@link
@@ -47,8 +46,9 @@ public class OperatorSnapshotFinalizer {
4746
/** Secondary replica of the operator subtask state for faster, local recovery on TM. */
4847
private final OperatorSubtaskState taskLocalState;
4948

50-
public OperatorSnapshotFinalizer(@Nonnull OperatorSnapshotFutures snapshotFutures)
49+
public static OperatorSnapshotFinalizer create(OperatorSnapshotFutures snapshotFutures)
5150
throws ExecutionException, InterruptedException {
51+
checkNotNull(snapshotFutures);
5252

5353
SnapshotResult<KeyedStateHandle> keyedManaged =
5454
FutureUtils.runIfNotDoneAndGet(snapshotFutures.getKeyedStateManagedFuture());
@@ -68,7 +68,7 @@ public OperatorSnapshotFinalizer(@Nonnull OperatorSnapshotFutures snapshotFuture
6868
SnapshotResult<StateObjectCollection<OutputStateHandle>> resultSubpartition =
6969
snapshotFutures.getResultSubpartitionStateFuture().get();
7070

71-
jobManagerOwnedState =
71+
OperatorSubtaskState jobManagerOwnedState =
7272
OperatorSubtaskState.builder()
7373
.setManagedOperatorState(
7474
singletonOrEmpty(operatorManaged.getJobManagerOwnedSnapshot()))
@@ -83,7 +83,7 @@ public OperatorSnapshotFinalizer(@Nonnull OperatorSnapshotFutures snapshotFuture
8383
emptyIfNull(resultSubpartition.getJobManagerOwnedSnapshot()))
8484
.build();
8585

86-
taskLocalState =
86+
OperatorSubtaskState taskLocalState =
8787
OperatorSubtaskState.builder()
8888
.setManagedOperatorState(
8989
singletonOrEmpty(operatorManaged.getTaskLocalSnapshot()))
@@ -94,6 +94,14 @@ public OperatorSnapshotFinalizer(@Nonnull OperatorSnapshotFutures snapshotFuture
9494
.setResultSubpartitionState(
9595
emptyIfNull(resultSubpartition.getTaskLocalSnapshot()))
9696
.build();
97+
98+
return new OperatorSnapshotFinalizer(jobManagerOwnedState, taskLocalState);
99+
}
100+
101+
public OperatorSnapshotFinalizer(
102+
OperatorSubtaskState jobManagerOwnedState, OperatorSubtaskState taskLocalState) {
103+
this.jobManagerOwnedState = checkNotNull(jobManagerOwnedState);
104+
this.taskLocalState = checkNotNull(taskLocalState);
97105
}
98106

99107
public OperatorSubtaskState getTaskLocalState() {

flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,7 @@ private SnapshotsFinalizeResult finalizeNonFinishedSnapshots() throws Exception
189189

190190
// finalize the async part of all by executing all snapshot runnables
191191
OperatorSnapshotFinalizer finalizedSnapshots =
192-
new OperatorSnapshotFinalizer(snapshotInProgress);
192+
OperatorSnapshotFinalizer.create(snapshotInProgress);
193193

194194
jobManagerTaskOperatorSubtaskStates.putSubtaskStateByOperatorID(
195195
operatorID, finalizedSnapshots.getJobManagerOwnedState());

flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFinalizerTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ void testRunAndExtract() throws Exception {
9393
assertThat(f).isNotDone();
9494
}
9595

96-
OperatorSnapshotFinalizer finalizer = new OperatorSnapshotFinalizer(snapshotFutures);
96+
OperatorSnapshotFinalizer finalizer = OperatorSnapshotFinalizer.create(snapshotFutures);
9797

9898
for (Future<?> f : snapshotFutures.getAllFutures()) {
9999
assertThat(f).isDone();

flink-runtime/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -761,7 +761,7 @@ public OperatorSnapshotFinalizer snapshotWithLocalState(
761761
checkpointStorageAccess.resolveCheckpointStorageLocation(
762762
checkpointId, locationReference));
763763

764-
return new OperatorSnapshotFinalizer(operatorStateResult);
764+
return OperatorSnapshotFinalizer.create(operatorStateResult);
765765
}
766766

767767
/**

0 commit comments

Comments
 (0)