diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/state/FlussSourceEnumeratorStateSerializer.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/state/FlussSourceEnumeratorStateSerializer.java
index c355109a2d..b721032d0b 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/state/FlussSourceEnumeratorStateSerializer.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/state/FlussSourceEnumeratorStateSerializer.java
@@ -17,11 +17,13 @@
package org.apache.fluss.flink.source.state;
+import org.apache.fluss.annotation.VisibleForTesting;
import org.apache.fluss.flink.source.split.SourceSplitBase;
import org.apache.fluss.flink.source.split.SourceSplitSerializer;
import org.apache.fluss.lake.source.LakeSource;
import org.apache.fluss.lake.source.LakeSplit;
import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.utils.types.Tuple2;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.core.memory.DataInputDeserializer;
@@ -37,17 +39,40 @@
import java.util.Map;
import java.util.Set;
-/** A serializer for {@link SourceEnumeratorState}. */
+import static org.apache.fluss.utils.Preconditions.checkNotNull;
+
+/**
+ * Serializer for {@link SourceEnumeratorState}.
+ *
+ * <p>This serializer manages the versioned persistence of the enumerator's state, including
+ * assigned buckets, partitions, and remaining hybrid lake/Fluss splits.
+ *
+ * <p>Version Evolution:
+ *
+ *
+ * - Version 0: Initial version. Remaining hybrid lake splits are only (de)serialized if
+ * the {@code lakeSource} is non-null.
+ *
+ *   - Version 1 (Current): Decouples the split serialization from the {@code lakeSource}
+ * presence. It always attempts to (de)serialize the splits, using an internal boolean flag to
+ * indicate presence. This ensures state consistency regardless of the current runtime
+ * configuration.
+ *
+ *
+ * <p>Compatibility Note: This serializer is designed for backward compatibility. It can
+ * deserialize states from Version 0, but always produces Version 1 during serialization.
+ */
public class FlussSourceEnumeratorStateSerializer
implements SimpleVersionedSerializer<SourceEnumeratorState> {
@Nullable private final LakeSource<LakeSplit> lakeSource;
private static final int VERSION_0 = 0;
+ private static final int VERSION_1 = 1;
+
private static final ThreadLocal<DataOutputSerializer> SERIALIZER_CACHE =
ThreadLocal.withInitial(() -> new DataOutputSerializer(64));
- private static final int CURRENT_VERSION = VERSION_0;
+ private static final int CURRENT_VERSION = VERSION_1;
public FlussSourceEnumeratorStateSerializer(LakeSource<LakeSplit> lakeSource) {
this.lakeSource = lakeSource;
@@ -61,11 +86,28 @@ public int getVersion() {
@Override
public byte[] serialize(SourceEnumeratorState state) throws IOException {
final DataOutputSerializer out = SERIALIZER_CACHE.get();
+
+ // serialize assigned buckets and partitions
+ serializeAssignBucketAndPartitions(
+ out, state.getAssignedBuckets(), state.getAssignedPartitions());
+
+ // serialize remaining hybrid lake splits
+ serializeRemainingHybridLakeFlussSplits(out, state);
+
+ final byte[] result = out.getCopyOfBuffer();
+ out.clear();
+ return result;
+ }
+
+ private void serializeAssignBucketAndPartitions(
+ DataOutputSerializer out,
+ Set<TableBucket> assignedBuckets,
+ Map<Long, String> assignedPartitions)
+ throws IOException {
// write assigned buckets
- out.writeInt(state.getAssignedBuckets().size());
- for (TableBucket tableBucket : state.getAssignedBuckets()) {
+ out.writeInt(assignedBuckets.size());
+ for (TableBucket tableBucket : assignedBuckets) {
out.writeLong(tableBucket.getTableId());
-
// write partition
// if partition is not null
if (tableBucket.getPartitionId() != null) {
@@ -78,16 +120,42 @@ public byte[] serialize(SourceEnumeratorState state) throws IOException {
out.writeInt(tableBucket.getBucket());
}
// write assigned partitions
- out.writeInt(state.getAssignedPartitions().size());
- for (Map.Entry<Long, String> entry : state.getAssignedPartitions().entrySet()) {
+ out.writeInt(assignedPartitions.size());
+ for (Map.Entry<Long, String> entry : assignedPartitions.entrySet()) {
out.writeLong(entry.getKey());
out.writeUTF(entry.getValue());
}
+ }
+ private void serializeRemainingHybridLakeFlussSplits(
+ final DataOutputSerializer out, SourceEnumeratorState state) throws IOException {
+ List<SourceSplitBase> remainingHybridLakeFlussSplits =
+ state.getRemainingHybridLakeFlussSplits();
+ if (remainingHybridLakeFlussSplits != null) {
+ // write that hybrid lake fluss splits is not null
+ out.writeBoolean(true);
+ out.writeInt(remainingHybridLakeFlussSplits.size());
+ SourceSplitSerializer sourceSplitSerializer = new SourceSplitSerializer(lakeSource);
+ out.writeInt(sourceSplitSerializer.getVersion());
+ for (SourceSplitBase split : remainingHybridLakeFlussSplits) {
+ byte[] serializeBytes = sourceSplitSerializer.serialize(split);
+ out.writeInt(serializeBytes.length);
+ out.write(serializeBytes);
+ }
+ } else {
+ // write that hybrid lake fluss splits is null
+ out.writeBoolean(false);
+ }
+ }
+
+ @VisibleForTesting
+ protected byte[] serializeV0(SourceEnumeratorState state) throws IOException {
+ final DataOutputSerializer out = SERIALIZER_CACHE.get();
+ serializeAssignBucketAndPartitions(
+ out, state.getAssignedBuckets(), state.getAssignedPartitions());
if (lakeSource != null) {
serializeRemainingHybridLakeFlussSplits(out, state);
}
-
final byte[] result = out.getCopyOfBuffer();
out.clear();
return result;
@@ -95,10 +163,54 @@ public byte[] serialize(SourceEnumeratorState state) throws IOException {
@Override
public SourceEnumeratorState deserialize(int version, byte[] serialized) throws IOException {
- if (version != VERSION_0) {
- throw new IOException("Unknown version or corrupt state: " + version);
+ switch (version) {
+ case VERSION_1:
+ return deserializeV1(serialized);
+ case VERSION_0:
+ return deserializeV0(serialized);
+ default:
+ throw new IOException(
+ String.format(
+ "The bytes are serialized with version %d, "
+ + "while this deserializer only supports version up to %d",
+ version, CURRENT_VERSION));
+ }
+ }
+
+ private SourceEnumeratorState deserializeV0(byte[] serialized) throws IOException {
+ DataInputDeserializer in = new DataInputDeserializer(serialized);
+ Tuple2<Set<TableBucket>, Map<Long, String>> assignBucketAndPartitions =
+ deserializeAssignBucketAndPartitions(in);
+ List<SourceSplitBase> remainingHybridLakeFlussSplits = null;
+ // in version 0, deserialize remaining hybrid lake Fluss splits only when lakeSource is
+ // not null.
+ if (lakeSource != null) {
+ remainingHybridLakeFlussSplits = deserializeRemainingHybridLakeFlussSplits(in);
}
- final DataInputDeserializer in = new DataInputDeserializer(serialized);
+ return new SourceEnumeratorState(
+ assignBucketAndPartitions.f0,
+ assignBucketAndPartitions.f1,
+ remainingHybridLakeFlussSplits);
+ }
+
+ private SourceEnumeratorState deserializeV1(byte[] serialized) throws IOException {
+ DataInputDeserializer in = new DataInputDeserializer(serialized);
+ Tuple2<Set<TableBucket>, Map<Long, String>> assignBucketAndPartitions =
+ deserializeAssignBucketAndPartitions(in);
+ List<SourceSplitBase> remainingHybridLakeFlussSplits =
+ deserializeRemainingHybridLakeFlussSplits(in);
+ // in version 1, always attempt to deserialize remaining hybrid lake/Fluss
+ // splits. The serialized state encodes their presence via a boolean flag, so
+ // this logic no longer depends on the lakeSource flag. This unconditional
+ // deserialization is the intended behavior change compared to VERSION_0.
+ return new SourceEnumeratorState(
+ assignBucketAndPartitions.f0,
+ assignBucketAndPartitions.f1,
+ remainingHybridLakeFlussSplits);
+ }
+
+ private Tuple2<Set<TableBucket>, Map<Long, String>> deserializeAssignBucketAndPartitions(
+ DataInputDeserializer in) throws IOException {
// deserialize assigned buckets
int assignedBucketsSize = in.readInt();
Set<TableBucket> assignedBuckets = new HashSet<>(assignedBucketsSize);
@@ -122,36 +234,7 @@ public SourceEnumeratorState deserialize(int version, byte[] serialized) throws
String partition = in.readUTF();
assignedPartitions.put(partitionId, partition);
}
-
- List<SourceSplitBase> remainingHybridLakeFlussSplits = null;
- if (lakeSource != null) {
- // todo: add a ut for serialize remaining hybrid lake fluss splits
- remainingHybridLakeFlussSplits = deserializeRemainingHybridLakeFlussSplits(in);
- }
-
- return new SourceEnumeratorState(
- assignedBuckets, assignedPartitions, remainingHybridLakeFlussSplits);
- }
-
- private void serializeRemainingHybridLakeFlussSplits(
- final DataOutputSerializer out, SourceEnumeratorState state) throws IOException {
- List<SourceSplitBase> remainingHybridLakeFlussSplits =
- state.getRemainingHybridLakeFlussSplits();
- if (remainingHybridLakeFlussSplits != null) {
- // write that hybrid lake fluss splits is not null
- out.writeBoolean(true);
- out.writeInt(remainingHybridLakeFlussSplits.size());
- SourceSplitSerializer sourceSplitSerializer = new SourceSplitSerializer(lakeSource);
- out.writeInt(sourceSplitSerializer.getVersion());
- for (SourceSplitBase split : remainingHybridLakeFlussSplits) {
- byte[] serializeBytes = sourceSplitSerializer.serialize(split);
- out.writeInt(serializeBytes.length);
- out.write(serializeBytes);
- }
- } else {
- // write that hybrid lake fluss splits is null
- out.writeBoolean(false);
- }
+ return Tuple2.of(assignedBuckets, assignedPartitions);
}
@Nullable
@@ -160,7 +243,11 @@ private List<SourceSplitBase> deserializeRemainingHybridLakeFlussSplits(
if (in.readBoolean()) {
int numSplits = in.readInt();
List<SourceSplitBase> splits = new ArrayList<>(numSplits);
- SourceSplitSerializer sourceSplitSerializer = new SourceSplitSerializer(lakeSource);
+ SourceSplitSerializer sourceSplitSerializer =
+ new SourceSplitSerializer(
+ checkNotNull(
+ lakeSource,
+ "lake source must not be null when there are hybrid lake splits."));
int version = in.readInt();
for (int i = 0; i < numSplits; i++) {
int splitSizeInBytes = in.readInt();
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/state/SourceEnumeratorState.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/state/SourceEnumeratorState.java
index 7a677df20f..6042e65f15 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/state/SourceEnumeratorState.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/state/SourceEnumeratorState.java
@@ -73,12 +73,14 @@ public boolean equals(Object o) {
}
SourceEnumeratorState that = (SourceEnumeratorState) o;
return Objects.equals(assignedBuckets, that.assignedBuckets)
- && Objects.equals(assignedPartitions, that.assignedPartitions);
+ && Objects.equals(assignedPartitions, that.assignedPartitions)
+ && Objects.equals(
+ remainingHybridLakeFlussSplits, that.remainingHybridLakeFlussSplits);
}
@Override
public int hashCode() {
- return Objects.hash(assignedBuckets, assignedPartitions);
+ return Objects.hash(assignedBuckets, assignedPartitions, remainingHybridLakeFlussSplits);
}
@Override
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/state/SourceEnumeratorStateSerializerTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/state/SourceEnumeratorStateSerializerTest.java
index 26b4a024c0..2c30bf086c 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/state/SourceEnumeratorStateSerializerTest.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/state/SourceEnumeratorStateSerializerTest.java
@@ -92,4 +92,66 @@ void testPendingSplitsCheckpointSerde() throws Exception {
/* check deserialized is equal to the original */
assertThat(deserializedSourceEnumeratorState).isEqualTo(sourceEnumeratorState);
}
+
+ @Test
+ void testV0Compatibility() throws Exception {
+ // serialize with v0,
+ int version = 0;
+ // test with lake source = null
+ FlussSourceEnumeratorStateSerializer serializer =
+ new FlussSourceEnumeratorStateSerializer(null);
+
+ Set<TableBucket> assignedBuckets =
+ new HashSet<>(Arrays.asList(new TableBucket(1, 0), new TableBucket(1, 4L, 1)));
+ Map<Long, String> assignedPartitions = new HashMap<>();
+ assignedPartitions.put(1L, "partition1");
+ assignedPartitions.put(2L, "partition2");
+ SourceEnumeratorState sourceEnumeratorState =
+ new SourceEnumeratorState(assignedBuckets, assignedPartitions, null);
+ byte[] serialized = serializer.serializeV0(sourceEnumeratorState);
+
+ // then deserialize
+ SourceEnumeratorState deserializedSourceEnumeratorState =
+ serializer.deserialize(version, serialized);
+ assertThat(deserializedSourceEnumeratorState).isEqualTo(sourceEnumeratorState);
+
+ // test with lake source is not null
+ serializer = new FlussSourceEnumeratorStateSerializer(new TestingLakeSource());
+ List<SourceSplitBase> remainingHybridLakeFlussSplits = new ArrayList<>();
+ // Add a LogSplit
+ TableBucket logSplitBucket = new TableBucket(1, 0);
+ LogSplit logSplit = new LogSplit(logSplitBucket, null, 100L);
+ remainingHybridLakeFlussSplits.add(logSplit);
+ sourceEnumeratorState =
+ new SourceEnumeratorState(
+ assignedBuckets, assignedPartitions, remainingHybridLakeFlussSplits);
+
+ serialized = serializer.serializeV0(sourceEnumeratorState);
+
+ // then deserialize
+ deserializedSourceEnumeratorState = serializer.deserialize(version, serialized);
+ assertThat(deserializedSourceEnumeratorState).isEqualTo(sourceEnumeratorState);
+ }
+
+ @Test
+ void testInconsistentLakeSourceSerde() throws Exception {
+ // test serialize with null lake source
+ FlussSourceEnumeratorStateSerializer serializer =
+ new FlussSourceEnumeratorStateSerializer(null);
+
+ Set<TableBucket> assignedBuckets =
+ new HashSet<>(Arrays.asList(new TableBucket(1, 0), new TableBucket(1, 4L, 1)));
+ Map<Long, String> assignedPartitions = new HashMap<>();
+ assignedPartitions.put(1L, "partition1");
+ assignedPartitions.put(2L, "partition2");
+ SourceEnumeratorState sourceEnumeratorState =
+ new SourceEnumeratorState(assignedBuckets, assignedPartitions, null);
+ byte[] serialized = serializer.serialize(sourceEnumeratorState);
+
+ // test deserialize with nonnull lake source
+ serializer = new FlussSourceEnumeratorStateSerializer(new TestingLakeSource());
+ SourceEnumeratorState deserializedSourceEnumeratorState =
+ serializer.deserialize(serializer.getVersion(), serialized);
+ assertThat(deserializedSourceEnumeratorState).isEqualTo(sourceEnumeratorState);
+ }
}