1717
1818package org .apache .fluss .flink .source .state ;
1919
20+ import org .apache .flink .core .io .SimpleVersionedSerializer ;
21+ import org .apache .flink .core .memory .DataInputDeserializer ;
22+ import org .apache .flink .core .memory .DataOutputSerializer ;
23+ import org .apache .fluss .annotation .VisibleForTesting ;
2024import org .apache .fluss .flink .source .split .SourceSplitBase ;
2125import org .apache .fluss .flink .source .split .SourceSplitSerializer ;
2226import org .apache .fluss .lake .source .LakeSource ;
2327import org .apache .fluss .lake .source .LakeSplit ;
2428import org .apache .fluss .metadata .TableBucket ;
2529
26- import org .apache .flink .core .io .SimpleVersionedSerializer ;
27- import org .apache .flink .core .memory .DataInputDeserializer ;
28- import org .apache .flink .core .memory .DataOutputSerializer ;
29-
3030import javax .annotation .Nullable ;
31-
3231import java .io .IOException ;
3332import java .util .ArrayList ;
3433import java .util .HashMap ;
3736import java .util .Map ;
3837import java .util .Set ;
3938
39+ import static org .apache .fluss .utils .Preconditions .checkNotNull ;
40+
4041/** A serializer for {@link SourceEnumeratorState}. */
4142public class FlussSourceEnumeratorStateSerializer
4243 implements SimpleVersionedSerializer <SourceEnumeratorState > {
4344
4445 @ Nullable private final LakeSource <LakeSplit > lakeSource ;
4546
4647 private static final int VERSION_0 = 0 ;
48+ private static final int VERSION_1 = 1 ;
49+
4750 private static final ThreadLocal <DataOutputSerializer > SERIALIZER_CACHE =
4851 ThreadLocal .withInitial (() -> new DataOutputSerializer (64 ));
4952
50- private static final int CURRENT_VERSION = VERSION_0 ;
53+ private static final int CURRENT_VERSION = VERSION_1 ;
5154
5255 public FlussSourceEnumeratorStateSerializer (LakeSource <LakeSplit > lakeSource ) {
5356 this .lakeSource = lakeSource ;
@@ -61,11 +64,28 @@ public int getVersion() {
6164 @ Override
6265 public byte [] serialize (SourceEnumeratorState state ) throws IOException {
6366 final DataOutputSerializer out = SERIALIZER_CACHE .get ();
67+
68+ // serialize assign bucket and partitions
69+ serializeAssignBucketAndPartitions (
70+ out , state .getAssignedBuckets (), state .getAssignedPartitions ());
71+
72+ // serialize remain hybrid lake splits
73+ serializeRemainingHybridLakeFlussSplits (out , state );
74+
75+ final byte [] result = out .getCopyOfBuffer ();
76+ out .clear ();
77+ return result ;
78+ }
79+
80+ private void serializeAssignBucketAndPartitions (
81+ DataOutputSerializer out ,
82+ Set <TableBucket > assignedBuckets ,
83+ Map <Long , String > assignedPartitions )
84+ throws IOException {
6485 // write assigned buckets
65- out .writeInt (state . getAssignedBuckets () .size ());
66- for (TableBucket tableBucket : state . getAssignedBuckets () ) {
86+ out .writeInt (assignedBuckets .size ());
87+ for (TableBucket tableBucket : assignedBuckets ) {
6788 out .writeLong (tableBucket .getTableId ());
68-
6989 // write partition
7090 // if partition is not null
7191 if (tableBucket .getPartitionId () != null ) {
@@ -78,24 +98,29 @@ public byte[] serialize(SourceEnumeratorState state) throws IOException {
7898 out .writeInt (tableBucket .getBucket ());
7999 }
80100 // write assigned partitions
81- out .writeInt (state . getAssignedPartitions () .size ());
82- for (Map .Entry <Long , String > entry : state . getAssignedPartitions () .entrySet ()) {
101+ out .writeInt (assignedPartitions .size ());
102+ for (Map .Entry <Long , String > entry : assignedPartitions .entrySet ()) {
83103 out .writeLong (entry .getKey ());
84104 out .writeUTF (entry .getValue ());
85105 }
106+ }
86107
108+ @ VisibleForTesting
109+ protected byte [] serializeV0 (SourceEnumeratorState state ) throws IOException {
110+ final DataOutputSerializer out = SERIALIZER_CACHE .get ();
111+ serializeAssignBucketAndPartitions (
112+ out , state .getAssignedBuckets (), state .getAssignedPartitions ());
87113 if (lakeSource != null ) {
88114 serializeRemainingHybridLakeFlussSplits (out , state );
89115 }
90-
91116 final byte [] result = out .getCopyOfBuffer ();
92117 out .clear ();
93118 return result ;
94119 }
95120
96121 @ Override
97122 public SourceEnumeratorState deserialize (int version , byte [] serialized ) throws IOException {
98- if (version != VERSION_0 ) {
123+ if (version != VERSION_0 && version != CURRENT_VERSION ) {
99124 throw new IOException ("Unknown version or corrupt state: " + version );
100125 }
101126 final DataInputDeserializer in = new DataInputDeserializer (serialized );
@@ -124,8 +149,18 @@ public SourceEnumeratorState deserialize(int version, byte[] serialized) throws
124149 }
125150
126151 List <SourceSplitBase > remainingHybridLakeFlussSplits = null ;
127- if (lakeSource != null ) {
128- // todo: add a ut for serialize remaining hybrid lake fluss splits
152+
153+ if (version == VERSION_0 ) {
154+ // For VERSION_0, deserialize remaining hybrid lake Fluss splits only when lakeSource is
155+ // not null.
156+ if (lakeSource != null ) {
157+ remainingHybridLakeFlussSplits = deserializeRemainingHybridLakeFlussSplits (in );
158+ }
159+ } else {
160+ // For VERSION_1 and later, always attempt to deserialize remaining hybrid lake/Fluss
161+ // splits. The serialized state encodes their presence via a boolean flag, so this
162+ // logic no longer depends on the lakeSource flag. This unconditional deserialization
163+ // is the intended behavior change compared to VERSION_0.
129164 remainingHybridLakeFlussSplits = deserializeRemainingHybridLakeFlussSplits (in );
130165 }
131166
@@ -160,7 +195,11 @@ private List<SourceSplitBase> deserializeRemainingHybridLakeFlussSplits(
160195 if (in .readBoolean ()) {
161196 int numSplits = in .readInt ();
162197 List <SourceSplitBase > splits = new ArrayList <>(numSplits );
163- SourceSplitSerializer sourceSplitSerializer = new SourceSplitSerializer (lakeSource );
198+ SourceSplitSerializer sourceSplitSerializer =
199+ new SourceSplitSerializer (
200+ checkNotNull (
201+ lakeSource ,
202+ "lake source must not be null when there are hybrid lake splits." ));
164203 int version = in .readInt ();
165204 for (int i = 0 ; i < numSplits ; i ++) {
166205 int splitSizeInBytes = in .readInt ();
0 commit comments