Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@

package org.apache.fluss.flink.source.state;

import org.apache.fluss.annotation.VisibleForTesting;
import org.apache.fluss.flink.source.split.SourceSplitBase;
import org.apache.fluss.flink.source.split.SourceSplitSerializer;
import org.apache.fluss.lake.source.LakeSource;
import org.apache.fluss.lake.source.LakeSplit;
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.utils.types.Tuple2;

import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.core.memory.DataInputDeserializer;
Expand All @@ -37,17 +39,40 @@
import java.util.Map;
import java.util.Set;

/** A serializer for {@link SourceEnumeratorState}. */
import static org.apache.fluss.utils.Preconditions.checkNotNull;

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you document what differs between the serialization versions? For example:
image

/**
* Serializer for {@link SourceEnumeratorState}.
*
* <p>This serializer manages the versioned persistence of the enumerator's state, including
* assigned buckets, partitions, and remaining hybrid lake/Fluss splits.
*
* <h3>Version Evolution:</h3>
*
* <ul>
* <li><b>Version 0:</b> Initial version. Remaining hybrid lake splits are only (de)serialized if
* the {@code lakeSource} is non-null.
* <li><b>Version 1 (Current):</b> Decouples split serialization from the {@code lakeSource}
* presence. It always attempts to (de)serialize the splits, using an internal boolean flag to
* indicate presence. This ensures state consistency regardless of the current runtime
* configuration.
* </ul>
*
* <p><b>Compatibility Note:</b> This serializer is designed for backward compatibility. It can
* deserialize states from Version 0, but always produces Version 1 during serialization.
*/
public class FlussSourceEnumeratorStateSerializer
implements SimpleVersionedSerializer<SourceEnumeratorState> {

@Nullable private final LakeSource<LakeSplit> lakeSource;

private static final int VERSION_0 = 0;
private static final int VERSION_1 = 1;

private static final ThreadLocal<DataOutputSerializer> SERIALIZER_CACHE =
ThreadLocal.withInitial(() -> new DataOutputSerializer(64));

private static final int CURRENT_VERSION = VERSION_0;
private static final int CURRENT_VERSION = VERSION_1;

public FlussSourceEnumeratorStateSerializer(LakeSource<LakeSplit> lakeSource) {
this.lakeSource = lakeSource;
Expand All @@ -61,11 +86,28 @@ public int getVersion() {
@Override
public byte[] serialize(SourceEnumeratorState state) throws IOException {
    final DataOutputSerializer output = SERIALIZER_CACHE.get();

    // Wire layout (version 1): assigned buckets/partitions first, then the
    // remaining hybrid lake/Fluss splits (their presence is self-described
    // by a boolean flag written by the helper).
    serializeAssignBucketAndPartitions(
            output, state.getAssignedBuckets(), state.getAssignedPartitions());
    serializeRemainingHybridLakeFlussSplits(output, state);

    // Snapshot the buffer before clearing the thread-local serializer for reuse.
    final byte[] snapshot = output.getCopyOfBuffer();
    output.clear();
    return snapshot;
}

private void serializeAssignBucketAndPartitions(
DataOutputSerializer out,
Set<TableBucket> assignedBuckets,
Map<Long, String> assignedPartitions)
throws IOException {
// write assigned buckets
out.writeInt(state.getAssignedBuckets().size());
for (TableBucket tableBucket : state.getAssignedBuckets()) {
out.writeInt(assignedBuckets.size());
for (TableBucket tableBucket : assignedBuckets) {
out.writeLong(tableBucket.getTableId());

// write partition
// if partition is not null
if (tableBucket.getPartitionId() != null) {
Expand All @@ -78,27 +120,97 @@ public byte[] serialize(SourceEnumeratorState state) throws IOException {
out.writeInt(tableBucket.getBucket());
}
// write assigned partitions
out.writeInt(state.getAssignedPartitions().size());
for (Map.Entry<Long, String> entry : state.getAssignedPartitions().entrySet()) {
out.writeInt(assignedPartitions.size());
for (Map.Entry<Long, String> entry : assignedPartitions.entrySet()) {
out.writeLong(entry.getKey());
out.writeUTF(entry.getValue());
}
}

private void serializeRemainingHybridLakeFlussSplits(
        final DataOutputSerializer out, SourceEnumeratorState state) throws IOException {
    final List<SourceSplitBase> splits = state.getRemainingHybridLakeFlussSplits();
    if (splits == null) {
        // Presence flag: no hybrid lake/Fluss splits were recorded.
        out.writeBoolean(false);
        return;
    }
    // Presence flag followed by count, the split-serializer version, and each
    // split as a length-prefixed byte blob.
    out.writeBoolean(true);
    out.writeInt(splits.size());
    final SourceSplitSerializer splitSerializer = new SourceSplitSerializer(lakeSource);
    out.writeInt(splitSerializer.getVersion());
    for (SourceSplitBase split : splits) {
        final byte[] splitBytes = splitSerializer.serialize(split);
        out.writeInt(splitBytes.length);
        out.write(splitBytes);
    }
}

@VisibleForTesting
protected byte[] serializeV0(SourceEnumeratorState state) throws IOException {
    // Produces the legacy (version 0) wire layout, kept only so tests can
    // verify backward compatibility: the remaining hybrid lake/Fluss splits
    // were written only when a lakeSource was configured.
    final DataOutputSerializer buffer = SERIALIZER_CACHE.get();
    serializeAssignBucketAndPartitions(
            buffer, state.getAssignedBuckets(), state.getAssignedPartitions());
    if (lakeSource != null) {
        serializeRemainingHybridLakeFlussSplits(buffer, state);
    }
    final byte[] bytes = buffer.getCopyOfBuffer();
    buffer.clear();
    return bytes;
}

@Override
public SourceEnumeratorState deserialize(int version, byte[] serialized) throws IOException {
if (version != VERSION_0) {
throw new IOException("Unknown version or corrupt state: " + version);
switch (version) {
case VERSION_1:
return deserializeV1(serialized);
case VERSION_0:
return deserializeV0(serialized);
default:
throw new IOException(
String.format(
"The bytes are serialized with version %d, "
+ "while this deserializer only supports version up to %d",
version, CURRENT_VERSION));
}
}

private SourceEnumeratorState deserializeV0(byte[] serialized) throws IOException {
    // NOTE(review): the diff view left a second, duplicate declaration of `in`
    // (the deleted pre-change line) inside this method; it is removed here
    // because redeclaring the variable would not compile.
    DataInputDeserializer in = new DataInputDeserializer(serialized);
    Tuple2<Set<TableBucket>, Map<Long, String>> assignBucketAndPartitions =
            deserializeAssignBucketAndPartitions(in);
    // In version 0, the remaining hybrid lake/Fluss splits were written only
    // when a lakeSource was configured, so they are read only in that case.
    List<SourceSplitBase> remainingHybridLakeFlussSplits = null;
    if (lakeSource != null) {
        remainingHybridLakeFlussSplits = deserializeRemainingHybridLakeFlussSplits(in);
    }
    return new SourceEnumeratorState(
            assignBucketAndPartitions.f0,
            assignBucketAndPartitions.f1,
            remainingHybridLakeFlussSplits);
}

private SourceEnumeratorState deserializeV1(byte[] serialized) throws IOException {
    final DataInputDeserializer input = new DataInputDeserializer(serialized);
    final Tuple2<Set<TableBucket>, Map<Long, String>> bucketsAndPartitions =
            deserializeAssignBucketAndPartitions(input);
    // In version 1 the remaining hybrid lake/Fluss splits are always read: the
    // serialized state encodes their presence via a boolean flag, so this no
    // longer depends on whether a lakeSource is configured. This unconditional
    // deserialization is the intended behavior change compared to VERSION_0.
    final List<SourceSplitBase> remainingSplits =
            deserializeRemainingHybridLakeFlussSplits(input);
    return new SourceEnumeratorState(
            bucketsAndPartitions.f0, bucketsAndPartitions.f1, remainingSplits);
}

private Tuple2<Set<TableBucket>, Map<Long, String>> deserializeAssignBucketAndPartitions(
DataInputDeserializer in) throws IOException {
// deserialize assigned buckets
int assignedBucketsSize = in.readInt();
Set<TableBucket> assignedBuckets = new HashSet<>(assignedBucketsSize);
Expand All @@ -122,36 +234,7 @@ public SourceEnumeratorState deserialize(int version, byte[] serialized) throws
String partition = in.readUTF();
assignedPartitions.put(partitionId, partition);
}

List<SourceSplitBase> remainingHybridLakeFlussSplits = null;
if (lakeSource != null) {
// todo: add a ut for serialize remaining hybrid lake fluss splits
remainingHybridLakeFlussSplits = deserializeRemainingHybridLakeFlussSplits(in);
}

return new SourceEnumeratorState(
assignedBuckets, assignedPartitions, remainingHybridLakeFlussSplits);
}

private void serializeRemainingHybridLakeFlussSplits(
final DataOutputSerializer out, SourceEnumeratorState state) throws IOException {
List<SourceSplitBase> remainingHybridLakeFlussSplits =
state.getRemainingHybridLakeFlussSplits();
if (remainingHybridLakeFlussSplits != null) {
// write that hybrid lake fluss splits is not null
out.writeBoolean(true);
out.writeInt(remainingHybridLakeFlussSplits.size());
SourceSplitSerializer sourceSplitSerializer = new SourceSplitSerializer(lakeSource);
out.writeInt(sourceSplitSerializer.getVersion());
for (SourceSplitBase split : remainingHybridLakeFlussSplits) {
byte[] serializeBytes = sourceSplitSerializer.serialize(split);
out.writeInt(serializeBytes.length);
out.write(serializeBytes);
}
} else {
// write that hybrid lake fluss splits is null
out.writeBoolean(false);
}
return Tuple2.of(assignedBuckets, assignedPartitions);
}

@Nullable
Expand All @@ -160,7 +243,11 @@ private List<SourceSplitBase> deserializeRemainingHybridLakeFlussSplits(
if (in.readBoolean()) {
int numSplits = in.readInt();
List<SourceSplitBase> splits = new ArrayList<>(numSplits);
SourceSplitSerializer sourceSplitSerializer = new SourceSplitSerializer(lakeSource);
SourceSplitSerializer sourceSplitSerializer =
new SourceSplitSerializer(
checkNotNull(
lakeSource,
"lake source must not be null when there are hybrid lake splits."));
int version = in.readInt();
for (int i = 0; i < numSplits; i++) {
int splitSizeInBytes = in.readInt();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,14 @@ public boolean equals(Object o) {
}
SourceEnumeratorState that = (SourceEnumeratorState) o;
return Objects.equals(assignedBuckets, that.assignedBuckets)
&& Objects.equals(assignedPartitions, that.assignedPartitions);
&& Objects.equals(assignedPartitions, that.assignedPartitions)
&& Objects.equals(
remainingHybridLakeFlussSplits, that.remainingHybridLakeFlussSplits);
}

@Override
public int hashCode() {
    // NOTE(review): the diff view left both the old and the new return
    // statements in this method; only the new one is kept. It includes
    // remainingHybridLakeFlussSplits, matching the updated equals().
    return Objects.hash(assignedBuckets, assignedPartitions, remainingHybridLakeFlussSplits);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,4 +92,66 @@ void testPendingSplitsCheckpointSerde() throws Exception {
/* check deserialized is equal to the original */
assertThat(deserializedSourceEnumeratorState).isEqualTo(sourceEnumeratorState);
}

@Test
void testV0Compatibility() throws Exception {
    // States written with the legacy version-0 layout must stay readable.
    final int legacyVersion = 0;

    Set<TableBucket> buckets =
            new HashSet<>(Arrays.asList(new TableBucket(1, 0), new TableBucket(1, 4L, 1)));
    Map<Long, String> partitions = new HashMap<>();
    partitions.put(1L, "partition1");
    partitions.put(2L, "partition2");

    // Case 1: no lake source configured, no remaining splits.
    FlussSourceEnumeratorStateSerializer serializer =
            new FlussSourceEnumeratorStateSerializer(null);
    SourceEnumeratorState state = new SourceEnumeratorState(buckets, partitions, null);
    byte[] v0Bytes = serializer.serializeV0(state);
    assertThat(serializer.deserialize(legacyVersion, v0Bytes)).isEqualTo(state);

    // Case 2: lake source present and a remaining log split recorded.
    serializer = new FlussSourceEnumeratorStateSerializer(new TestingLakeSource());
    List<SourceSplitBase> remainingSplits = new ArrayList<>();
    remainingSplits.add(new LogSplit(new TableBucket(1, 0), null, 100L));
    state = new SourceEnumeratorState(buckets, partitions, remainingSplits);
    v0Bytes = serializer.serializeV0(state);
    assertThat(serializer.deserialize(legacyVersion, v0Bytes)).isEqualTo(state);
}

@Test
void testInconsistentLakeSourceSerde() throws Exception {
    // Write the state with a serializer that has NO lake source configured...
    FlussSourceEnumeratorStateSerializer writer =
            new FlussSourceEnumeratorStateSerializer(null);

    Set<TableBucket> buckets =
            new HashSet<>(Arrays.asList(new TableBucket(1, 0), new TableBucket(1, 4L, 1)));
    Map<Long, String> partitions = new HashMap<>();
    partitions.put(1L, "partition1");
    partitions.put(2L, "partition2");
    SourceEnumeratorState state = new SourceEnumeratorState(buckets, partitions, null);
    byte[] bytes = writer.serialize(state);

    // ...then read it back with a serializer that DOES have a lake source.
    // The version-1 presence flag makes the round trip succeed regardless.
    FlussSourceEnumeratorStateSerializer reader =
            new FlussSourceEnumeratorStateSerializer(new TestingLakeSource());
    SourceEnumeratorState roundTripped = reader.deserialize(reader.getVersion(), bytes);
    assertThat(roundTripped).isEqualTo(state);
}
}