Skip to content

Commit e107aad

Browse files
authored
[FLINK-38327] Use vertex id instead of operator id in checkpoint file-merging manager (#26973)
(cherry picked from commit ee73d0f)
1 parent 2af372b commit e107aad

File tree

4 files changed

+70
-45
lines changed

4 files changed

+70
-45
lines changed

flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
import org.apache.flink.core.fs.FileSystem;
2424
import org.apache.flink.core.fs.Path;
2525
import org.apache.flink.runtime.execution.Environment;
26-
import org.apache.flink.runtime.jobgraph.OperatorID;
26+
import org.apache.flink.runtime.jobgraph.JobVertexID;
2727
import org.apache.flink.runtime.state.CheckpointedStateScope;
2828
import org.apache.flink.runtime.state.PlaceholderStreamStateHandle;
2929
import org.apache.flink.runtime.state.StreamStateHandle;
@@ -209,14 +209,14 @@ void restoreStateHandles(
209209
long checkpointId, SubtaskKey subtaskKey, Stream<SegmentFileStateHandle> stateHandles);
210210

211211
/**
212-
* A key identifies a subtask. A subtask can be identified by the operator id, subtask index and
212+
* A key identifies a subtask. A subtask can be identified by the vertex id, subtask index and
213213
* the parallelism. Note that this key should be consistent across job attempts.
214214
*/
215215
final class SubtaskKey {
216216
private static final String MANAGED_DIR_FORMAT = "job_%s_op_%s_%d_%d";
217217

218218
final String jobIDString;
219-
final String operatorIDString;
219+
final String vertexIDString;
220220
final int subtaskIndex;
221221
final int parallelism;
222222

@@ -226,23 +226,23 @@ final class SubtaskKey {
226226
*/
227227
final int hashCode;
228228

229-
public SubtaskKey(JobID jobID, OperatorID operatorID, TaskInfo taskInfo) {
229+
public SubtaskKey(JobID jobID, JobVertexID vertexID, TaskInfo taskInfo) {
230230
this(
231231
jobID.toHexString(),
232-
operatorID.toHexString(),
232+
vertexID.toHexString(),
233233
taskInfo.getIndexOfThisSubtask(),
234234
taskInfo.getNumberOfParallelSubtasks());
235235
}
236236

237237
@VisibleForTesting
238238
public SubtaskKey(
239-
String jobIDString, String operatorIDString, int subtaskIndex, int parallelism) {
239+
String jobIDString, String vertexIDString, int subtaskIndex, int parallelism) {
240240
this.jobIDString = jobIDString;
241-
this.operatorIDString = operatorIDString;
241+
this.vertexIDString = vertexIDString;
242242
this.subtaskIndex = subtaskIndex;
243243
this.parallelism = parallelism;
244244
int hash = jobIDString.hashCode();
245-
hash = 31 * hash + operatorIDString.hashCode();
245+
hash = 31 * hash + vertexIDString.hashCode();
246246
hash = 31 * hash + subtaskIndex;
247247
hash = 31 * hash + parallelism;
248248
this.hashCode = hash;
@@ -251,7 +251,7 @@ public SubtaskKey(
251251
public static SubtaskKey of(Environment environment) {
252252
return new SubtaskKey(
253253
environment.getJobID(),
254-
OperatorID.fromJobVertexID(environment.getJobVertexId()),
254+
environment.getJobVertexId(),
255255
environment.getTaskInfo());
256256
}
257257

@@ -269,7 +269,7 @@ public String getManagedDirName() {
269269
return String.format(
270270
MANAGED_DIR_FORMAT,
271271
jobIDString,
272-
operatorIDString,
272+
vertexIDString,
273273
subtaskIndex,
274274
parallelism)
275275
.replaceAll("[^a-zA-Z0-9\\-]", "_");
@@ -289,7 +289,7 @@ public boolean equals(Object o) {
289289
return hashCode == that.hashCode
290290
&& subtaskIndex == that.subtaskIndex
291291
&& parallelism == that.parallelism
292-
&& operatorIDString.equals(that.operatorIDString)
292+
&& vertexIDString.equals(that.vertexIDString)
293293
&& jobIDString.equals(that.jobIDString);
294294
}
295295

@@ -301,7 +301,7 @@ public int hashCode() {
301301
@Override
302302
public String toString() {
303303
return String.format(
304-
"%s-%s(%d/%d)", jobIDString, operatorIDString, subtaskIndex, parallelism);
304+
"%s-%s(%d/%d)", jobIDString, vertexIDString, subtaskIndex, parallelism);
305305
}
306306
}
307307

flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/SubtaskFileMergingManagerRestoreOperation.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import org.apache.flink.api.common.JobID;
2222
import org.apache.flink.api.common.TaskInfo;
2323
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
24-
import org.apache.flink.runtime.jobgraph.OperatorID;
24+
import org.apache.flink.runtime.jobgraph.JobVertexID;
2525
import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
2626
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
2727
import org.apache.flink.runtime.state.KeyedStateHandle;
@@ -51,7 +51,7 @@ public class SubtaskFileMergingManagerRestoreOperation {
5151
private final TaskInfo taskInfo;
5252

5353
/** The id of the operator to which the subtask belongs. */
54-
private final OperatorID operatorID;
54+
private final JobVertexID vertexID;
5555

5656
private final FileMergingSnapshotManager fileMergingSnapshotManager;
5757

@@ -63,19 +63,19 @@ public SubtaskFileMergingManagerRestoreOperation(
6363
FileMergingSnapshotManager fileMergingSnapshotManager,
6464
JobID jobID,
6565
TaskInfo taskInfo,
66-
OperatorID operatorID,
66+
JobVertexID vertexID,
6767
OperatorSubtaskState subtaskState) {
6868
this.checkpointId = checkpointId;
6969
this.fileMergingSnapshotManager = fileMergingSnapshotManager;
7070
this.jobID = jobID;
7171
this.taskInfo = Preconditions.checkNotNull(taskInfo);
72-
this.operatorID = Preconditions.checkNotNull(operatorID);
72+
this.vertexID = Preconditions.checkNotNull(vertexID);
7373
this.subtaskState = Preconditions.checkNotNull(subtaskState);
7474
}
7575

7676
public void restore() {
7777
FileMergingSnapshotManager.SubtaskKey subtaskKey =
78-
new FileMergingSnapshotManager.SubtaskKey(jobID, operatorID, taskInfo);
78+
new FileMergingSnapshotManager.SubtaskKey(jobID, vertexID, taskInfo);
7979

8080
Stream<? extends StateObject> keyedStateHandles =
8181
Stream.concat(

flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager;
3333
import org.apache.flink.runtime.checkpoint.filemerging.SubtaskFileMergingManagerRestoreOperation;
3434
import org.apache.flink.runtime.execution.Environment;
35+
import org.apache.flink.runtime.jobgraph.JobVertexID;
3536
import org.apache.flink.runtime.jobgraph.OperatorID;
3637
import org.apache.flink.runtime.metrics.MetricNames;
3738
import org.apache.flink.runtime.state.AsyncKeyedStateBackend;
@@ -162,7 +163,8 @@ public StreamOperatorStateContext streamOperatorStateContext(
162163
throws Exception {
163164

164165
TaskInfo taskInfo = environment.getTaskInfo();
165-
registerRestoredStateToFileMergingManager(environment.getJobID(), taskInfo, operatorID);
166+
registerRestoredStateToFileMergingManager(
167+
environment.getJobID(), taskInfo, environment.getJobVertexId(), operatorID);
166168

167169
OperatorSubtaskDescriptionText operatorSubtaskDescription =
168170
new OperatorSubtaskDescriptionText(
@@ -361,7 +363,7 @@ public StreamOperatorStateContext streamOperatorStateContext(
361363
}
362364

363365
private void registerRestoredStateToFileMergingManager(
364-
JobID jobID, TaskInfo taskInfo, OperatorID operatorID) {
366+
JobID jobID, TaskInfo taskInfo, JobVertexID jobVertexID, OperatorID operatorID) {
365367
FileMergingSnapshotManager fileMergingSnapshotManager =
366368
taskStateManager.getFileMergingSnapshotManager();
367369
Optional<Long> restoredCheckpointId = taskStateManager.getRestoreCheckpointId();
@@ -377,7 +379,7 @@ private void registerRestoredStateToFileMergingManager(
377379
fileMergingSnapshotManager,
378380
jobID,
379381
taskInfo,
380-
operatorID,
382+
jobVertexID,
381383
subtaskState.get());
382384
restoreOperation.restore();
383385
}

flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerTestBase.java

Lines changed: 48 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager.SpaceStat;
3232
import org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager.SubtaskKey;
3333
import org.apache.flink.runtime.clusterframework.types.ResourceID;
34+
import org.apache.flink.runtime.jobgraph.JobVertexID;
3435
import org.apache.flink.runtime.jobgraph.OperatorID;
3536
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
3637
import org.apache.flink.runtime.state.CheckpointedStateScope;
@@ -73,7 +74,7 @@ public abstract class FileMergingSnapshotManagerTestBase {
7374

7475
final JobID jobID = new JobID();
7576

76-
final OperatorID operatorID = new OperatorID(289347923L, 75893479L);
77+
final JobVertexID vertexID = new JobVertexID(289347923L, 75893479L);
7778

7879
SubtaskKey subtaskKey1;
7980
SubtaskKey subtaskKey2;
@@ -89,9 +90,9 @@ public abstract class FileMergingSnapshotManagerTestBase {
8990
@BeforeEach
9091
public void setup(@TempDir java.nio.file.Path tempFolder) {
9192
subtaskKey1 =
92-
new SubtaskKey(jobID, operatorID, new TaskInfoImpl("TestingTask", 128, 0, 128, 3));
93+
new SubtaskKey(jobID, vertexID, new TaskInfoImpl("TestingTask", 128, 0, 128, 3));
9394
subtaskKey2 =
94-
new SubtaskKey(jobID, operatorID, new TaskInfoImpl("TestingTask", 128, 1, 128, 3));
95+
new SubtaskKey(jobID, vertexID, new TaskInfoImpl("TestingTask", 128, 1, 128, 3));
9596

9697
checkpointBaseDir = new Path(tempFolder.toString(), jobID.toHexString());
9798
sharedStateDir = new Path(checkpointBaseDir, CHECKPOINT_SHARED_STATE_DIR);
@@ -492,12 +493,21 @@ public void testRestore() throws Exception {
492493
(FileMergingSnapshotManagerBase)
493494
createFileMergingSnapshotManager(checkpointBaseDir);
494495
CloseableRegistry closeableRegistry = new CloseableRegistry()) {
495-
496+
fmsm.registerSubtaskForSharedStates(subtaskKey1);
496497
fmsm.notifyCheckpointStart(subtaskKey1, checkpointId);
497498

498499
Map<OperatorID, OperatorSubtaskState> subtaskStatesByOperatorID = new HashMap<>();
500+
// Here, we simulate a task with 2 operators, each operator has one keyed state and one
501+
// operator state. The second operator's id is the same as the vertexID.
502+
// first operator
503+
subtaskStatesByOperatorID.put(
504+
new OperatorID(777L, 75893479L),
505+
buildOperatorSubtaskState(checkpointId, fmsm, closeableRegistry));
506+
507+
// second operator
499508
subtaskStatesByOperatorID.put(
500-
operatorID, buildOperatorSubtaskState(checkpointId, fmsm, closeableRegistry));
509+
OperatorID.fromJobVertexID(vertexID),
510+
buildOperatorSubtaskState(checkpointId, fmsm, closeableRegistry));
501511
taskStateSnapshot = new TaskStateSnapshot(subtaskStatesByOperatorID);
502512
oldSpaceStat = fmsm.spaceStat;
503513

@@ -510,6 +520,7 @@ public void testRestore() throws Exception {
510520
try (FileMergingSnapshotManagerBase fmsm =
511521
(FileMergingSnapshotManagerBase)
512522
createFileMergingSnapshotManager(checkpointBaseDir)) {
523+
fmsm.registerSubtaskForSharedStates(subtaskKey1);
513524
TaskInfo taskInfo =
514525
new TaskInfoImpl(
515526
"test restore",
@@ -521,19 +532,15 @@ public void testRestore() throws Exception {
521532
taskStateSnapshot.getSubtaskStateMappings()) {
522533
SubtaskFileMergingManagerRestoreOperation restoreOperation =
523534
new SubtaskFileMergingManagerRestoreOperation(
524-
checkpointId,
525-
fmsm,
526-
jobID,
527-
taskInfo,
528-
entry.getKey(),
529-
entry.getValue());
535+
checkpointId, fmsm, jobID, taskInfo, vertexID, entry.getValue());
530536
restoreOperation.restore();
531537
}
532538
TreeMap<Long, Set<LogicalFile>> stateFiles = fmsm.getUploadedStates();
533539
assertThat(stateFiles.size()).isEqualTo(1);
534540
Set<LogicalFile> restoreFileSet = stateFiles.get(checkpointId);
535541
assertThat(restoreFileSet).isNotNull();
536-
assertThat(restoreFileSet.size()).isEqualTo(4);
542+
// 2 operators * (2 keyed state + 2 operator state)
543+
assertThat(restoreFileSet.size()).isEqualTo(8);
537544
assertThat(fmsm.spaceStat).isEqualTo(oldSpaceStat);
538545
for (LogicalFile file : restoreFileSet) {
539546
assertThat(fmsm.getLogicalFile(file.getFileId())).isEqualTo(file);
@@ -662,29 +669,44 @@ private OperatorSubtaskState buildOperatorSubtaskState(
662669
Collections.singletonList(
663670
IncrementalKeyedStateHandle.HandleAndLocalPath.of(
664671
buildOneSegmentFileHandle(
665-
checkpointId, fmsm, closeableRegistry),
672+
checkpointId,
673+
fmsm,
674+
CheckpointedStateScope.SHARED,
675+
closeableRegistry),
666676
"localPath")),
667677
Collections.emptyList(),
668678
null);
669679

670680
KeyGroupsStateHandle keyedStateHandle2 =
671681
new KeyGroupsStateHandle(
672682
new KeyGroupRangeOffsets(0, 8),
673-
buildOneSegmentFileHandle(checkpointId, fmsm, closeableRegistry));
683+
buildOneSegmentFileHandle(
684+
checkpointId,
685+
fmsm,
686+
CheckpointedStateScope.EXCLUSIVE,
687+
closeableRegistry));
674688

675689
OperatorStateHandle operatorStateHandle1 =
676690
new FileMergingOperatorStreamStateHandle(
677691
null,
678692
null,
679693
Collections.emptyMap(),
680-
buildOneSegmentFileHandle(checkpointId, fmsm, closeableRegistry));
694+
buildOneSegmentFileHandle(
695+
checkpointId,
696+
fmsm,
697+
CheckpointedStateScope.EXCLUSIVE,
698+
closeableRegistry));
681699

682700
OperatorStateHandle operatorStateHandle2 =
683701
new FileMergingOperatorStreamStateHandle(
684702
null,
685703
null,
686704
Collections.emptyMap(),
687-
buildOneSegmentFileHandle(checkpointId, fmsm, closeableRegistry));
705+
buildOneSegmentFileHandle(
706+
checkpointId,
707+
fmsm,
708+
CheckpointedStateScope.EXCLUSIVE,
709+
closeableRegistry));
688710

689711
return OperatorSubtaskState.builder()
690712
.setManagedKeyedState(keyedStateHandle1)
@@ -695,10 +717,13 @@ private OperatorSubtaskState buildOperatorSubtaskState(
695717
}
696718

697719
private SegmentFileStateHandle buildOneSegmentFileHandle(
698-
long checkpointId, FileMergingSnapshotManager fmsm, CloseableRegistry closeableRegistry)
720+
long checkpointId,
721+
FileMergingSnapshotManager fmsm,
722+
CheckpointedStateScope scope,
723+
CloseableRegistry closeableRegistry)
699724
throws Exception {
700725
FileMergingCheckpointStateOutputStream outputStream =
701-
writeCheckpointAndGetStream(checkpointId, fmsm, closeableRegistry);
726+
writeCheckpointAndGetStream(checkpointId, fmsm, scope, closeableRegistry);
702727
return outputStream.closeAndGetHandle();
703728
}
704729

@@ -741,15 +766,13 @@ jobID, new ResourceID(tmId), getFileMergingType())
741766
}
742767

743768
FileMergingCheckpointStateOutputStream writeCheckpointAndGetStream(
744-
long checkpointId, FileMergingSnapshotManager fmsm, CloseableRegistry closeableRegistry)
769+
long checkpointId,
770+
FileMergingSnapshotManager fmsm,
771+
CheckpointedStateScope scope,
772+
CloseableRegistry closeableRegistry)
745773
throws IOException {
746774
return writeCheckpointAndGetStream(
747-
subtaskKey1,
748-
checkpointId,
749-
CheckpointedStateScope.EXCLUSIVE,
750-
fmsm,
751-
closeableRegistry,
752-
32);
775+
subtaskKey1, checkpointId, scope, fmsm, closeableRegistry, 32);
753776
}
754777

755778
FileMergingCheckpointStateOutputStream writeCheckpointAndGetStream(

0 commit comments

Comments
 (0)