From 346995b8530bcb3392b552a46ff2c9db92277cc4 Mon Sep 17 00:00:00 2001 From: yunhong <337361684@qq.com> Date: Tue, 11 Mar 2025 19:22:18 +0800 Subject: [PATCH 1/3] [flink] Flink sink support hash by bucket key for PrimaryKey Table --- .../fluss/bucketing/BucketingFunction.java | 4 +- .../alibaba/fluss/memory/MemorySegment.java | 3 +- .../row/compacted/CompactedRowWriter.java | 2 +- .../alibaba/fluss/row/encode/KeyEncoder.java | 3 +- .../fluss/flink/FlinkConnectorOptions.java | 9 ++++ .../flink/catalog/FlinkTableFactory.java | 7 ++- .../alibaba/fluss/flink/sink/FlinkSink.java | 43 ++++++++++++++++-- .../fluss/flink/sink/FlinkTableSink.java | 24 ++++++++-- .../fluss/flink/sink/RowDataKeySelector.java | 44 +++++++++++++++++++ .../utils/FlinkConnectorOptionsUtils.java | 15 +++++++ .../testutils/TestingDatabaseSyncSink.java | 3 ++ 11 files changed, 146 insertions(+), 11 deletions(-) create mode 100644 fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/sink/RowDataKeySelector.java diff --git a/fluss-common/src/main/java/com/alibaba/fluss/bucketing/BucketingFunction.java b/fluss-common/src/main/java/com/alibaba/fluss/bucketing/BucketingFunction.java index 71e52ac5fd..6bf44058f5 100644 --- a/fluss-common/src/main/java/com/alibaba/fluss/bucketing/BucketingFunction.java +++ b/fluss-common/src/main/java/com/alibaba/fluss/bucketing/BucketingFunction.java @@ -20,8 +20,10 @@ import javax.annotation.Nullable; +import java.io.Serializable; + /** An interface to assign a bucket according to the bucket key byte array. */ -public interface BucketingFunction { +public interface BucketingFunction extends Serializable { /** * Assign a bucket according to the bucket key byte array. diff --git a/fluss-common/src/main/java/com/alibaba/fluss/memory/MemorySegment.java b/fluss-common/src/main/java/com/alibaba/fluss/memory/MemorySegment.java index a7c65ea0ea..7d9135d0ac 100644 --- a/fluss-common/src/main/java/com/alibaba/fluss/memory/MemorySegment.java +++ b/fluss-common/src/main/java/com/alibaba/fluss/memory/MemorySegment.java @@ -23,6 +23,7 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.io.Serializable; import java.nio.BufferOverflowException; import java.nio.BufferUnderflowException; import java.nio.ByteBuffer; @@ -64,7 +65,7 @@ * implementations on invocations of abstract methods. */ @Internal -public final class MemorySegment { +public final class MemorySegment implements Serializable { /** The unsafe handle for transparent memory copied (heap / off-heap). */ private static final sun.misc.Unsafe UNSAFE = MemoryUtils.UNSAFE; diff --git a/fluss-common/src/main/java/com/alibaba/fluss/row/compacted/CompactedRowWriter.java b/fluss-common/src/main/java/com/alibaba/fluss/row/compacted/CompactedRowWriter.java index cb7af99f1a..6f50cd9013 100644 --- a/fluss-common/src/main/java/com/alibaba/fluss/row/compacted/CompactedRowWriter.java +++ b/fluss-common/src/main/java/com/alibaba/fluss/row/compacted/CompactedRowWriter.java @@ -64,7 +64,7 @@ * the positive integer is doubled. We assume that the probability of general integers being * positive is higher, so sacrifice the negative number to promote the positive number. 
*/ -public class CompactedRowWriter { +public class CompactedRowWriter implements Serializable { private final int headerSizeInBytes; diff --git a/fluss-common/src/main/java/com/alibaba/fluss/row/encode/KeyEncoder.java b/fluss-common/src/main/java/com/alibaba/fluss/row/encode/KeyEncoder.java index ced1c987e2..d616f87f5b 100644 --- a/fluss-common/src/main/java/com/alibaba/fluss/row/encode/KeyEncoder.java +++ b/fluss-common/src/main/java/com/alibaba/fluss/row/encode/KeyEncoder.java @@ -23,10 +23,11 @@ import javax.annotation.Nullable; +import java.io.Serializable; import java.util.List; /** An interface for encoding key of row into bytes. */ -public interface KeyEncoder { +public interface KeyEncoder extends Serializable { /** Encode the key of given row to byte array. */ byte[] encodeKey(InternalRow row); diff --git a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/FlinkConnectorOptions.java b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/FlinkConnectorOptions.java index f8299c59ab..dc8b3ffdcd 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/FlinkConnectorOptions.java +++ b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/FlinkConnectorOptions.java @@ -106,6 +106,15 @@ public class FlinkConnectorOptions { .defaultValue(false) .withDescription("Whether to ignore retract(-U/-D) record."); + public static final ConfigOption SINK_PRE_HASH_BY_BUCKET_KEY = + ConfigOptions.key("sink.pre-hash-by-bucket-key") + .booleanType() + .defaultValue(false) + .withDescription( + "Whether to pre hash the record by bucket key before write to sink. Hash the data with the " + + "same bucket key to be processed by the same task can improve the efficiency" + + " of client processing and reduce resource consumption"); + // -------------------------------------------------------------------------------------------- // table storage specific options // -------------------------------------------------------------------------------------------- diff --git a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/catalog/FlinkTableFactory.java b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/catalog/FlinkTableFactory.java index 790e61a6f4..0864ddc8ca 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/catalog/FlinkTableFactory.java +++ b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/catalog/FlinkTableFactory.java @@ -52,6 +52,7 @@ import java.util.Set; import static com.alibaba.fluss.flink.catalog.FlinkCatalog.LAKE_TABLE_SPLITTER; +import static com.alibaba.fluss.flink.utils.FlinkConnectorOptionsUtils.getBucketKeys; import static com.alibaba.fluss.flink.utils.FlinkConversions.toFlinkOption; /** Factory to create table source and table sink for Fluss. 
*/ @@ -151,7 +152,10 @@ public DynamicTableSink createDynamicTableSink(Context context) { context.getPrimaryKeyIndexes(), isStreamingMode, tableOptions.get(toFlinkOption(ConfigOptions.TABLE_MERGE_ENGINE)), - tableOptions.get(FlinkConnectorOptions.SINK_IGNORE_DELETE)); + tableOptions.get(FlinkConnectorOptions.SINK_IGNORE_DELETE), + tableOptions.get(FlinkConnectorOptions.BUCKET_NUMBER), + getBucketKeys(tableOptions), + tableOptions.get(FlinkConnectorOptions.SINK_PRE_HASH_BY_BUCKET_KEY)); } @Override @@ -176,6 +180,7 @@ public Set> optionalOptions() { FlinkConnectorOptions.SCAN_PARTITION_DISCOVERY_INTERVAL, FlinkConnectorOptions.LOOKUP_ASYNC, FlinkConnectorOptions.SINK_IGNORE_DELETE, + FlinkConnectorOptions.SINK_PRE_HASH_BY_BUCKET_KEY, LookupOptions.MAX_RETRIES, LookupOptions.CACHE_TYPE, LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_ACCESS, diff --git a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/sink/FlinkSink.java b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/sink/FlinkSink.java index 7985a99b86..1acae2afe7 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/sink/FlinkSink.java +++ b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/sink/FlinkSink.java @@ -22,11 +22,14 @@ import com.alibaba.fluss.flink.sink.writer.FlinkSinkWriter; import com.alibaba.fluss.flink.sink.writer.UpsertSinkWriter; import com.alibaba.fluss.metadata.TablePath; +import com.alibaba.fluss.row.encode.KeyEncoder; import org.apache.flink.api.connector.sink2.Sink; import org.apache.flink.api.connector.sink2.SinkWriter; import org.apache.flink.api.connector.sink2.WriterInitContext; import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup; +import org.apache.flink.streaming.api.connector.sink2.WithPreWriteTopology; +import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.logical.RowType; @@ -35,8 +38,12 @@ import java.io.IOException; import java.io.Serializable; -/** Flink sink for Fluss. */ -class FlinkSink implements Sink { +/** + * Flink sink for Fluss. + * + *

TODO: WithPreWriteTopology need to be changed to supportsPreWriteTopology in Flink 1.20 + */ +class FlinkSink implements Sink, WithPreWriteTopology { private static final long serialVersionUID = 1L; @@ -61,9 +68,16 @@ public SinkWriter createWriter(WriterInitContext context) throws IOExce return flinkSinkWriter; } + @Override + public DataStream addPreWriteTopology(DataStream input) { + return builder.addPreWriteTopology(input); + } + @Internal interface SinkWriterBuilder extends Serializable { W createWriter(); + + DataStream addPreWriteTopology(DataStream input); } @Internal @@ -91,6 +105,11 @@ public AppendSinkWriterBuilder( public AppendSinkWriter createWriter() { return new AppendSinkWriter(tablePath, flussConfig, tableRowType, ignoreDelete); } + + @Override + public DataStream addPreWriteTopology(DataStream input) { + return input; + } } @Internal @@ -103,18 +122,27 @@ static class UpsertSinkWriterBuilder implements SinkWriterBuilder addPreWriteTopology(DataStream input) { + return sinkReHash + ? input.partitionCustom( + (bucketId, numPartitions) -> bucketId % numPartitions, + new RowDataKeySelector(bucketKeyEncoder, numBucket)) + : input; + } } } diff --git a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/sink/FlinkTableSink.java b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/sink/FlinkTableSink.java index e4f370f276..52e1af6c3c 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/sink/FlinkTableSink.java +++ b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/sink/FlinkTableSink.java @@ -24,6 +24,7 @@ import com.alibaba.fluss.metadata.MergeEngineType; import com.alibaba.fluss.metadata.TablePath; import com.alibaba.fluss.row.GenericRow; +import com.alibaba.fluss.row.encode.KeyEncoder; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.catalog.Column; @@ -52,6 +53,8 @@ import java.util.Set; import java.util.stream.Collectors; +import static com.alibaba.fluss.flink.utils.FlinkConversions.toFlussRowType; + /** A Flink {@link DynamicTableSink}. 
*/ public class FlinkTableSink implements DynamicTableSink, @@ -67,6 +70,9 @@ public class FlinkTableSink private final boolean streaming; @Nullable private final MergeEngineType mergeEngineType; private final boolean ignoreDelete; + private final int numBucket; + private final List bucketKeys; + private final boolean sinkReHash; private boolean appliedUpdates = false; @Nullable private GenericRow deleteRow; @@ -78,7 +84,10 @@ public FlinkTableSink( int[] primaryKeyIndexes, boolean streaming, @Nullable MergeEngineType mergeEngineType, - boolean ignoreDelete) { + boolean ignoreDelete, + int numBucket, + List bucketKeys, + boolean sinkReHash) { this.tablePath = tablePath; this.flussConfig = flussConfig; this.tableRowType = tableRowType; @@ -86,6 +95,9 @@ public FlinkTableSink( this.streaming = streaming; this.mergeEngineType = mergeEngineType; this.ignoreDelete = ignoreDelete; + this.numBucket = numBucket; + this.bucketKeys = bucketKeys; + this.sinkReHash = sinkReHash; } @Override @@ -168,7 +180,10 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { flussConfig, tableRowType, targetColumnIndexes, - ignoreDelete) + ignoreDelete, + numBucket, + KeyEncoder.of(toFlussRowType(tableRowType), bucketKeys, null), + sinkReHash) : new FlinkSink.AppendSinkWriterBuilder( tablePath, flussConfig, tableRowType, ignoreDelete); @@ -195,7 +210,10 @@ public DynamicTableSink copy() { primaryKeyIndexes, streaming, mergeEngineType, - ignoreDelete); + ignoreDelete, + numBucket, + bucketKeys, + sinkReHash); sink.appliedUpdates = appliedUpdates; sink.deleteRow = deleteRow; return sink; diff --git a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/sink/RowDataKeySelector.java b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/sink/RowDataKeySelector.java new file mode 100644 index 0000000000..26478e34d1 --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/sink/RowDataKeySelector.java @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2025 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.flink.sink; + +import com.alibaba.fluss.bucketing.FlussBucketingFunction; +import com.alibaba.fluss.flink.row.FlinkAsFlussRow; +import com.alibaba.fluss.row.encode.KeyEncoder; + +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.table.data.RowData; + +/** {@link KeySelector} to get bucket id. 
*/ +public class RowDataKeySelector implements KeySelector { + + private final FlussBucketingFunction flussBucketingFunction; + private final KeyEncoder bucketKeyEncoder; + private final int numBucket; + + public RowDataKeySelector(KeyEncoder bucketKeyEncoder, int numBucket) { + this.bucketKeyEncoder = bucketKeyEncoder; + this.flussBucketingFunction = new FlussBucketingFunction(); + this.numBucket = numBucket; + } + + @Override + public Integer getKey(RowData rowData) throws Exception { + FlinkAsFlussRow row = new FlinkAsFlussRow().replace(rowData); + return flussBucketingFunction.bucketing(bucketKeyEncoder.encodeKey(row), numBucket); + } +} diff --git a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/utils/FlinkConnectorOptionsUtils.java b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/utils/FlinkConnectorOptionsUtils.java index 323efbcce4..702b1d02e4 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/utils/FlinkConnectorOptionsUtils.java +++ b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/utils/FlinkConnectorOptionsUtils.java @@ -27,7 +27,11 @@ import java.time.LocalDateTime; import java.time.ZoneId; import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; import java.util.Optional; +import java.util.stream.Collectors; import static com.alibaba.fluss.flink.FlinkConnectorOptions.SCAN_STARTUP_MODE; import static com.alibaba.fluss.flink.FlinkConnectorOptions.SCAN_STARTUP_TIMESTAMP; @@ -63,6 +67,17 @@ public static StartupOptions getStartupOptions(ReadableConfig tableOptions, Zone return options; } + public static List getBucketKeys(ReadableConfig tableOptions) { + Optional bucketKey = tableOptions.getOptional(FlinkConnectorOptions.BUCKET_KEY); + if (!bucketKey.isPresent()) { + // log tables don't have bucket key by default + return new ArrayList<>(); + } + + String[] keys = bucketKey.get().split(","); + return Arrays.stream(keys).collect(Collectors.toList()); + } + public static int[] getBucketKeyIndexes(ReadableConfig tableOptions, RowType schema) { Optional bucketKey = tableOptions.getOptional(FlinkConnectorOptions.BUCKET_KEY); if (!bucketKey.isPresent()) { diff --git a/fluss-lakehouse/fluss-lakehouse-paimon/src/test/java/com/alibaba/fluss/lakehouse/paimon/testutils/TestingDatabaseSyncSink.java b/fluss-lakehouse/fluss-lakehouse-paimon/src/test/java/com/alibaba/fluss/lakehouse/paimon/testutils/TestingDatabaseSyncSink.java index 2e75aa2619..669d70642e 100644 --- a/fluss-lakehouse/fluss-lakehouse-paimon/src/test/java/com/alibaba/fluss/lakehouse/paimon/testutils/TestingDatabaseSyncSink.java +++ b/fluss-lakehouse/fluss-lakehouse-paimon/src/test/java/com/alibaba/fluss/lakehouse/paimon/testutils/TestingDatabaseSyncSink.java @@ -124,6 +124,9 @@ public void write(MultiplexCdcRecord record, Context context) tableInfo.getSchema().getPrimaryKeyIndexes(), true, null, + false, + tableInfo.getNumBuckets(), + tableInfo.getBucketKeys(), false); Sink sink = From 93ee782e3880a15cf065499c78ec227d6a29e223 Mon Sep 17 00:00:00 2001 From: yunhong <337361684@qq.com> Date: Mon, 17 Mar 2025 10:23:45 +0800 Subject: [PATCH 2/3] address comments --- .../fluss/bucketing/BucketingFunction.java | 4 +- .../alibaba/fluss/memory/MemorySegment.java | 3 +- .../row/compacted/CompactedRowWriter.java | 2 +- .../alibaba/fluss/row/encode/KeyEncoder.java | 3 +- .../fluss/flink/FlinkConnectorOptions.java | 15 ++- .../flink/catalog/FlinkTableFactory.java | 14 ++- 
.../fluss/flink/sink/ChannelComputer.java | 40 +++++++ .../sink/FlinkRowDataChannelComputer.java | 103 ++++++++++++++++++ .../alibaba/fluss/flink/sink/FlinkSink.java | 73 ++++++++++--- .../flink/sink/FlinkStreamPartitioner.java | 74 +++++++++++++ .../fluss/flink/sink/FlinkTableSink.java | 36 ++++-- .../fluss/flink/sink/RowDataKeySelector.java | 44 -------- .../flink/sink/FlinkTableSinkITCase.java | 93 +++++++++++----- .../testutils/TestingDatabaseSyncSink.java | 4 +- fluss-test-coverage/pom.xml | 2 +- website/docs/engine-flink/options.md | 61 ++++++----- 16 files changed, 425 insertions(+), 146 deletions(-) create mode 100644 fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/sink/ChannelComputer.java create mode 100644 fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/sink/FlinkRowDataChannelComputer.java create mode 100644 fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/sink/FlinkStreamPartitioner.java delete mode 100644 fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/sink/RowDataKeySelector.java diff --git a/fluss-common/src/main/java/com/alibaba/fluss/bucketing/BucketingFunction.java b/fluss-common/src/main/java/com/alibaba/fluss/bucketing/BucketingFunction.java index 6bf44058f5..71e52ac5fd 100644 --- a/fluss-common/src/main/java/com/alibaba/fluss/bucketing/BucketingFunction.java +++ b/fluss-common/src/main/java/com/alibaba/fluss/bucketing/BucketingFunction.java @@ -20,10 +20,8 @@ import javax.annotation.Nullable; -import java.io.Serializable; - /** An interface to assign a bucket according to the bucket key byte array. */ -public interface BucketingFunction extends Serializable { +public interface BucketingFunction { /** * Assign a bucket according to the bucket key byte array. diff --git a/fluss-common/src/main/java/com/alibaba/fluss/memory/MemorySegment.java b/fluss-common/src/main/java/com/alibaba/fluss/memory/MemorySegment.java index 7d9135d0ac..a7c65ea0ea 100644 --- a/fluss-common/src/main/java/com/alibaba/fluss/memory/MemorySegment.java +++ b/fluss-common/src/main/java/com/alibaba/fluss/memory/MemorySegment.java @@ -23,7 +23,6 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; -import java.io.Serializable; import java.nio.BufferOverflowException; import java.nio.BufferUnderflowException; import java.nio.ByteBuffer; @@ -65,7 +64,7 @@ * implementations on invocations of abstract methods. */ @Internal -public final class MemorySegment implements Serializable { +public final class MemorySegment { /** The unsafe handle for transparent memory copied (heap / off-heap). */ private static final sun.misc.Unsafe UNSAFE = MemoryUtils.UNSAFE; diff --git a/fluss-common/src/main/java/com/alibaba/fluss/row/compacted/CompactedRowWriter.java b/fluss-common/src/main/java/com/alibaba/fluss/row/compacted/CompactedRowWriter.java index 6f50cd9013..cb7af99f1a 100644 --- a/fluss-common/src/main/java/com/alibaba/fluss/row/compacted/CompactedRowWriter.java +++ b/fluss-common/src/main/java/com/alibaba/fluss/row/compacted/CompactedRowWriter.java @@ -64,7 +64,7 @@ * the positive integer is doubled. We assume that the probability of general integers being * positive is higher, so sacrifice the negative number to promote the positive number. 
*/ -public class CompactedRowWriter implements Serializable { +public class CompactedRowWriter { private final int headerSizeInBytes; diff --git a/fluss-common/src/main/java/com/alibaba/fluss/row/encode/KeyEncoder.java b/fluss-common/src/main/java/com/alibaba/fluss/row/encode/KeyEncoder.java index d616f87f5b..ced1c987e2 100644 --- a/fluss-common/src/main/java/com/alibaba/fluss/row/encode/KeyEncoder.java +++ b/fluss-common/src/main/java/com/alibaba/fluss/row/encode/KeyEncoder.java @@ -23,11 +23,10 @@ import javax.annotation.Nullable; -import java.io.Serializable; import java.util.List; /** An interface for encoding key of row into bytes. */ -public interface KeyEncoder extends Serializable { +public interface KeyEncoder { /** Encode the key of given row to byte array. */ byte[] encodeKey(InternalRow row); diff --git a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/FlinkConnectorOptions.java b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/FlinkConnectorOptions.java index dc8b3ffdcd..94fccdec7e 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/FlinkConnectorOptions.java +++ b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/FlinkConnectorOptions.java @@ -106,14 +106,17 @@ public class FlinkConnectorOptions { .defaultValue(false) .withDescription("Whether to ignore retract(-U/-D) record."); - public static final ConfigOption SINK_PRE_HASH_BY_BUCKET_KEY = - ConfigOptions.key("sink.pre-hash-by-bucket-key") + public static final ConfigOption SINK_BUCKET_SHUFFLE = + ConfigOptions.key("sink.bucket-shuffle") .booleanType() - .defaultValue(false) + .defaultValue(true) .withDescription( - "Whether to pre hash the record by bucket key before write to sink. Hash the data with the " - + "same bucket key to be processed by the same task can improve the efficiency" - + " of client processing and reduce resource consumption"); + "Whether to shuffle by bucket id before write to sink. Shuffling the data with the same " + + "bucket id to be processed by the same task can improve the efficiency of client " + + "processing and reduce resource consumption. For Log Table, bucket shuffle will " + + "only take effect when the '" + + BUCKET_KEY.key() + + "' is defined. 
For Primary Key table, it is enabled by default."); // -------------------------------------------------------------------------------------------- // table storage specific options diff --git a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/catalog/FlinkTableFactory.java b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/catalog/FlinkTableFactory.java index 0864ddc8ca..d3e435e99b 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/catalog/FlinkTableFactory.java +++ b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/catalog/FlinkTableFactory.java @@ -49,9 +49,11 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashSet; +import java.util.List; import java.util.Set; import static com.alibaba.fluss.flink.catalog.FlinkCatalog.LAKE_TABLE_SPLITTER; +import static com.alibaba.fluss.flink.utils.FlinkConnectorOptionsUtils.getBucketKeyIndexes; import static com.alibaba.fluss.flink.utils.FlinkConnectorOptionsUtils.getBucketKeys; import static com.alibaba.fluss.flink.utils.FlinkConversions.toFlinkOption; @@ -95,8 +97,7 @@ public DynamicTableSource createDynamicTableSource(Context context) { resolvedCatalogTable.getPartitionKeys().stream() .mapToInt(tableOutputType::getFieldIndex) .toArray(); - int[] bucketKeyIndexes = - FlinkConnectorOptionsUtils.getBucketKeyIndexes(tableOptions, tableOutputType); + int[] bucketKeyIndexes = getBucketKeyIndexes(tableOptions, tableOutputType); // options for lookup LookupCache cache = null; @@ -142,6 +143,9 @@ public DynamicTableSink createDynamicTableSink(Context context) { context.getConfiguration().get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.STREAMING; + ResolvedCatalogTable resolvedCatalogTable = context.getCatalogTable(); + List partitionKeys = resolvedCatalogTable.getPartitionKeys(); + RowType rowType = (RowType) context.getPhysicalRowDataType().getLogicalType(); final ReadableConfig tableOptions = helper.getOptions(); @@ -150,12 +154,14 @@ public DynamicTableSink createDynamicTableSink(Context context) { toFlussClientConfig(tableOptions, context.getConfiguration()), rowType, context.getPrimaryKeyIndexes(), + partitionKeys, isStreamingMode, tableOptions.get(toFlinkOption(ConfigOptions.TABLE_MERGE_ENGINE)), + tableOptions.get(toFlinkOption(ConfigOptions.TABLE_DATALAKE_FORMAT)), tableOptions.get(FlinkConnectorOptions.SINK_IGNORE_DELETE), tableOptions.get(FlinkConnectorOptions.BUCKET_NUMBER), getBucketKeys(tableOptions), - tableOptions.get(FlinkConnectorOptions.SINK_PRE_HASH_BY_BUCKET_KEY)); + tableOptions.get(FlinkConnectorOptions.SINK_BUCKET_SHUFFLE)); } @Override @@ -180,7 +186,7 @@ public Set> optionalOptions() { FlinkConnectorOptions.SCAN_PARTITION_DISCOVERY_INTERVAL, FlinkConnectorOptions.LOOKUP_ASYNC, FlinkConnectorOptions.SINK_IGNORE_DELETE, - FlinkConnectorOptions.SINK_PRE_HASH_BY_BUCKET_KEY, + FlinkConnectorOptions.SINK_BUCKET_SHUFFLE, LookupOptions.MAX_RETRIES, LookupOptions.CACHE_TYPE, LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_ACCESS, diff --git a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/sink/ChannelComputer.java b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/sink/ChannelComputer.java new file mode 100644 index 0000000000..76d85b8924 --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/sink/ChannelComputer.java @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2025 Alibaba Group Holding Ltd. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.flink.sink; + +import java.io.Serializable; + +/** + * A utility class to compute which downstream channel a given record should be sent to before flink + * sink. + * + * @param type of record + */ +public interface ChannelComputer extends Serializable { + void setup(int numChannels); + + int channel(T record); + + static int select(String partitionName, int bucket, int numChannels) { + int startChannel = Math.abs(partitionName.hashCode()) % numChannels; + return (startChannel + bucket) % numChannels; + } + + static int select(int bucket, int numChannels) { + return bucket % numChannels; + } +} diff --git a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/sink/FlinkRowDataChannelComputer.java b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/sink/FlinkRowDataChannelComputer.java new file mode 100644 index 0000000000..2647a38c5b --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/sink/FlinkRowDataChannelComputer.java @@ -0,0 +1,103 @@ +/* + * Copyright (c) 2025 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.flink.sink; + +import com.alibaba.fluss.bucketing.BucketingFunction; +import com.alibaba.fluss.client.table.getter.PartitionGetter; +import com.alibaba.fluss.flink.row.FlinkAsFlussRow; +import com.alibaba.fluss.metadata.DataLakeFormat; +import com.alibaba.fluss.row.encode.KeyEncoder; +import com.alibaba.fluss.types.RowType; + +import org.apache.flink.table.data.RowData; + +import javax.annotation.Nullable; + +import java.util.List; + +import static com.alibaba.fluss.utils.Preconditions.checkNotNull; + +/** {@link ChannelComputer} for flink {@link RowData}. 
*/ +public class FlinkRowDataChannelComputer implements ChannelComputer { + + private final @Nullable DataLakeFormat lakeFormat; + private final int numBucket; + private final RowType flussRowType; + private final List bucketKeys; + private final List partitionKeys; + + private transient int numChannels; + private transient BucketingFunction bucketingFunction; + private transient KeyEncoder bucketKeyEncoder; + private transient boolean combineShuffleWithPartitionName; + private transient @Nullable PartitionGetter partitionGetter; + + public FlinkRowDataChannelComputer( + RowType flussRowType, + List bucketKeys, + List partitionKeys, + @Nullable DataLakeFormat lakeFormat, + int numBucket) { + this.flussRowType = flussRowType; + this.bucketKeys = bucketKeys; + this.partitionKeys = partitionKeys; + this.lakeFormat = lakeFormat; + this.numBucket = numBucket; + } + + @Override + public void setup(int numChannels) { + this.numChannels = numChannels; + this.bucketingFunction = BucketingFunction.of(lakeFormat); + this.bucketKeyEncoder = KeyEncoder.of(flussRowType, bucketKeys, lakeFormat); + if (partitionKeys.isEmpty()) { + this.partitionGetter = null; + } else { + this.partitionGetter = new PartitionGetter(flussRowType, partitionKeys); + } + + // Only when partition keys exist and the Flink job parallelism and the bucket number are + // not divisible, then we need to include the partition name as part of the shuffle key. + // This approach can help avoid the possible data skew. For example, if bucket number is 3 + // and task parallelism is 2, it is highly possible that data shuffle becomes uneven. For + // instance, in task1, it might have 'partition0-bucket0', 'partition1-bucket0', + // 'partition0-bucket2', and 'partition1-bucket2', whereas in task2, it would only have + // 'partition0-bucket1' and 'partition1-bucket1'. As partition number increases, this + // situation becomes even more severe. 
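+        // Put differently: for a partitioned table the plain bucket shuffle is only kept when
+        // numBucket and numChannels divide each other evenly, i.e. when they are equal; e.g.
+        // numBucket = 3 with numChannels = 2 adds the partition name to the shuffle key, while
+        // numBucket = 3 with numChannels = 3 does not.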
+ this.combineShuffleWithPartitionName = + partitionGetter != null + && (numBucket % numChannels != 0 || numChannels % numBucket != 0); + } + + @Override + public int channel(RowData record) { + FlinkAsFlussRow row = new FlinkAsFlussRow().replace(record); + int bucketId = bucketingFunction.bucketing(bucketKeyEncoder.encodeKey(row), numBucket); + if (!combineShuffleWithPartitionName) { + return ChannelComputer.select(bucketId, numChannels); + } else { + checkNotNull(partitionGetter, "partitionGetter is null"); + String partitionName = partitionGetter.getPartition(row); + return ChannelComputer.select(partitionName, bucketId, numChannels); + } + } + + @Override + public String toString() { + return "BUCKET_SHUFFLE"; + } +} diff --git a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/sink/FlinkSink.java b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/sink/FlinkSink.java index 1acae2afe7..6f5128cfd2 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/sink/FlinkSink.java +++ b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/sink/FlinkSink.java @@ -21,8 +21,8 @@ import com.alibaba.fluss.flink.sink.writer.AppendSinkWriter; import com.alibaba.fluss.flink.sink.writer.FlinkSinkWriter; import com.alibaba.fluss.flink.sink.writer.UpsertSinkWriter; +import com.alibaba.fluss.metadata.DataLakeFormat; import com.alibaba.fluss.metadata.TablePath; -import com.alibaba.fluss.row.encode.KeyEncoder; import org.apache.flink.api.connector.sink2.Sink; import org.apache.flink.api.connector.sink2.SinkWriter; @@ -37,11 +37,16 @@ import java.io.IOException; import java.io.Serializable; +import java.util.List; + +import static com.alibaba.fluss.flink.sink.FlinkStreamPartitioner.partition; +import static com.alibaba.fluss.flink.utils.FlinkConversions.toFlussRowType; /** * Flink sink for Fluss. * - *

TODO: WithPreWriteTopology need to be changed to supportsPreWriteTopology in Flink 1.20
+ *

TODO: WithPreWriteTopology need to be changed to supportsPreWriteTopology in Flink 1.20. Trace + * by https://github.com/alibaba/fluss/issues/622. */ class FlinkSink implements Sink, WithPreWriteTopology { @@ -89,16 +94,31 @@ static class AppendSinkWriterBuilder implements SinkWriterBuilder bucketKeys; + private final List partitionKeys; + private final @Nullable DataLakeFormat lakeFormat; + private final boolean shuffleByBucketId; public AppendSinkWriterBuilder( TablePath tablePath, Configuration flussConfig, RowType tableRowType, - boolean ignoreDelete) { + boolean ignoreDelete, + int numBucket, + List bucketKeys, + List partitionKeys, + @Nullable DataLakeFormat lakeFormat, + boolean shuffleByBucketId) { this.tablePath = tablePath; this.flussConfig = flussConfig; this.tableRowType = tableRowType; this.ignoreDelete = ignoreDelete; + this.numBucket = numBucket; + this.bucketKeys = bucketKeys; + this.partitionKeys = partitionKeys; + this.lakeFormat = lakeFormat; + this.shuffleByBucketId = shuffleByBucketId; } @Override @@ -108,7 +128,20 @@ public AppendSinkWriter createWriter() { @Override public DataStream addPreWriteTopology(DataStream input) { - return input; + // For append only sink, we will do bucket shuffle only if bucket keys are not empty. + if (!bucketKeys.isEmpty() && shuffleByBucketId) { + return partition( + input, + new FlinkRowDataChannelComputer( + toFlussRowType(tableRowType), + bucketKeys, + partitionKeys, + lakeFormat, + numBucket), + input.getParallelism()); + } else { + return input; + } } } @@ -123,8 +156,10 @@ static class UpsertSinkWriterBuilder implements SinkWriterBuilder bucketKeys; + private final List partitionKeys; + private final @Nullable DataLakeFormat lakeFormat; + private final boolean shuffleByBucketId; UpsertSinkWriterBuilder( TablePath tablePath, @@ -133,16 +168,20 @@ static class UpsertSinkWriterBuilder implements SinkWriterBuilder bucketKeys, + List partitionKeys, + @Nullable DataLakeFormat lakeFormat, + boolean shuffleByBucketId) { this.tablePath = tablePath; this.flussConfig = flussConfig; this.tableRowType = tableRowType; this.targetColumnIndexes = targetColumnIndexes; this.ignoreDelete = ignoreDelete; this.numBucket = numBucket; - this.bucketKeyEncoder = bucketKeyEncoder; - this.sinkReHash = sinkReHash; + this.bucketKeys = bucketKeys; + this.partitionKeys = partitionKeys; + this.lakeFormat = lakeFormat; + this.shuffleByBucketId = shuffleByBucketId; } @Override @@ -153,10 +192,16 @@ public UpsertSinkWriter createWriter() { @Override public DataStream addPreWriteTopology(DataStream input) { - return sinkReHash - ? input.partitionCustom( - (bucketId, numPartitions) -> bucketId % numPartitions, - new RowDataKeySelector(bucketKeyEncoder, numBucket)) + return shuffleByBucketId + ? partition( + input, + new FlinkRowDataChannelComputer( + toFlussRowType(tableRowType), + bucketKeys, + partitionKeys, + lakeFormat, + numBucket), + input.getParallelism()) : input; } } diff --git a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/sink/FlinkStreamPartitioner.java b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/sink/FlinkStreamPartitioner.java new file mode 100644 index 0000000000..f73f3d6f3a --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/sink/FlinkStreamPartitioner.java @@ -0,0 +1,74 @@ +/* + * Copyright (c) 2025 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.flink.sink; + +import org.apache.flink.runtime.io.network.api.writer.SubtaskStateMapper; +import org.apache.flink.runtime.plugable.SerializationDelegate; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.transformations.PartitionTransformation; +import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; + +/** A {@link StreamPartitioner} which wraps a {@link ChannelComputer}. */ +public class FlinkStreamPartitioner extends StreamPartitioner { + + private final ChannelComputer channelComputer; + + public FlinkStreamPartitioner(ChannelComputer channelComputer) { + this.channelComputer = channelComputer; + } + + @Override + public void setup(int numberOfChannels) { + super.setup(numberOfChannels); + channelComputer.setup(numberOfChannels); + } + + @Override + public StreamPartitioner copy() { + return this; + } + + @Override + public SubtaskStateMapper getDownstreamSubtaskStateMapper() { + return SubtaskStateMapper.FULL; + } + + @Override + public boolean isPointwise() { + return false; + } + + @Override + public String toString() { + return channelComputer.toString(); + } + + @Override + public int selectChannel(SerializationDelegate> record) { + return channelComputer.channel(record.getInstance().getValue()); + } + + public static DataStream partition( + DataStream input, ChannelComputer channelComputer, Integer parallelism) { + FlinkStreamPartitioner partitioner = new FlinkStreamPartitioner<>(channelComputer); + PartitionTransformation partitioned = + new PartitionTransformation<>(input.getTransformation(), partitioner); + partitioned.setParallelism(parallelism == null ? input.getParallelism() : parallelism); + return new DataStream<>(input.getExecutionEnvironment(), partitioned); + } +} diff --git a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/sink/FlinkTableSink.java b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/sink/FlinkTableSink.java index 52e1af6c3c..a93e443ece 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/sink/FlinkTableSink.java +++ b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/sink/FlinkTableSink.java @@ -21,10 +21,10 @@ import com.alibaba.fluss.flink.utils.PushdownUtils; import com.alibaba.fluss.flink.utils.PushdownUtils.FieldEqual; import com.alibaba.fluss.flink.utils.PushdownUtils.ValueConversion; +import com.alibaba.fluss.metadata.DataLakeFormat; import com.alibaba.fluss.metadata.MergeEngineType; import com.alibaba.fluss.metadata.TablePath; import com.alibaba.fluss.row.GenericRow; -import com.alibaba.fluss.row.encode.KeyEncoder; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.catalog.Column; @@ -53,8 +53,6 @@ import java.util.Set; import java.util.stream.Collectors; -import static com.alibaba.fluss.flink.utils.FlinkConversions.toFlussRowType; - /** A Flink {@link DynamicTableSink}. 
*/ public class FlinkTableSink implements DynamicTableSink, @@ -67,12 +65,14 @@ public class FlinkTableSink private final Configuration flussConfig; private final RowType tableRowType; private final int[] primaryKeyIndexes; + private final List partitionKeys; private final boolean streaming; @Nullable private final MergeEngineType mergeEngineType; private final boolean ignoreDelete; private final int numBucket; private final List bucketKeys; - private final boolean sinkReHash; + private final boolean shuffleByBucketId; + private final @Nullable DataLakeFormat lakeFormat; private boolean appliedUpdates = false; @Nullable private GenericRow deleteRow; @@ -82,22 +82,26 @@ public FlinkTableSink( Configuration flussConfig, RowType tableRowType, int[] primaryKeyIndexes, + List partitionKeys, boolean streaming, @Nullable MergeEngineType mergeEngineType, + @Nullable DataLakeFormat lakeFormat, boolean ignoreDelete, int numBucket, List bucketKeys, - boolean sinkReHash) { + boolean shuffleByBucketId) { this.tablePath = tablePath; this.flussConfig = flussConfig; this.tableRowType = tableRowType; this.primaryKeyIndexes = primaryKeyIndexes; + this.partitionKeys = partitionKeys; this.streaming = streaming; this.mergeEngineType = mergeEngineType; this.ignoreDelete = ignoreDelete; this.numBucket = numBucket; this.bucketKeys = bucketKeys; - this.sinkReHash = sinkReHash; + this.shuffleByBucketId = shuffleByBucketId; + this.lakeFormat = lakeFormat; } @Override @@ -182,10 +186,20 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { targetColumnIndexes, ignoreDelete, numBucket, - KeyEncoder.of(toFlussRowType(tableRowType), bucketKeys, null), - sinkReHash) + bucketKeys, + partitionKeys, + lakeFormat, + shuffleByBucketId) : new FlinkSink.AppendSinkWriterBuilder( - tablePath, flussConfig, tableRowType, ignoreDelete); + tablePath, + flussConfig, + tableRowType, + ignoreDelete, + numBucket, + bucketKeys, + partitionKeys, + lakeFormat, + shuffleByBucketId); FlinkSink flinkSink = new FlinkSink(flinkSinkWriterBuilder); @@ -208,12 +222,14 @@ public DynamicTableSink copy() { flussConfig, tableRowType, primaryKeyIndexes, + partitionKeys, streaming, mergeEngineType, + lakeFormat, ignoreDelete, numBucket, bucketKeys, - sinkReHash); + shuffleByBucketId); sink.appliedUpdates = appliedUpdates; sink.deleteRow = deleteRow; return sink; diff --git a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/sink/RowDataKeySelector.java b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/sink/RowDataKeySelector.java deleted file mode 100644 index 26478e34d1..0000000000 --- a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/sink/RowDataKeySelector.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Copyright (c) 2025 Alibaba Group Holding Ltd. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package com.alibaba.fluss.flink.sink; - -import com.alibaba.fluss.bucketing.FlussBucketingFunction; -import com.alibaba.fluss.flink.row.FlinkAsFlussRow; -import com.alibaba.fluss.row.encode.KeyEncoder; - -import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.table.data.RowData; - -/** {@link KeySelector} to get bucket id. */ -public class RowDataKeySelector implements KeySelector { - - private final FlussBucketingFunction flussBucketingFunction; - private final KeyEncoder bucketKeyEncoder; - private final int numBucket; - - public RowDataKeySelector(KeyEncoder bucketKeyEncoder, int numBucket) { - this.bucketKeyEncoder = bucketKeyEncoder; - this.flussBucketingFunction = new FlussBucketingFunction(); - this.numBucket = numBucket; - } - - @Override - public Integer getKey(RowData rowData) throws Exception { - FlinkAsFlussRow row = new FlinkAsFlussRow().replace(rowData); - return flussBucketingFunction.bucketing(bucketKeyEncoder.encodeKey(row), numBucket); - } -} diff --git a/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/sink/FlinkTableSinkITCase.java b/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/sink/FlinkTableSinkITCase.java index 982d113298..b4b6e39d5f 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/sink/FlinkTableSinkITCase.java +++ b/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/sink/FlinkTableSinkITCase.java @@ -34,6 +34,7 @@ import org.apache.flink.core.execution.JobClient; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.ExplainDetail; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; @@ -156,24 +157,41 @@ void testAppendLog(boolean compressed) throws Exception { } @Test - void testAppendLogWithBucketKey() throws Exception { - tEnv.executeSql( - "create table sink_test (a int not null, b bigint, c string) with " - + "('bucket.num' = '3', 'bucket.key' = 'c')"); + void testAppendLogWithBucketKeyWithSinkBucketShuffle() throws Exception { + testAppendLogWithBucketKey(true); + } + + @Test + void testAppendLogWithBucketKeyWithoutSinkBucketShuffle() throws Exception { + testAppendLogWithBucketKey(false); + } + + private void testAppendLogWithBucketKey(boolean sinkBucketShuffle) throws Exception { tEnv.executeSql( - "INSERT INTO sink_test(a, b, c) " - + "VALUES (1, 3501, 'Tim'), " - + "(2, 3502, 'Fabian'), " - + "(3, 3503, 'Tim'), " - + "(4, 3504, 'jerry'), " - + "(5, 3505, 'piggy'), " - + "(7, 3507, 'Fabian'), " - + "(8, 3508, 'stave'), " - + "(9, 3509, 'Tim'), " - + "(10, 3510, 'coco'), " - + "(11, 3511, 'stave'), " - + "(12, 3512, 'Tim')") - .await(); + String.format( + "create table sink_test (a int not null, b bigint, c string) " + + "with ('bucket.num' = '3', 'bucket.key' = 'c', 'sink.bucket-shuffle'= '%s')", + sinkBucketShuffle)); + String insertSql = + "INSERT INTO sink_test(a, b, c) " + + "VALUES (1, 3501, 'Tim'), " + + "(2, 3502, 'Fabian'), " + + "(3, 3503, 'Tim'), " + + "(4, 3504, 'jerry'), " + + "(5, 3505, 'piggy'), " + + "(7, 3507, 'Fabian'), " + + "(8, 3508, 'stave'), " + + "(9, 3509, 'Tim'), " + + "(10, 3510, 'coco'), " + + "(11, 3511, 'stave'), " + + "(12, 3512, 'Tim')"; + String insertPlan = tEnv.explainSql(insertSql, ExplainDetail.JSON_EXECUTION_PLAN); + if (sinkBucketShuffle) { + 
assertThat(insertPlan).contains("\"ship_strategy\" : \"BUCKET_SHUFFLE\""); + } else { + assertThat(insertPlan).contains("\"ship_strategy\" : \"FORWARD\""); + } + tEnv.executeSql(insertSql).await(); CloseableIterator rowIter = tEnv.executeSql("select * from sink_test").collect(); //noinspection ArraysAsListWithZeroOrOneArgument @@ -291,18 +309,37 @@ void testAppendLogWithMultiBatch() throws Exception { } @Test - void testPut() throws Exception { - tEnv.executeSql( - "create table sink_test (a int not null primary key not enforced, b bigint, c string) with('bucket.num' = '3')"); + void testPutWithSinkBucketShuffle() throws Exception { + testPut(true); + } + + @Test + void testPutWithoutSinkBucketShuffle() throws Exception { + testPut(false); + } + + private void testPut(boolean sinkBucketShuffle) throws Exception { tEnv.executeSql( - "INSERT INTO sink_test(a, b, c) " - + "VALUES (1, 3501, 'Tim'), " - + "(2, 3502, 'Fabian'), " - + "(3, 3503, 'coco'), " - + "(4, 3504, 'jerry'), " - + "(5, 3505, 'piggy'), " - + "(6, 3506, 'stave')") - .await(); + String.format( + "create table sink_test (a int not null primary key not enforced, b bigint, c string)" + + " with('bucket.num' = '3', 'sink.bucket-shuffle'= '%s')", + sinkBucketShuffle)); + + String insertSql = + "INSERT INTO sink_test(a, b, c) " + + "VALUES (1, 3501, 'Tim'), " + + "(2, 3502, 'Fabian'), " + + "(3, 3503, 'coco'), " + + "(4, 3504, 'jerry'), " + + "(5, 3505, 'piggy'), " + + "(6, 3506, 'stave')"; + String insertPlan = tEnv.explainSql(insertSql, ExplainDetail.JSON_EXECUTION_PLAN); + if (sinkBucketShuffle) { + assertThat(insertPlan).contains("\"ship_strategy\" : \"BUCKET_SHUFFLE\""); + } else { + assertThat(insertPlan).contains("\"ship_strategy\" : \"FORWARD\""); + } + tEnv.executeSql(insertSql).await(); CloseableIterator rowIter = tEnv.executeSql("select * from sink_test").collect(); List expectedRows = diff --git a/fluss-lakehouse/fluss-lakehouse-paimon/src/test/java/com/alibaba/fluss/lakehouse/paimon/testutils/TestingDatabaseSyncSink.java b/fluss-lakehouse/fluss-lakehouse-paimon/src/test/java/com/alibaba/fluss/lakehouse/paimon/testutils/TestingDatabaseSyncSink.java index 669d70642e..8d1854f6ca 100644 --- a/fluss-lakehouse/fluss-lakehouse-paimon/src/test/java/com/alibaba/fluss/lakehouse/paimon/testutils/TestingDatabaseSyncSink.java +++ b/fluss-lakehouse/fluss-lakehouse-paimon/src/test/java/com/alibaba/fluss/lakehouse/paimon/testutils/TestingDatabaseSyncSink.java @@ -122,12 +122,14 @@ public void write(MultiplexCdcRecord record, Context context) flussConfig, FlinkConversions.toFlinkRowType(tableInfo.getRowType()), tableInfo.getSchema().getPrimaryKeyIndexes(), + tableInfo.getPartitionKeys(), true, null, + tableInfo.getTableConfig().getDataLakeFormat().orElse(null), false, tableInfo.getNumBuckets(), tableInfo.getBucketKeys(), - false); + true); Sink sink = ((SinkV2Provider) diff --git a/fluss-test-coverage/pom.xml b/fluss-test-coverage/pom.xml index 247e085542..21f9bfa93b 100644 --- a/fluss-test-coverage/pom.xml +++ b/fluss-test-coverage/pom.xml @@ -324,7 +324,7 @@ com.alibaba.fluss.flink.source.reader.RecordAndPos - com.alibaba.fluss.flink.sink.FlinkSink + com.alibaba.fluss.flink.sink.* com.alibaba.fluss.flink.metrics.* diff --git a/website/docs/engine-flink/options.md b/website/docs/engine-flink/options.md index 3075dd9e36..bfaa1f999a 100644 --- a/website/docs/engine-flink/options.md +++ b/website/docs/engine-flink/options.md @@ -94,14 +94,14 @@ ALTER TABLE log_table SET ('table.log.ttl' = '7d'); | table.log.tiered.local-segments | Integer 
| 2 | The number of log segments to retain in local for each table when log tiered storage is enabled. It must be greater that 0. The default is 2. | | table.datalake.enabled | Boolean | false | Whether enable lakehouse storage for the table. Disabled by default. When this option is set to ture and the datalake tiering service is up, the table will be tiered and compacted into datalake format stored on lakehouse storage. | | table.datalake.format | Enum | (None) | The data lake format of the table specifies the tiered Lakehouse storage format, such as Paimon, Iceberg, DeltaLake, or Hudi. Currently, only 'paimon' is supported. Once the `table.datalake.format` property is configured, Fluss adopts the key encoding and bucketing strategy used by the corresponding data lake format. This ensures consistency in key encoding and bucketing, enabling seamless **Union Read** functionality across Fluss and Lakehouse. The `table.datalake.format` can be pre-defined before enabling `table.datalake.enabled`. This allows the data lake feature to be dynamically enabled on the table without requiring table recreation. If `table.datalake.format` is not explicitly set during table creation, the table will default to the format specified by the `datalake.format` configuration in the Fluss cluster | -| table.merge-engine | Enum | (None) | Defines the merge engine for the primary key table. By default, primary key table doesn't have merge engine. The supported merge engines are 'first_row' and 'versioned'. The [first_row merge engine](table-design/table-types/pk-table/merge-engines/first-row.md) will keep the first row of the same primary key. The [versioned merge engine](table-design/table-types/pk-table/merge-engines/versioned.md) will keep the row with the largest version of the same primary key. | +| table.merge-engine | Enum | (None) | Defines the merge engine for the primary key table. By default, primary key table doesn't have merge engine. The supported merge engines are 'first_row' and 'versioned'. The [first_row merge engine](table-design/table-types/pk-table/merge-engines/first-row.md) will keep the first row of the same primary key. The [versioned merge engine](table-design/table-types/pk-table/merge-engines/versioned.md) will keep the row with the largest version of the same primary key. | | table.merge-engine.versioned.ver-column | String | (None) | The column name of the version column for the 'versioned' merge engine. If the merge engine is set to 'versioned', the version column must be set. 
| ## Read Options | Option | Type | Default | Description | |-----------------------------------------------------|------------|-------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| scan.startup.mode | Enum | full | The scan startup mode enables you to specify the starting point for data consumption. Fluss currently supports the following `scan.startup.mode` options: `full` (default), earliest, latest, timestamp. See the [Start Reading Position](engine-flink/reads.md#start-reading-position) for more details. | +| scan.startup.mode | Enum | full | The scan startup mode enables you to specify the starting point for data consumption. Fluss currently supports the following `scan.startup.mode` options: `full` (default), earliest, latest, timestamp. See the [Start Reading Position](engine-flink/reads.md#start-reading-position) for more details. | | scan.startup.timestamp | Long | (None) | The timestamp to start reading the data from. This option is only valid when `scan.startup.mode` is set to `timestamp`. The format is 'milli-second-since-epoch' or 'yyyy-MM-dd HH:mm:ss', like '1678883047356' or '2023-12-09 23:09:12'. | | scan.partition.discovery.interval | Duration | 10s | The time interval for the Fluss source to discover the new partitions for partitioned table while scanning. A non-positive value disables the partition discovery. | | client.scanner.log.check-crc | Boolean | true | Automatically check the CRC3 of the read records for LogScanner. This ensures no on-the-wire or on-disk corruption to the messages occurred. This check adds some overhead, so it may be disabled in cases seeking extreme performance. 
| @@ -116,38 +116,39 @@ ALTER TABLE log_table SET ('table.log.ttl' = '7d'); ## Lookup Options -| Option | Type | Default | Description | -|-----------------------------------------------------|------------|-------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| lookup.async | Boolean | true | Whether to use asynchronous lookup. Asynchronous lookup has better throughput performance than synchronous lookup. | -| lookup.cache | Enum | optional | NONE | The caching strategy for this lookup table, including NONE, PARTIAL. | -| lookup.max-retries | Integer | optional | 3 | The maximum allowed retries if a lookup operation fails. | -| lookup.partial-cache.expire-after-access | Duration | optional | (none) | Duration to expire an entry in the cache after accessing. | -| lookup.partial-cache.expire-after-write | Duration | optional | (none) | Duration to expire an entry in the cache after writing. | -| lookup.partial-cache.cache-missing-key | Boolean | optional | true | Whether to store an empty value into the cache if the lookup key doesn't match any rows in the table. | -| lookup.partial-cache.max-rows | Long | optional | true | The maximum number of rows to store in the cache. | -| client.lookup.queue-size | Integer | 25600 | The maximum number of pending lookup operations. | -| client.lookup.max-batch-size | Integer | 128 | The maximum batch size of merging lookup operations to one lookup request. | -| client.lookup.max-inflight-requests | Integer | 128 | The maximum number of unacknowledged lookup requests for lookup operations. | -| client.lookup.batch-timeout | Duration | 100ms | The maximum time to wait for the lookup batch to full, if this timeout is reached, the lookup batch will be closed to send. | +| Option | Type | Default | Description | +|------------------------------------------|------------|------------|-----------------------------------------------------------------------------------------------------------------------------| +| lookup.async | Boolean | true | Whether to use asynchronous lookup. Asynchronous lookup has better throughput performance than synchronous lookup. | +| lookup.cache | Enum | optional | NONE | The caching strategy for this lookup table, including NONE, PARTIAL. | +| lookup.max-retries | Integer | optional | 3 | The maximum allowed retries if a lookup operation fails. | +| lookup.partial-cache.expire-after-access | Duration | optional | (none) | Duration to expire an entry in the cache after accessing. 
| +| lookup.partial-cache.expire-after-write | Duration | optional | (none) | Duration to expire an entry in the cache after writing. | +| lookup.partial-cache.cache-missing-key | Boolean | optional | true | Whether to store an empty value into the cache if the lookup key doesn't match any rows in the table. | +| lookup.partial-cache.max-rows | Long | optional | true | The maximum number of rows to store in the cache. | +| client.lookup.queue-size | Integer | 25600 | The maximum number of pending lookup operations. | +| client.lookup.max-batch-size | Integer | 128 | The maximum batch size of merging lookup operations to one lookup request. | +| client.lookup.max-inflight-requests | Integer | 128 | The maximum number of unacknowledged lookup requests for lookup operations. | +| client.lookup.batch-timeout | Duration | 100ms | The maximum time to wait for the lookup batch to full, if this timeout is reached, the lookup batch will be closed to send. | ## Write Options -| Option | Type | Default | Description | -|-----------------------------------------------------|------------|-------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| sink.ignore-delete | Boolean | false | If set to true, the sink will ignore DELETE and UPDATE_BEFORE changelog events. | -| client.writer.buffer.memory-size | MemorySize | 64mb | The total bytes of memory the writer can use to buffer internal rows. | -| client.writer.buffer.page-size | MemorySize | 128kb | Size of every page in memory buffers ('client.writer.buffer.memory-size'). | -| client.writer.buffer.per-request-memory-size | MemorySize | 16mb | The minimum number of bytes that will be allocated by the writer rounded down to the closest multiple of client.writer.buffer.page-size. It must be greater than or equal to client.writer.buffer.page-size. This option allows to allocate memory in batches to have better CPU-cached friendliness due to contiguous segments. | -| client.writer.batch-size | MemorySize | 2mb | The writer or walBuilder will attempt to batch records together into one batch for the same bucket. This helps performance on both the client and the server. | -| client.writer.buffer.wait-timeout | Duration | 2^(63)-1ns | Defines how long the writer will block when waiting for segments to become available. | -| client.writer.batch-timeout | Duration | 100ms | The writer groups ay rows that arrive in between request sends into a single batched request. Normally this occurs only under load when rows arrive faster than they can be sent out. 
However in some circumstances the writer may want to reduce the number of requests even under moderate load. This setting accomplishes this by adding a small amount of artificial delay, that is, rather than immediately sending out a row, the writer will wait for up to the given delay to allow other records to be sent so that the sends can be batched together. This can be thought of as analogous to Nagle's algorithm in TCP. This setting gives the upper bound on the delay for batching: once we get client.writer.batch-size worth of rows for a bucket it will be sent immediately regardless of this setting, however if we have fewer than this many bytes accumulated for this bucket we will delay for the specified time waiting for more records to show up. | -| client.writer.bucket.no-key-assigner | Enum | STICKY | The bucket assigner for no key table. For table with bucket key or primary key, we choose a bucket based on a hash of the key. For these table without bucket key and primary key, we can use this option to specify bucket assigner, the candidate assigner is ROUND_ROBIN, STICKY, the default assigner is STICKY.
ROUND_ROBIN: this strategy will assign the bucket id for the input row by round robin.
STICKY: this strategy will assign a new bucket id only if the batch in the record accumulator changed; otherwise the bucket id will be the same as the previous record. | -| client.writer.acks | String | all | The number of acknowledgments the writer requires the leader to have received before considering a request complete. This controls the durability of records that are sent. The following settings are allowed:<br>
acks=0: If set to 0, then the writer will not wait for any acknowledgment from the server at all. No guarantee can be made that the server has received the record in this case.<br>
acks=1: This will mean the leader will write the record to its local log but will respond without awaiting full acknowledgement from all followers. If the leader fails immediately after acknowledging the record but before the followers have replicated it, then the record will be lost.<br>
acks=-1 (all): This will mean the leader will wait for the full ser of in-sync replicas to acknowledge the record. This guarantees that the record will not be lost as long as at least one in-sync replica remains alive, This is the strongest available guarantee. | -| client.writer.request-max-size | MemorySize | 10mb | The maximum size of a request in bytes. This setting will limit the number of record batches the writer will send in a single request to avoid sending huge requests. Note that this retry is no different than if the writer resent the row upon receiving the error. | -| client.writer.retries | Integer | Integer.MAX_VALUE | Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error. | -| client.writer.enable-idempotence | Boolean | true | Writer idempotence is enabled by default if no conflicting config are set. If conflicting config are set and writer idempotence is not explicitly enabled, idempotence is disabled. If idempotence is explicitly enabled and conflicting config are set, a ConfigException is thrown | -| client.writer.max-inflight-requests-per-bucket | Integer | 5 | The maximum number of unacknowledged requests per bucket for writer. This configuration can work only if 'client.writer.enable-idempotence' is set to true. When the number of inflight requests per bucket exceeds this setting, the writer will wait for the inflight requests to complete before sending out new requests. | +| Option | Type | Default | Description | +|-----------------------------------------------------|------------|-------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| sink.ignore-delete | Boolean | false | If set to true, the sink will ignore DELETE and UPDATE_BEFORE changelog events. | +| sink.bucket-shuffle | Boolean | true | Whether to shuffle by bucket id before write to sink. Shuffling the data with the same bucket id to be processed by the same task can improve the efficiency of client processing and reduce resource consumption. For Log Table, bucket shuffle will only take effect when the 'bucket.key' is defined. For Primary Key table, it is enabled by default. | +| client.writer.buffer.memory-size | MemorySize | 64mb | The total bytes of memory the writer can use to buffer internal rows. | +| client.writer.buffer.page-size | MemorySize | 128kb | Size of every page in memory buffers ('client.writer.buffer.memory-size'). | +| client.writer.buffer.per-request-memory-size | MemorySize | 16mb | The minimum number of bytes that will be allocated by the writer rounded down to the closest multiple of client.writer.buffer.page-size. 
It must be greater than or equal to client.writer.buffer.page-size. This option allows allocating memory in batches to have better CPU-cache friendliness due to contiguous segments. | +| client.writer.batch-size | MemorySize | 2mb | The writer or walBuilder will attempt to batch records together into one batch for the same bucket. This helps performance on both the client and the server. | +| client.writer.buffer.wait-timeout | Duration | 2^(63)-1ns | Defines how long the writer will block when waiting for segments to become available. | +| client.writer.batch-timeout | Duration | 100ms | The writer groups any rows that arrive in between request sends into a single batched request. Normally this occurs only under load when rows arrive faster than they can be sent out. However in some circumstances the writer may want to reduce the number of requests even under moderate load. This setting accomplishes this by adding a small amount of artificial delay, that is, rather than immediately sending out a row, the writer will wait for up to the given delay to allow other records to be sent so that the sends can be batched together. This can be thought of as analogous to Nagle's algorithm in TCP. This setting gives the upper bound on the delay for batching: once we get client.writer.batch-size worth of rows for a bucket it will be sent immediately regardless of this setting, however if we have fewer than this many bytes accumulated for this bucket we will delay for the specified time waiting for more records to show up. | +| client.writer.bucket.no-key-assigner | Enum | STICKY | The bucket assigner for tables without a key. For a table with a bucket key or primary key, a bucket is chosen based on a hash of the key. For tables without a bucket key or primary key, this option specifies the bucket assigner; the candidate assigners are ROUND_ROBIN and STICKY, and the default is STICKY.<br>
ROUND_ROBIN: this strategy will assign the bucket id for the input row by round robin.
STICKY: this strategy will assign a new bucket id only if the batch in the record accumulator changed; otherwise the bucket id will be the same as the previous record. | +| client.writer.acks | String | all | The number of acknowledgments the writer requires the leader to have received before considering a request complete. This controls the durability of records that are sent. The following settings are allowed:<br>
acks=0: If set to 0, then the writer will not wait for any acknowledgment from the server at all. No guarantee can be made that the server has received the record in this case.<br>
acks=1: This will mean the leader will write the record to its local log but will respond without awaiting full acknowledgement from all followers. If the leader fails immediately after acknowledging the record but before the followers have replicated it, then the record will be lost.<br>
acks=-1 (all): This will mean the leader will wait for the full ser of in-sync replicas to acknowledge the record. This guarantees that the record will not be lost as long as at least one in-sync replica remains alive, This is the strongest available guarantee. | +| client.writer.request-max-size | MemorySize | 10mb | The maximum size of a request in bytes. This setting will limit the number of record batches the writer will send in a single request to avoid sending huge requests. Note that this retry is no different than if the writer resent the row upon receiving the error. | +| client.writer.retries | Integer | Integer.MAX_VALUE | Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error. | +| client.writer.enable-idempotence | Boolean | true | Writer idempotence is enabled by default if no conflicting config are set. If conflicting config are set and writer idempotence is not explicitly enabled, idempotence is disabled. If idempotence is explicitly enabled and conflicting config are set, a ConfigException is thrown | +| client.writer.max-inflight-requests-per-bucket | Integer | 5 | The maximum number of unacknowledged requests per bucket for writer. This configuration can work only if 'client.writer.enable-idempotence' is set to true. When the number of inflight requests per bucket exceeds this setting, the writer will wait for the inflight requests to complete before sending out new requests. | ## Other Options From 21c27b32f4bc1b984c76ab9e4e77e231bcfaeccc Mon Sep 17 00:00:00 2001 From: yunhong <337361684@qq.com> Date: Mon, 24 Mar 2025 21:26:29 +0800 Subject: [PATCH 3/3] address jark's comments2 --- .../sink/FlinkRowDataChannelComputer.java | 11 +- .../flink/sink/FlinkStreamPartitioner.java | 2 + .../sink/FlinkRowDataChannelComputerTest.java | 104 ++++++++++++++++++ .../flink/sink/FlinkTableSinkITCase.java | 3 + 4 files changed, 118 insertions(+), 2 deletions(-) create mode 100644 fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/sink/FlinkRowDataChannelComputerTest.java diff --git a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/sink/FlinkRowDataChannelComputer.java b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/sink/FlinkRowDataChannelComputer.java index 2647a38c5b..789825e65b 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/sink/FlinkRowDataChannelComputer.java +++ b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/sink/FlinkRowDataChannelComputer.java @@ -16,6 +16,7 @@ package com.alibaba.fluss.flink.sink; +import com.alibaba.fluss.annotation.VisibleForTesting; import com.alibaba.fluss.bucketing.BucketingFunction; import com.alibaba.fluss.client.table.getter.PartitionGetter; import com.alibaba.fluss.flink.row.FlinkAsFlussRow; @@ -34,6 +35,8 @@ /** {@link ChannelComputer} for flink {@link RowData}. */ public class FlinkRowDataChannelComputer implements ChannelComputer { + private static final long serialVersionUID = 1L; + private final @Nullable DataLakeFormat lakeFormat; private final int numBucket; private final RowType flussRowType; @@ -79,8 +82,7 @@ public void setup(int numChannels) { // 'partition0-bucket1' and 'partition1-bucket1'. As partition number increases, this // situation becomes even more severe. 
this.combineShuffleWithPartitionName = - partitionGetter != null - && (numBucket % numChannels != 0 || numChannels % numBucket != 0); + partitionGetter != null && numBucket % numChannels != 0; } @Override @@ -100,4 +102,9 @@ public int channel(RowData record) { public String toString() { return "BUCKET_SHUFFLE"; } + + @VisibleForTesting + boolean isCombineShuffleWithPartitionName() { + return combineShuffleWithPartitionName; + } } diff --git a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/sink/FlinkStreamPartitioner.java b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/sink/FlinkStreamPartitioner.java index f73f3d6f3a..a5aa542632 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/sink/FlinkStreamPartitioner.java +++ b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/sink/FlinkStreamPartitioner.java @@ -26,6 +26,8 @@ /** A {@link StreamPartitioner} which wraps a {@link ChannelComputer}. */ public class FlinkStreamPartitioner extends StreamPartitioner { + private static final long serialVersionUID = 1L; + private final ChannelComputer channelComputer; public FlinkStreamPartitioner(ChannelComputer channelComputer) { diff --git a/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/sink/FlinkRowDataChannelComputerTest.java b/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/sink/FlinkRowDataChannelComputerTest.java new file mode 100644 index 0000000000..4a43b02255 --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/sink/FlinkRowDataChannelComputerTest.java @@ -0,0 +1,104 @@ +/* + * Copyright (c) 2025 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.flink.sink; + +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.StringData; +import org.junit.jupiter.api.Test; + +import java.util.Collections; + +import static com.alibaba.fluss.record.TestData.DATA1_ROW_TYPE; +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link FlinkRowDataChannelComputer}. 
*/ +class FlinkRowDataChannelComputerTest { + + @Test + void testSelectChanel() { + FlinkRowDataChannelComputer channelComputer = + new FlinkRowDataChannelComputer( + DATA1_ROW_TYPE, + Collections.singletonList("a"), + Collections.emptyList(), + null, + 10); + + for (int numChannel = 1; numChannel <= 10; numChannel++) { + channelComputer.setup(numChannel); + assertThat(channelComputer.isCombineShuffleWithPartitionName()).isFalse(); + for (int i = 0; i < 100; i++) { + int expectedChannel = -1; + for (int retry = 0; retry < 5; retry++) { + GenericRowData row = GenericRowData.of(i, StringData.fromString("a1")); + int channel = channelComputer.channel(row); + if (expectedChannel < 0) { + expectedChannel = channel; + } else { + assertThat(channel).isEqualTo(expectedChannel); + assertThat(channel).isLessThan(numChannel); + } + } + } + } + } + + @Test + void testSelectChanelForPartitionedTable() { + FlinkRowDataChannelComputer channelComputer = + new FlinkRowDataChannelComputer( + DATA1_ROW_TYPE, + Collections.singletonList("a"), + Collections.singletonList("b"), + null, + 10); + + for (int numChannel = 1; numChannel <= 10; numChannel++) { + channelComputer.setup(numChannel); + if (10 % numChannel != 0) { + assertThat(channelComputer.isCombineShuffleWithPartitionName()).isTrue(); + } else { + assertThat(channelComputer.isCombineShuffleWithPartitionName()).isFalse(); + } + for (int i = 0; i < 100; i++) { + int expectedChannel = -1; + for (int retry = 0; retry < 5; retry++) { + GenericRowData row = GenericRowData.of(i, StringData.fromString("a1")); + int channel = channelComputer.channel(row); + if (expectedChannel < 0) { + expectedChannel = channel; + } else { + assertThat(channel).isEqualTo(expectedChannel); + assertThat(channel).isLessThan(numChannel); + } + } + } + } + + // numChannels is divisible by 10 + channelComputer.setup(5); + GenericRowData row1 = GenericRowData.of(0, StringData.fromString("hello")); + GenericRowData row2 = GenericRowData.of(0, StringData.fromString("no")); + assertThat(channelComputer.channel(row1)).isEqualTo(channelComputer.channel(row2)); + + // numChannels is not divisible by 10 + channelComputer.setup(3); + row1 = GenericRowData.of(0, StringData.fromString("hello")); + row2 = GenericRowData.of(0, StringData.fromString("no")); + assertThat(channelComputer.channel(row1)).isNotEqualTo(channelComputer.channel(row2)); + } +} diff --git a/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/sink/FlinkTableSinkITCase.java b/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/sink/FlinkTableSinkITCase.java index b4b6e39d5f..7e10b018a8 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/sink/FlinkTableSinkITCase.java +++ b/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/sink/FlinkTableSinkITCase.java @@ -156,6 +156,7 @@ void testAppendLog(boolean compressed) throws Exception { assertResultsIgnoreOrder(rowIter, expectedRows, true); } + // TODO change these two tests to ParameterizedTest: https://github.com/alibaba/fluss/issues/659 @Test void testAppendLogWithBucketKeyWithSinkBucketShuffle() throws Exception { testAppendLogWithBucketKey(true); @@ -308,6 +309,7 @@ void testAppendLogWithMultiBatch() throws Exception { assertResultsIgnoreOrder(rowIter, expectedRows, true); } + // TODO change these two tests to ParameterizedTest: https://github.com/alibaba/fluss/issues/659 @Test void testPutWithSinkBucketShuffle() throws Exception { testPut(true); @@ -520,6 +522,7 @@ void testIgnoreDelete(boolean 
isPrimaryKeyTable) throws Exception { assertResultsIgnoreOrder(rowIter, expectedRows, true); } + // TODO change these tests to ParameterizedTest: https://github.com/alibaba/fluss/issues/659 @Test void testWritePartitionedLogTable() throws Exception { testWritePartitionedTable(false, false);
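
To make the shuffle condition in the FlinkRowDataChannelComputer hunk above concrete, here is a minimal, self-contained Java sketch of the idea, assuming hypothetical names (this is not the Fluss API and not part of the patch): the partition name is mixed into the shuffle key only when the bucket count is not divisible by the number of channels, because a plain `bucketId % numChannels` mapping is already balanced for every partition in the divisible case.

```java
// Illustrative sketch only; not part of the patch and not the Fluss API.
// Mirrors the idea behind FlinkRowDataChannelComputer#setup/#channel: combine the
// partition name into the shuffle key only when numBucket % numChannels != 0.
import java.util.Arrays;

public class BucketShuffleSketch {

    // bucketId is assumed to be in [0, numBucket); partitionName may be null for
    // non-partitioned tables.
    static int channel(String partitionName, int bucketId, int numBucket, int numChannels) {
        boolean combineWithPartition = partitionName != null && numBucket % numChannels != 0;
        if (!combineWithPartition) {
            // each channel owns the same number of buckets for every partition
            return bucketId % numChannels;
        }
        // shift the mapping per partition so the "extra" buckets do not always
        // pile up on the lowest channel ids
        return Math.floorMod(partitionName.hashCode() + bucketId, numChannels);
    }

    public static void main(String[] args) {
        int numBucket = 10;

        // 10 % 5 == 0: buckets 0..9 spread evenly over 5 channels for any partition
        int[] even = new int[5];
        for (int b = 0; b < numBucket; b++) {
            even[channel("p0", b, numBucket, 5)]++;
        }
        System.out.println("5 channels: " + Arrays.toString(even)); // [2, 2, 2, 2, 2]

        // 10 % 3 != 0: without the partition name, channel 0 would receive 4 buckets
        // from every partition; combining the partition name varies the skew per partition
        int[] skewed = new int[3];
        for (String p : new String[] {"p0", "p1", "p2"}) {
            for (int b = 0; b < numBucket; b++) {
                skewed[channel(p, b, numBucket, 3)]++;
            }
        }
        System.out.println("3 channels: " + Arrays.toString(skewed));
    }
}
```

This also matches the expectations in FlinkRowDataChannelComputerTest above: with 10 buckets and 5 channels the partition column does not influence the chosen channel, while with 3 channels it can.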