1717
1818package org .apache .fluss .flink .source .state ;
1919
20+ import org .apache .fluss .annotation .VisibleForTesting ;
2021import org .apache .fluss .flink .source .split .SourceSplitBase ;
2122import org .apache .fluss .flink .source .split .SourceSplitSerializer ;
2223import org .apache .fluss .lake .source .LakeSource ;
3738import java .util .Map ;
3839import java .util .Set ;
3940
41+ import static org .apache .fluss .utils .Preconditions .checkNotNull ;
42+
4043/** A serializer for {@link SourceEnumeratorState}. */
4144public class FlussSourceEnumeratorStateSerializer
4245 implements SimpleVersionedSerializer <SourceEnumeratorState > {
4346
4447 @ Nullable private final LakeSource <LakeSplit > lakeSource ;
4548
4649 private static final int VERSION_0 = 0 ;
50+ private static final int VERSION_1 = 1 ;
51+
4752 private static final ThreadLocal <DataOutputSerializer > SERIALIZER_CACHE =
4853 ThreadLocal .withInitial (() -> new DataOutputSerializer (64 ));
4954
50- private static final int CURRENT_VERSION = VERSION_0 ;
55+ private static final int CURRENT_VERSION = VERSION_1 ;
5156
5257 public FlussSourceEnumeratorStateSerializer (LakeSource <LakeSplit > lakeSource ) {
5358 this .lakeSource = lakeSource ;
@@ -61,11 +66,28 @@ public int getVersion() {
6166 @ Override
6267 public byte [] serialize (SourceEnumeratorState state ) throws IOException {
6368 final DataOutputSerializer out = SERIALIZER_CACHE .get ();
69+
70+ // serialize assign bucket and partitions
71+ serializeAssignBucketAndPartitions (
72+ out , state .getAssignedBuckets (), state .getAssignedPartitions ());
73+
74+ // serialize remain hybrid lake splits
75+ serializeRemainingHybridLakeFlussSplits (out , state );
76+
77+ final byte [] result = out .getCopyOfBuffer ();
78+ out .clear ();
79+ return result ;
80+ }
81+
82+ private void serializeAssignBucketAndPartitions (
83+ DataOutputSerializer out ,
84+ Set <TableBucket > assignedBuckets ,
85+ Map <Long , String > assignedPartitions )
86+ throws IOException {
6487 // write assigned buckets
65- out .writeInt (state . getAssignedBuckets () .size ());
66- for (TableBucket tableBucket : state . getAssignedBuckets () ) {
88+ out .writeInt (assignedBuckets .size ());
89+ for (TableBucket tableBucket : assignedBuckets ) {
6790 out .writeLong (tableBucket .getTableId ());
68-
6991 // write partition
7092 // if partition is not null
7193 if (tableBucket .getPartitionId () != null ) {
@@ -78,24 +100,29 @@ public byte[] serialize(SourceEnumeratorState state) throws IOException {
78100 out .writeInt (tableBucket .getBucket ());
79101 }
80102 // write assigned partitions
81- out .writeInt (state . getAssignedPartitions () .size ());
82- for (Map .Entry <Long , String > entry : state . getAssignedPartitions () .entrySet ()) {
103+ out .writeInt (assignedPartitions .size ());
104+ for (Map .Entry <Long , String > entry : assignedPartitions .entrySet ()) {
83105 out .writeLong (entry .getKey ());
84106 out .writeUTF (entry .getValue ());
85107 }
108+ }
86109
110+ @ VisibleForTesting
111+ protected byte [] serializeV0 (SourceEnumeratorState state ) throws IOException {
112+ final DataOutputSerializer out = SERIALIZER_CACHE .get ();
113+ serializeAssignBucketAndPartitions (
114+ out , state .getAssignedBuckets (), state .getAssignedPartitions ());
87115 if (lakeSource != null ) {
88116 serializeRemainingHybridLakeFlussSplits (out , state );
89117 }
90-
91118 final byte [] result = out .getCopyOfBuffer ();
92119 out .clear ();
93120 return result ;
94121 }
95122
96123 @ Override
97124 public SourceEnumeratorState deserialize (int version , byte [] serialized ) throws IOException {
98- if (version != VERSION_0 ) {
125+ if (version != VERSION_0 && version != CURRENT_VERSION ) {
99126 throw new IOException ("Unknown version or corrupt state: " + version );
100127 }
101128 final DataInputDeserializer in = new DataInputDeserializer (serialized );
@@ -124,8 +151,14 @@ public SourceEnumeratorState deserialize(int version, byte[] serialized) throws
124151 }
125152
126153 List <SourceSplitBase > remainingHybridLakeFlussSplits = null ;
127- if (lakeSource != null ) {
128- // todo: add a ut for serialize remaining hybrid lake fluss splits
154+
155+ if (version == VERSION_0 ) {
156+ // VERSION_0 relied on the lakeSource flag to decide whether the splits section was written,
157+ if (lakeSource != null ) {
158+ remainingHybridLakeFlussSplits = deserializeRemainingHybridLakeFlussSplits (in );
159+ }
160+ } else {
161+ // VERSION_1+ always writes the splits section, so always deserialize it
129162 remainingHybridLakeFlussSplits = deserializeRemainingHybridLakeFlussSplits (in );
130163 }
131164
@@ -160,7 +193,11 @@ private List<SourceSplitBase> deserializeRemainingHybridLakeFlussSplits(
160193 if (in .readBoolean ()) {
161194 int numSplits = in .readInt ();
162195 List <SourceSplitBase > splits = new ArrayList <>(numSplits );
163- SourceSplitSerializer sourceSplitSerializer = new SourceSplitSerializer (lakeSource );
196+ SourceSplitSerializer sourceSplitSerializer =
197+ new SourceSplitSerializer (
198+ checkNotNull (
199+ lakeSource ,
200+ "lake source must not be null when there are hybrid lake splits." ));
164201 int version = in .readInt ();
165202 for (int i = 0 ; i < numSplits ; i ++) {
166203 int splitSizeInBytes = in .readInt ();
0 commit comments