Skip to content

Commit 0d1b5aa

Browse files
zoltar9264Zakelly
authored andcommitted
[FLINK-33734][checkpointing] Compact and serialize original channel state handle before send checkpoint ack to JobManager
1 parent c446f5a commit 0d1b5aa

File tree

2 files changed

+5
-5
lines changed

2 files changed

+5
-5
lines changed

flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;
2525
import org.apache.flink.runtime.checkpoint.metadata.MetadataSerializer;
2626
import org.apache.flink.runtime.checkpoint.metadata.MetadataSerializers;
27-
import org.apache.flink.runtime.checkpoint.metadata.MetadataV5Serializer;
27+
import org.apache.flink.runtime.checkpoint.metadata.MetadataV6Serializer;
2828
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
2929
import org.apache.flink.runtime.jobgraph.JobVertexID;
3030
import org.apache.flink.runtime.jobgraph.OperatorID;
@@ -85,7 +85,7 @@ public static void storeCheckpointMetadata(
8585

8686
public static void storeCheckpointMetadata(
8787
CheckpointMetadata checkpointMetadata, DataOutputStream out) throws IOException {
88-
storeCheckpointMetadata(checkpointMetadata, out, MetadataV5Serializer.INSTANCE);
88+
storeCheckpointMetadata(checkpointMetadata, out, MetadataV6Serializer.INSTANCE);
8989
}
9090

9191
public static void storeCheckpointMetadata(

flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import org.apache.flink.runtime.checkpoint.CheckpointException;
2828
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
2929
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
30-
import org.apache.flink.runtime.checkpoint.StateObjectCollection;
3130
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
3231
import org.apache.flink.runtime.event.AbstractEvent;
3332
import org.apache.flink.runtime.execution.Environment;
@@ -44,6 +43,7 @@
4443
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
4544
import org.apache.flink.runtime.operators.coordination.OperatorEventDispatcher;
4645
import org.apache.flink.runtime.plugable.SerializationDelegate;
46+
import org.apache.flink.runtime.state.ChannelStateHelper;
4747
import org.apache.flink.runtime.state.CheckpointStreamFactory;
4848
import org.apache.flink.runtime.state.SnapshotResult;
4949
import org.apache.flink.streaming.api.graph.NonChainedOutput;
@@ -469,14 +469,14 @@ protected void snapshotChannelStates(
469469
snapshotInProgress.setInputChannelStateFuture(
470470
channelStateWriteResult
471471
.getInputChannelStateHandles()
472-
.thenApply(e -> new StateObjectCollection(e))
472+
.thenApply(ChannelStateHelper::mergeInputStateCollection)
473473
.thenApply(SnapshotResult::of));
474474
}
475475
if (op == getTailOperator()) {
476476
snapshotInProgress.setResultSubpartitionStateFuture(
477477
channelStateWriteResult
478478
.getResultSubpartitionStateHandles()
479-
.thenApply(e -> new StateObjectCollection(e))
479+
.thenApply(ChannelStateHelper::mergeOutputStateCollection)
480480
.thenApply(SnapshotResult::of));
481481
}
482482
}

0 commit comments

Comments
 (0)