Skip to content

Commit d0fc306

Browse files
committed
address comment v1
1 parent ccacaaf commit d0fc306

File tree

1 file changed

+29
-3
lines changed

1 file changed

+29
-3
lines changed

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/state/FlussSourceEnumeratorStateSerializer.java

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,26 @@
4040

4141
import static org.apache.fluss.utils.Preconditions.checkNotNull;
4242

43-
/** A serializer for {@link SourceEnumeratorState}. */
43+
/**
44+
* Serializer for {@link SourceEnumeratorState}.
45+
*
46+
* <p>This serializer manages the versioned persistence of the enumerator's state, including
47+
* assigned buckets, partitions, and remaining hybrid lake/Fluss splits.
48+
*
49+
* <h3>Version Evolution:</h3>
50+
*
51+
* <ul>
52+
* <li><b>Version 0:</b> Initial version. Remaining hybrid lake splits are only (de)serialized if
53+
* the {@code lakeSource} is non-null.
54+
* <li><b>Version 1 (Current):</b> Decouples split serialization from the {@code lakeSource}
55+
* presence. It always attempts to (de)serialize the splits, using an internal boolean flag to
56+
* indicate presence. This ensures state consistency regardless of the current runtime
57+
* configuration.
58+
* </ul>
59+
*
60+
* <p><b>Compatibility Note:</b> This serializer is designed for backward compatibility. It can
61+
* deserialize states from Version 0, but always produces Version 1 during serialization.
62+
*/
4463
public class FlussSourceEnumeratorStateSerializer
4564
implements SimpleVersionedSerializer<SourceEnumeratorState> {
4665

@@ -122,8 +141,15 @@ protected byte[] serializeV0(SourceEnumeratorState state) throws IOException {
122141

123142
@Override
124143
public SourceEnumeratorState deserialize(int version, byte[] serialized) throws IOException {
125-
if (version != VERSION_0 && version != CURRENT_VERSION) {
126-
throw new IOException("Unknown version or corrupt state: " + version);
144+
if (version < VERSION_0 || version > CURRENT_VERSION) {
145+
throw new IOException(
146+
"Unsupported state version: "
147+
+ version
148+
+ ". Supported range: ["
149+
+ VERSION_0
150+
+ ", "
151+
+ CURRENT_VERSION
152+
+ "]");
127153
}
128154
final DataInputDeserializer in = new DataInputDeserializer(serialized);
129155
// deserialize assigned buckets

0 commit comments

Comments
 (0)