Skip to content

Commit c446f5a

Browse files
zoltar9264 authored and Zakelly committed
[FLINK-33734][checkpointing] Introduce MetadataV6Serializer for merged channel state handle
1 parent 88c75d5 commit c446f5a

File tree

3 files changed

+197
-1
lines changed

3 files changed

+197
-1
lines changed

flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataSerializers.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,14 +30,15 @@
3030
public class MetadataSerializers {
3131

3232
private static final Map<Integer, MetadataSerializer> SERIALIZERS =
33-
CollectionUtil.newHashMapWithExpectedSize(5);
33+
CollectionUtil.newHashMapWithExpectedSize(6);
3434

3535
static {
3636
registerSerializer(MetadataV1Serializer.INSTANCE);
3737
registerSerializer(MetadataV2Serializer.INSTANCE);
3838
registerSerializer(MetadataV3Serializer.INSTANCE);
3939
registerSerializer(MetadataV4Serializer.INSTANCE);
4040
registerSerializer(MetadataV5Serializer.INSTANCE);
41+
registerSerializer(MetadataV6Serializer.INSTANCE);
4142
}
4243

4344
private static void registerSerializer(MetadataSerializer serializer) {
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.runtime.checkpoint.metadata;
20+
21+
import org.apache.flink.annotation.Internal;
22+
23+
/** V6 serializer that use ChannelStateHandleSerializerV2. */
24+
@Internal
25+
public class MetadataV6Serializer extends MetadataV5Serializer {
26+
27+
public static final MetadataSerializer INSTANCE = new MetadataV6Serializer();
28+
29+
public static final int VERSION = 6;
30+
31+
private final ChannelStateHandleSerializer channelStateHandleSerializer =
32+
new ChannelStateHandleSerializerV2();
33+
34+
@Override
35+
public int getVersion() {
36+
return VERSION;
37+
}
38+
39+
@Override
40+
protected ChannelStateHandleSerializer getChannelStateHandleSerializer() {
41+
return channelStateHandleSerializer;
42+
}
43+
}
Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
package org.apache.flink.runtime.checkpoint.metadata;
2+
3+
/*
4+
* Licensed to the Apache Software Foundation (ASF) under one or more
5+
* contributor license agreements. See the NOTICE file distributed with
6+
* this work for additional information regarding copyright ownership.
7+
* The ASF licenses this file to You under the Apache License, Version 2.0
8+
* (the "License"); you may not use this file except in compliance with
9+
* the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
20+
import org.apache.flink.core.fs.FileSystem;
21+
import org.apache.flink.runtime.checkpoint.OperatorState;
22+
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
23+
import org.apache.flink.runtime.checkpoint.StateObjectCollection;
24+
import org.apache.flink.runtime.state.InputStateHandle;
25+
import org.apache.flink.runtime.state.OutputStateHandle;
26+
import org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess;
27+
28+
import org.junit.jupiter.api.BeforeEach;
29+
import org.junit.jupiter.api.Test;
30+
import org.junit.jupiter.api.io.TempDir;
31+
32+
import java.io.ByteArrayInputStream;
33+
import java.io.ByteArrayOutputStream;
34+
import java.io.DataInputStream;
35+
import java.io.DataOutputStream;
36+
import java.io.IOException;
37+
import java.nio.file.Path;
38+
import java.util.Collection;
39+
import java.util.Collections;
40+
import java.util.List;
41+
import java.util.Random;
42+
import java.util.function.Supplier;
43+
import java.util.stream.Collectors;
44+
45+
import static java.util.Collections.emptyList;
46+
import static org.apache.flink.runtime.checkpoint.metadata.ChannelStateTestUtils.randomMergedInputChannelStateHandle;
47+
import static org.apache.flink.runtime.checkpoint.metadata.ChannelStateTestUtils.randomMergedResultSubpartitionStateHandle;
48+
import static org.assertj.core.api.Assertions.assertThat;
49+
import static org.junit.jupiter.api.Assertions.assertEquals;
50+
51+
/** {@link MetadataV6Serializer} test. */
52+
class MetadataV6SerializerTest {
53+
54+
private static final MetadataSerializer INSTANCE = MetadataV6Serializer.INSTANCE;
55+
56+
private static final Random RND = new Random();
57+
58+
private String basePath;
59+
60+
private List<InputStateHandle> inputHandles;
61+
private List<OutputStateHandle> outputHandles;
62+
63+
private CheckpointMetadata metadata;
64+
65+
@BeforeEach
66+
public void beforeEach(@TempDir Path tempDir) throws IOException {
67+
basePath = tempDir.toUri().toString();
68+
69+
final org.apache.flink.core.fs.Path metaPath =
70+
new org.apache.flink.core.fs.Path(
71+
basePath, AbstractFsCheckpointStorageAccess.METADATA_FILE_NAME);
72+
// this is in the temp folder, so it will get automatically deleted
73+
FileSystem.getLocalFileSystem().create(metaPath, FileSystem.WriteMode.OVERWRITE).close();
74+
}
75+
76+
@Test
77+
void testSerializeUnmergedChannelStateHandle() throws IOException {
78+
testSerializeChannelStateHandle(
79+
() ->
80+
ChannelStateTestUtils.randomInputChannelStateHandlesFromSameSubtask()
81+
.stream()
82+
.map(e -> (InputStateHandle) e)
83+
.collect(Collectors.toList()),
84+
() ->
85+
ChannelStateTestUtils.randomResultSubpartitionStateHandlesFromSameSubtask()
86+
.stream()
87+
.map(e -> (OutputStateHandle) e)
88+
.collect(Collectors.toList()));
89+
}
90+
91+
@Test
92+
void testSerializeMergedChannelStateHandle() throws IOException {
93+
testSerializeChannelStateHandle(
94+
() -> Collections.singletonList(randomMergedInputChannelStateHandle()),
95+
() -> Collections.singletonList(randomMergedResultSubpartitionStateHandle()));
96+
}
97+
98+
private void testSerializeChannelStateHandle(
99+
Supplier<List<InputStateHandle>> getter1, Supplier<List<OutputStateHandle>> getter2)
100+
throws IOException {
101+
102+
prepareAndSerializeMetadata(getter1, getter2);
103+
104+
try (ByteArrayOutputStream out = new ByteArrayOutputStream();
105+
DataOutputStream dos = new DataOutputStream(out)) {
106+
INSTANCE.serialize(metadata, dos);
107+
108+
try (DataInputStream dis =
109+
new DataInputStream(new ByteArrayInputStream(out.toByteArray()))) {
110+
111+
CheckpointMetadata deserializedMetadata =
112+
INSTANCE.deserialize(dis, metadata.getClass().getClassLoader(), basePath);
113+
114+
Collection<OperatorState> operatorStates = deserializedMetadata.getOperatorStates();
115+
assertThat(operatorStates).hasSize(1);
116+
117+
OperatorState operatorState = operatorStates.iterator().next();
118+
assertEquals(1, operatorState.getNumberCollectedStates());
119+
120+
OperatorSubtaskState subtaskState = operatorState.getState(0);
121+
122+
assertEquals(inputHandles, subtaskState.getInputChannelState().asList());
123+
assertEquals(outputHandles, subtaskState.getResultSubpartitionState().asList());
124+
}
125+
}
126+
}
127+
128+
private void prepareAndSerializeMetadata(
129+
Supplier<List<InputStateHandle>> getter1, Supplier<List<OutputStateHandle>> getter2) {
130+
Collection<OperatorState> operatorStates =
131+
CheckpointTestUtils.createOperatorStates(RND, basePath, 1, 0, 0, 1);
132+
133+
inputHandles = getter1.get();
134+
outputHandles = getter2.get();
135+
136+
// Set merged channel state handle to each subtask state
137+
for (OperatorState operatorState : operatorStates) {
138+
int subtaskStateCount = operatorState.getNumberCollectedStates();
139+
for (int i = 0; i < subtaskStateCount; i++) {
140+
OperatorSubtaskState originSubtaskState = operatorState.getState(i);
141+
142+
OperatorSubtaskState.Builder builder = originSubtaskState.toBuilder();
143+
builder.setInputChannelState(new StateObjectCollection<>(inputHandles));
144+
builder.setResultSubpartitionState(new StateObjectCollection<>(outputHandles));
145+
146+
operatorState.putState(i, builder.build());
147+
}
148+
}
149+
150+
metadata = new CheckpointMetadata(1L, operatorStates, emptyList(), null);
151+
}
152+
}

0 commit comments

Comments
 (0)