diff --git a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/FlinkConnectorOptions.java b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/FlinkConnectorOptions.java index f8299c59ab..94fccdec7e 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/FlinkConnectorOptions.java +++ b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/FlinkConnectorOptions.java @@ -106,6 +106,18 @@ public class FlinkConnectorOptions { .defaultValue(false) .withDescription("Whether to ignore retract(-U/-D) record."); + public static final ConfigOption SINK_BUCKET_SHUFFLE = + ConfigOptions.key("sink.bucket-shuffle") + .booleanType() + .defaultValue(true) + .withDescription( + "Whether to shuffle data by bucket id before writing to the sink. Shuffling records with the same " + + "bucket id to the same task can improve the efficiency of client " + + "processing and reduce resource consumption. For a Log Table, bucket shuffle " + + "only takes effect when the '" + + BUCKET_KEY.key() + + "' option is defined. For a Primary Key Table, it is enabled by default."); + // ------------------------------------------------------------------------------------------------ // table storage specific options // ------------------------------------------------------------------------------------------------ diff --git a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/catalog/FlinkTableFactory.java b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/catalog/FlinkTableFactory.java index 790e61a6f4..d3e435e99b 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/catalog/FlinkTableFactory.java +++ b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/catalog/FlinkTableFactory.java @@ -49,9 +49,12 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashSet; +import java.util.List; import java.util.Set; import static com.alibaba.fluss.flink.catalog.FlinkCatalog.LAKE_TABLE_SPLITTER; +import static com.alibaba.fluss.flink.utils.FlinkConnectorOptionsUtils.getBucketKeyIndexes; +import static com.alibaba.fluss.flink.utils.FlinkConnectorOptionsUtils.getBucketKeys; import static com.alibaba.fluss.flink.utils.FlinkConversions.toFlinkOption; /** Factory to create table source and table sink for Fluss. 
*/ @@ -94,8 +97,7 @@ public DynamicTableSource createDynamicTableSource(Context context) { resolvedCatalogTable.getPartitionKeys().stream() .mapToInt(tableOutputType::getFieldIndex) .toArray(); - int[] bucketKeyIndexes = - FlinkConnectorOptionsUtils.getBucketKeyIndexes(tableOptions, tableOutputType); + int[] bucketKeyIndexes = getBucketKeyIndexes(tableOptions, tableOutputType); // options for lookup LookupCache cache = null; @@ -141,6 +143,9 @@ public DynamicTableSink createDynamicTableSink(Context context) { context.getConfiguration().get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.STREAMING; + ResolvedCatalogTable resolvedCatalogTable = context.getCatalogTable(); + List partitionKeys = resolvedCatalogTable.getPartitionKeys(); + RowType rowType = (RowType) context.getPhysicalRowDataType().getLogicalType(); final ReadableConfig tableOptions = helper.getOptions(); @@ -149,9 +154,14 @@ public DynamicTableSink createDynamicTableSink(Context context) { toFlussClientConfig(tableOptions, context.getConfiguration()), rowType, context.getPrimaryKeyIndexes(), + partitionKeys, isStreamingMode, tableOptions.get(toFlinkOption(ConfigOptions.TABLE_MERGE_ENGINE)), - tableOptions.get(FlinkConnectorOptions.SINK_IGNORE_DELETE)); + tableOptions.get(toFlinkOption(ConfigOptions.TABLE_DATALAKE_FORMAT)), + tableOptions.get(FlinkConnectorOptions.SINK_IGNORE_DELETE), + tableOptions.get(FlinkConnectorOptions.BUCKET_NUMBER), + getBucketKeys(tableOptions), + tableOptions.get(FlinkConnectorOptions.SINK_BUCKET_SHUFFLE)); } @Override @@ -176,6 +186,7 @@ public Set> optionalOptions() { FlinkConnectorOptions.SCAN_PARTITION_DISCOVERY_INTERVAL, FlinkConnectorOptions.LOOKUP_ASYNC, FlinkConnectorOptions.SINK_IGNORE_DELETE, + FlinkConnectorOptions.SINK_BUCKET_SHUFFLE, LookupOptions.MAX_RETRIES, LookupOptions.CACHE_TYPE, LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_ACCESS, diff --git a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/sink/ChannelComputer.java b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/sink/ChannelComputer.java new file mode 100644 index 0000000000..76d85b8924 --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/sink/ChannelComputer.java @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2025 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.flink.sink; + +import java.io.Serializable; + +/** + * A utility class to compute which downstream channel a given record should be sent to before flink + * sink. 
+ * + * @param type of record + */ +public interface ChannelComputer extends Serializable { + void setup(int numChannels); + + int channel(T record); + + static int select(String partitionName, int bucket, int numChannels) { + int startChannel = Math.abs(partitionName.hashCode()) % numChannels; + return (startChannel + bucket) % numChannels; + } + + static int select(int bucket, int numChannels) { + return bucket % numChannels; + } +} diff --git a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/sink/FlinkRowDataChannelComputer.java b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/sink/FlinkRowDataChannelComputer.java new file mode 100644 index 0000000000..789825e65b --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/sink/FlinkRowDataChannelComputer.java @@ -0,0 +1,110 @@ +/* + * Copyright (c) 2025 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.flink.sink; + +import com.alibaba.fluss.annotation.VisibleForTesting; +import com.alibaba.fluss.bucketing.BucketingFunction; +import com.alibaba.fluss.client.table.getter.PartitionGetter; +import com.alibaba.fluss.flink.row.FlinkAsFlussRow; +import com.alibaba.fluss.metadata.DataLakeFormat; +import com.alibaba.fluss.row.encode.KeyEncoder; +import com.alibaba.fluss.types.RowType; + +import org.apache.flink.table.data.RowData; + +import javax.annotation.Nullable; + +import java.util.List; + +import static com.alibaba.fluss.utils.Preconditions.checkNotNull; + +/** {@link ChannelComputer} for flink {@link RowData}. 
*/ +public class FlinkRowDataChannelComputer implements ChannelComputer { + + private static final long serialVersionUID = 1L; + + private final @Nullable DataLakeFormat lakeFormat; + private final int numBucket; + private final RowType flussRowType; + private final List bucketKeys; + private final List partitionKeys; + + private transient int numChannels; + private transient BucketingFunction bucketingFunction; + private transient KeyEncoder bucketKeyEncoder; + private transient boolean combineShuffleWithPartitionName; + private transient @Nullable PartitionGetter partitionGetter; + + public FlinkRowDataChannelComputer( + RowType flussRowType, + List bucketKeys, + List partitionKeys, + @Nullable DataLakeFormat lakeFormat, + int numBucket) { + this.flussRowType = flussRowType; + this.bucketKeys = bucketKeys; + this.partitionKeys = partitionKeys; + this.lakeFormat = lakeFormat; + this.numBucket = numBucket; + } + + @Override + public void setup(int numChannels) { + this.numChannels = numChannels; + this.bucketingFunction = BucketingFunction.of(lakeFormat); + this.bucketKeyEncoder = KeyEncoder.of(flussRowType, bucketKeys, lakeFormat); + if (partitionKeys.isEmpty()) { + this.partitionGetter = null; + } else { + this.partitionGetter = new PartitionGetter(flussRowType, partitionKeys); + } + + // Only when partition keys exist and the Flink job parallelism and the bucket number are + // not divisible, then we need to include the partition name as part of the shuffle key. + // This approach can help avoid the possible data skew. For example, if bucket number is 3 + // and task parallelism is 2, it is highly possible that data shuffle becomes uneven. For + // instance, in task1, it might have 'partition0-bucket0', 'partition1-bucket0', + // 'partition0-bucket2', and 'partition1-bucket2', whereas in task2, it would only have + // 'partition0-bucket1' and 'partition1-bucket1'. As partition number increases, this + // situation becomes even more severe. 
+ this.combineShuffleWithPartitionName = + partitionGetter != null && numBucket % numChannels != 0; + } + + @Override + public int channel(RowData record) { + FlinkAsFlussRow row = new FlinkAsFlussRow().replace(record); + int bucketId = bucketingFunction.bucketing(bucketKeyEncoder.encodeKey(row), numBucket); + if (!combineShuffleWithPartitionName) { + return ChannelComputer.select(bucketId, numChannels); + } else { + checkNotNull(partitionGetter, "partitionGetter is null"); + String partitionName = partitionGetter.getPartition(row); + return ChannelComputer.select(partitionName, bucketId, numChannels); + } + } + + @Override + public String toString() { + return "BUCKET_SHUFFLE"; + } + + @VisibleForTesting + boolean isCombineShuffleWithPartitionName() { + return combineShuffleWithPartitionName; + } +} diff --git a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/sink/FlinkSink.java b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/sink/FlinkSink.java index 7985a99b86..6f5128cfd2 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/sink/FlinkSink.java +++ b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/sink/FlinkSink.java @@ -21,12 +21,15 @@ import com.alibaba.fluss.flink.sink.writer.AppendSinkWriter; import com.alibaba.fluss.flink.sink.writer.FlinkSinkWriter; import com.alibaba.fluss.flink.sink.writer.UpsertSinkWriter; +import com.alibaba.fluss.metadata.DataLakeFormat; import com.alibaba.fluss.metadata.TablePath; import org.apache.flink.api.connector.sink2.Sink; import org.apache.flink.api.connector.sink2.SinkWriter; import org.apache.flink.api.connector.sink2.WriterInitContext; import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup; +import org.apache.flink.streaming.api.connector.sink2.WithPreWriteTopology; +import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.logical.RowType; @@ -34,9 +37,18 @@ import java.io.IOException; import java.io.Serializable; +import java.util.List; -/** Flink sink for Fluss. */ -class FlinkSink implements Sink { +import static com.alibaba.fluss.flink.sink.FlinkStreamPartitioner.partition; +import static com.alibaba.fluss.flink.utils.FlinkConversions.toFlussRowType; + +/** + * Flink sink for Fluss. + * + *

TODO: WithPreWriteTopology need to be changed to supportsPreWriteTopology in Flink 1.20. Trace + * by https://github.com/alibaba/fluss/issues/622. + */ +class FlinkSink implements Sink, WithPreWriteTopology { private static final long serialVersionUID = 1L; @@ -61,9 +73,16 @@ public SinkWriter createWriter(WriterInitContext context) throws IOExce return flinkSinkWriter; } + @Override + public DataStream addPreWriteTopology(DataStream input) { + return builder.addPreWriteTopology(input); + } + @Internal interface SinkWriterBuilder extends Serializable { W createWriter(); + + DataStream addPreWriteTopology(DataStream input); } @Internal @@ -75,22 +94,55 @@ static class AppendSinkWriterBuilder implements SinkWriterBuilder bucketKeys; + private final List partitionKeys; + private final @Nullable DataLakeFormat lakeFormat; + private final boolean shuffleByBucketId; public AppendSinkWriterBuilder( TablePath tablePath, Configuration flussConfig, RowType tableRowType, - boolean ignoreDelete) { + boolean ignoreDelete, + int numBucket, + List bucketKeys, + List partitionKeys, + @Nullable DataLakeFormat lakeFormat, + boolean shuffleByBucketId) { this.tablePath = tablePath; this.flussConfig = flussConfig; this.tableRowType = tableRowType; this.ignoreDelete = ignoreDelete; + this.numBucket = numBucket; + this.bucketKeys = bucketKeys; + this.partitionKeys = partitionKeys; + this.lakeFormat = lakeFormat; + this.shuffleByBucketId = shuffleByBucketId; } @Override public AppendSinkWriter createWriter() { return new AppendSinkWriter(tablePath, flussConfig, tableRowType, ignoreDelete); } + + @Override + public DataStream addPreWriteTopology(DataStream input) { + // For append only sink, we will do bucket shuffle only if bucket keys are not empty. + if (!bucketKeys.isEmpty() && shuffleByBucketId) { + return partition( + input, + new FlinkRowDataChannelComputer( + toFlussRowType(tableRowType), + bucketKeys, + partitionKeys, + lakeFormat, + numBucket), + input.getParallelism()); + } else { + return input; + } + } } @Internal @@ -103,18 +155,33 @@ static class UpsertSinkWriterBuilder implements SinkWriterBuilder bucketKeys; + private final List partitionKeys; + private final @Nullable DataLakeFormat lakeFormat; + private final boolean shuffleByBucketId; UpsertSinkWriterBuilder( TablePath tablePath, Configuration flussConfig, RowType tableRowType, @Nullable int[] targetColumnIndexes, - boolean ignoreDelete) { + boolean ignoreDelete, + int numBucket, + List bucketKeys, + List partitionKeys, + @Nullable DataLakeFormat lakeFormat, + boolean shuffleByBucketId) { this.tablePath = tablePath; this.flussConfig = flussConfig; this.tableRowType = tableRowType; this.targetColumnIndexes = targetColumnIndexes; this.ignoreDelete = ignoreDelete; + this.numBucket = numBucket; + this.bucketKeys = bucketKeys; + this.partitionKeys = partitionKeys; + this.lakeFormat = lakeFormat; + this.shuffleByBucketId = shuffleByBucketId; } @Override @@ -122,5 +189,20 @@ public UpsertSinkWriter createWriter() { return new UpsertSinkWriter( tablePath, flussConfig, tableRowType, targetColumnIndexes, ignoreDelete); } + + @Override + public DataStream addPreWriteTopology(DataStream input) { + return shuffleByBucketId + ? 
partition( + input, + new FlinkRowDataChannelComputer( + toFlussRowType(tableRowType), + bucketKeys, + partitionKeys, + lakeFormat, + numBucket), + input.getParallelism()) + : input; + } } } diff --git a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/sink/FlinkStreamPartitioner.java b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/sink/FlinkStreamPartitioner.java new file mode 100644 index 0000000000..a5aa542632 --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/sink/FlinkStreamPartitioner.java @@ -0,0 +1,76 @@ +/* + * Copyright (c) 2025 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.flink.sink; + +import org.apache.flink.runtime.io.network.api.writer.SubtaskStateMapper; +import org.apache.flink.runtime.plugable.SerializationDelegate; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.transformations.PartitionTransformation; +import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; + +/** A {@link StreamPartitioner} which wraps a {@link ChannelComputer}. */ +public class FlinkStreamPartitioner extends StreamPartitioner { + + private static final long serialVersionUID = 1L; + + private final ChannelComputer channelComputer; + + public FlinkStreamPartitioner(ChannelComputer channelComputer) { + this.channelComputer = channelComputer; + } + + @Override + public void setup(int numberOfChannels) { + super.setup(numberOfChannels); + channelComputer.setup(numberOfChannels); + } + + @Override + public StreamPartitioner copy() { + return this; + } + + @Override + public SubtaskStateMapper getDownstreamSubtaskStateMapper() { + return SubtaskStateMapper.FULL; + } + + @Override + public boolean isPointwise() { + return false; + } + + @Override + public String toString() { + return channelComputer.toString(); + } + + @Override + public int selectChannel(SerializationDelegate> record) { + return channelComputer.channel(record.getInstance().getValue()); + } + + public static DataStream partition( + DataStream input, ChannelComputer channelComputer, Integer parallelism) { + FlinkStreamPartitioner partitioner = new FlinkStreamPartitioner<>(channelComputer); + PartitionTransformation partitioned = + new PartitionTransformation<>(input.getTransformation(), partitioner); + partitioned.setParallelism(parallelism == null ? 
input.getParallelism() : parallelism); + return new DataStream<>(input.getExecutionEnvironment(), partitioned); + } +} diff --git a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/sink/FlinkTableSink.java b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/sink/FlinkTableSink.java index e4f370f276..a93e443ece 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/sink/FlinkTableSink.java +++ b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/sink/FlinkTableSink.java @@ -21,6 +21,7 @@ import com.alibaba.fluss.flink.utils.PushdownUtils; import com.alibaba.fluss.flink.utils.PushdownUtils.FieldEqual; import com.alibaba.fluss.flink.utils.PushdownUtils.ValueConversion; +import com.alibaba.fluss.metadata.DataLakeFormat; import com.alibaba.fluss.metadata.MergeEngineType; import com.alibaba.fluss.metadata.TablePath; import com.alibaba.fluss.row.GenericRow; @@ -64,9 +65,14 @@ public class FlinkTableSink private final Configuration flussConfig; private final RowType tableRowType; private final int[] primaryKeyIndexes; + private final List partitionKeys; private final boolean streaming; @Nullable private final MergeEngineType mergeEngineType; private final boolean ignoreDelete; + private final int numBucket; + private final List bucketKeys; + private final boolean shuffleByBucketId; + private final @Nullable DataLakeFormat lakeFormat; private boolean appliedUpdates = false; @Nullable private GenericRow deleteRow; @@ -76,16 +82,26 @@ public FlinkTableSink( Configuration flussConfig, RowType tableRowType, int[] primaryKeyIndexes, + List partitionKeys, boolean streaming, @Nullable MergeEngineType mergeEngineType, - boolean ignoreDelete) { + @Nullable DataLakeFormat lakeFormat, + boolean ignoreDelete, + int numBucket, + List bucketKeys, + boolean shuffleByBucketId) { this.tablePath = tablePath; this.flussConfig = flussConfig; this.tableRowType = tableRowType; this.primaryKeyIndexes = primaryKeyIndexes; + this.partitionKeys = partitionKeys; this.streaming = streaming; this.mergeEngineType = mergeEngineType; this.ignoreDelete = ignoreDelete; + this.numBucket = numBucket; + this.bucketKeys = bucketKeys; + this.shuffleByBucketId = shuffleByBucketId; + this.lakeFormat = lakeFormat; } @Override @@ -168,9 +184,22 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { flussConfig, tableRowType, targetColumnIndexes, - ignoreDelete) + ignoreDelete, + numBucket, + bucketKeys, + partitionKeys, + lakeFormat, + shuffleByBucketId) : new FlinkSink.AppendSinkWriterBuilder( - tablePath, flussConfig, tableRowType, ignoreDelete); + tablePath, + flussConfig, + tableRowType, + ignoreDelete, + numBucket, + bucketKeys, + partitionKeys, + lakeFormat, + shuffleByBucketId); FlinkSink flinkSink = new FlinkSink(flinkSinkWriterBuilder); @@ -193,9 +222,14 @@ public DynamicTableSink copy() { flussConfig, tableRowType, primaryKeyIndexes, + partitionKeys, streaming, mergeEngineType, - ignoreDelete); + lakeFormat, + ignoreDelete, + numBucket, + bucketKeys, + shuffleByBucketId); sink.appliedUpdates = appliedUpdates; sink.deleteRow = deleteRow; return sink; diff --git a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/utils/FlinkConnectorOptionsUtils.java b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/utils/FlinkConnectorOptionsUtils.java index 323efbcce4..702b1d02e4 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/utils/FlinkConnectorOptionsUtils.java +++ 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/utils/FlinkConnectorOptionsUtils.java @@ -27,7 +27,11 @@ import java.time.LocalDateTime; import java.time.ZoneId; import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; import java.util.Optional; +import java.util.stream.Collectors; import static com.alibaba.fluss.flink.FlinkConnectorOptions.SCAN_STARTUP_MODE; import static com.alibaba.fluss.flink.FlinkConnectorOptions.SCAN_STARTUP_TIMESTAMP; @@ -63,6 +67,17 @@ public static StartupOptions getStartupOptions(ReadableConfig tableOptions, Zone return options; } + public static List getBucketKeys(ReadableConfig tableOptions) { + Optional bucketKey = tableOptions.getOptional(FlinkConnectorOptions.BUCKET_KEY); + if (!bucketKey.isPresent()) { + // log tables don't have bucket key by default + return new ArrayList<>(); + } + + String[] keys = bucketKey.get().split(","); + return Arrays.stream(keys).collect(Collectors.toList()); + } + public static int[] getBucketKeyIndexes(ReadableConfig tableOptions, RowType schema) { Optional bucketKey = tableOptions.getOptional(FlinkConnectorOptions.BUCKET_KEY); if (!bucketKey.isPresent()) { diff --git a/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/sink/FlinkRowDataChannelComputerTest.java b/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/sink/FlinkRowDataChannelComputerTest.java new file mode 100644 index 0000000000..4a43b02255 --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/sink/FlinkRowDataChannelComputerTest.java @@ -0,0 +1,104 @@ +/* + * Copyright (c) 2025 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.flink.sink; + +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.StringData; +import org.junit.jupiter.api.Test; + +import java.util.Collections; + +import static com.alibaba.fluss.record.TestData.DATA1_ROW_TYPE; +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link FlinkRowDataChannelComputer}. 
*/ +class FlinkRowDataChannelComputerTest { + + @Test + void testSelectChannel() { + FlinkRowDataChannelComputer channelComputer = + new FlinkRowDataChannelComputer( + DATA1_ROW_TYPE, + Collections.singletonList("a"), + Collections.emptyList(), + null, + 10); + + for (int numChannel = 1; numChannel <= 10; numChannel++) { + channelComputer.setup(numChannel); + assertThat(channelComputer.isCombineShuffleWithPartitionName()).isFalse(); + for (int i = 0; i < 100; i++) { + int expectedChannel = -1; + for (int retry = 0; retry < 5; retry++) { + GenericRowData row = GenericRowData.of(i, StringData.fromString("a1")); + int channel = channelComputer.channel(row); + if (expectedChannel < 0) { + expectedChannel = channel; + } else { + assertThat(channel).isEqualTo(expectedChannel); + assertThat(channel).isLessThan(numChannel); + } + } + } + } + } + + @Test + void testSelectChannelForPartitionedTable() { + FlinkRowDataChannelComputer channelComputer = + new FlinkRowDataChannelComputer( + DATA1_ROW_TYPE, + Collections.singletonList("a"), + Collections.singletonList("b"), + null, + 10); + + for (int numChannel = 1; numChannel <= 10; numChannel++) { + channelComputer.setup(numChannel); + if (10 % numChannel != 0) { + assertThat(channelComputer.isCombineShuffleWithPartitionName()).isTrue(); + } else { + assertThat(channelComputer.isCombineShuffleWithPartitionName()).isFalse(); + } + for (int i = 0; i < 100; i++) { + int expectedChannel = -1; + for (int retry = 0; retry < 5; retry++) { + GenericRowData row = GenericRowData.of(i, StringData.fromString("a1")); + int channel = channelComputer.channel(row); + if (expectedChannel < 0) { + expectedChannel = channel; + } else { + assertThat(channel).isEqualTo(expectedChannel); + assertThat(channel).isLessThan(numChannel); + } + } + } + } + + // the bucket number (10) is divisible by numChannels (5), so the partition name is not part of the shuffle key + channelComputer.setup(5); + GenericRowData row1 = GenericRowData.of(0, StringData.fromString("hello")); + GenericRowData row2 = GenericRowData.of(0, StringData.fromString("no")); + assertThat(channelComputer.channel(row1)).isEqualTo(channelComputer.channel(row2)); + + // the bucket number (10) is not divisible by numChannels (3), so the partition name is part of the shuffle key + channelComputer.setup(3); + row1 = GenericRowData.of(0, StringData.fromString("hello")); + row2 = GenericRowData.of(0, StringData.fromString("no")); + assertThat(channelComputer.channel(row1)).isNotEqualTo(channelComputer.channel(row2)); + } +} diff --git a/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/sink/FlinkTableSinkITCase.java b/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/sink/FlinkTableSinkITCase.java index 982d113298..7e10b018a8 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/sink/FlinkTableSinkITCase.java +++ b/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/sink/FlinkTableSinkITCase.java @@ -34,6 +34,7 @@ import org.apache.flink.core.execution.JobClient; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.ExplainDetail; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; @@ -155,25 +156,43 @@ void testAppendLog(boolean compressed) throws Exception { assertResultsIgnoreOrder(rowIter, expectedRows, true); } + // TODO change these two tests to ParameterizedTest: https://github.com/alibaba/fluss/issues/659 @Test - void 
testAppendLogWithBucketKey() throws Exception { - tEnv.executeSql( - "create table sink_test (a int not null, b bigint, c string) with " - + "('bucket.num' = '3', 'bucket.key' = 'c')"); + void testAppendLogWithBucketKeyWithSinkBucketShuffle() throws Exception { + testAppendLogWithBucketKey(true); + } + + @Test + void testAppendLogWithBucketKeyWithoutSinkBucketShuffle() throws Exception { + testAppendLogWithBucketKey(false); + } + + private void testAppendLogWithBucketKey(boolean sinkBucketShuffle) throws Exception { tEnv.executeSql( - "INSERT INTO sink_test(a, b, c) " - + "VALUES (1, 3501, 'Tim'), " - + "(2, 3502, 'Fabian'), " - + "(3, 3503, 'Tim'), " - + "(4, 3504, 'jerry'), " - + "(5, 3505, 'piggy'), " - + "(7, 3507, 'Fabian'), " - + "(8, 3508, 'stave'), " - + "(9, 3509, 'Tim'), " - + "(10, 3510, 'coco'), " - + "(11, 3511, 'stave'), " - + "(12, 3512, 'Tim')") - .await(); + String.format( + "create table sink_test (a int not null, b bigint, c string) " + + "with ('bucket.num' = '3', 'bucket.key' = 'c', 'sink.bucket-shuffle'= '%s')", + sinkBucketShuffle)); + String insertSql = + "INSERT INTO sink_test(a, b, c) " + + "VALUES (1, 3501, 'Tim'), " + + "(2, 3502, 'Fabian'), " + + "(3, 3503, 'Tim'), " + + "(4, 3504, 'jerry'), " + + "(5, 3505, 'piggy'), " + + "(7, 3507, 'Fabian'), " + + "(8, 3508, 'stave'), " + + "(9, 3509, 'Tim'), " + + "(10, 3510, 'coco'), " + + "(11, 3511, 'stave'), " + + "(12, 3512, 'Tim')"; + String insertPlan = tEnv.explainSql(insertSql, ExplainDetail.JSON_EXECUTION_PLAN); + if (sinkBucketShuffle) { + assertThat(insertPlan).contains("\"ship_strategy\" : \"BUCKET_SHUFFLE\""); + } else { + assertThat(insertPlan).contains("\"ship_strategy\" : \"FORWARD\""); + } + tEnv.executeSql(insertSql).await(); CloseableIterator rowIter = tEnv.executeSql("select * from sink_test").collect(); //noinspection ArraysAsListWithZeroOrOneArgument @@ -290,19 +309,39 @@ void testAppendLogWithMultiBatch() throws Exception { assertResultsIgnoreOrder(rowIter, expectedRows, true); } + // TODO change these two tests to ParameterizedTest: https://github.com/alibaba/fluss/issues/659 @Test - void testPut() throws Exception { - tEnv.executeSql( - "create table sink_test (a int not null primary key not enforced, b bigint, c string) with('bucket.num' = '3')"); + void testPutWithSinkBucketShuffle() throws Exception { + testPut(true); + } + + @Test + void testPutWithoutSinkBucketShuffle() throws Exception { + testPut(false); + } + + private void testPut(boolean sinkBucketShuffle) throws Exception { tEnv.executeSql( - "INSERT INTO sink_test(a, b, c) " - + "VALUES (1, 3501, 'Tim'), " - + "(2, 3502, 'Fabian'), " - + "(3, 3503, 'coco'), " - + "(4, 3504, 'jerry'), " - + "(5, 3505, 'piggy'), " - + "(6, 3506, 'stave')") - .await(); + String.format( + "create table sink_test (a int not null primary key not enforced, b bigint, c string)" + + " with('bucket.num' = '3', 'sink.bucket-shuffle'= '%s')", + sinkBucketShuffle)); + + String insertSql = + "INSERT INTO sink_test(a, b, c) " + + "VALUES (1, 3501, 'Tim'), " + + "(2, 3502, 'Fabian'), " + + "(3, 3503, 'coco'), " + + "(4, 3504, 'jerry'), " + + "(5, 3505, 'piggy'), " + + "(6, 3506, 'stave')"; + String insertPlan = tEnv.explainSql(insertSql, ExplainDetail.JSON_EXECUTION_PLAN); + if (sinkBucketShuffle) { + assertThat(insertPlan).contains("\"ship_strategy\" : \"BUCKET_SHUFFLE\""); + } else { + assertThat(insertPlan).contains("\"ship_strategy\" : \"FORWARD\""); + } + tEnv.executeSql(insertSql).await(); CloseableIterator rowIter = tEnv.executeSql("select * from 
sink_test").collect(); List expectedRows = @@ -483,6 +522,7 @@ void testIgnoreDelete(boolean isPrimaryKeyTable) throws Exception { assertResultsIgnoreOrder(rowIter, expectedRows, true); } + // TODO change these tests to ParameterizedTest: https://github.com/alibaba/fluss/issues/659 @Test void testWritePartitionedLogTable() throws Exception { testWritePartitionedTable(false, false); diff --git a/fluss-lakehouse/fluss-lakehouse-paimon/src/test/java/com/alibaba/fluss/lakehouse/paimon/testutils/TestingDatabaseSyncSink.java b/fluss-lakehouse/fluss-lakehouse-paimon/src/test/java/com/alibaba/fluss/lakehouse/paimon/testutils/TestingDatabaseSyncSink.java index 2e75aa2619..8d1854f6ca 100644 --- a/fluss-lakehouse/fluss-lakehouse-paimon/src/test/java/com/alibaba/fluss/lakehouse/paimon/testutils/TestingDatabaseSyncSink.java +++ b/fluss-lakehouse/fluss-lakehouse-paimon/src/test/java/com/alibaba/fluss/lakehouse/paimon/testutils/TestingDatabaseSyncSink.java @@ -122,9 +122,14 @@ public void write(MultiplexCdcRecord record, Context context) flussConfig, FlinkConversions.toFlinkRowType(tableInfo.getRowType()), tableInfo.getSchema().getPrimaryKeyIndexes(), + tableInfo.getPartitionKeys(), true, null, - false); + tableInfo.getTableConfig().getDataLakeFormat().orElse(null), + false, + tableInfo.getNumBuckets(), + tableInfo.getBucketKeys(), + true); Sink sink = ((SinkV2Provider) diff --git a/fluss-test-coverage/pom.xml b/fluss-test-coverage/pom.xml index 247e085542..21f9bfa93b 100644 --- a/fluss-test-coverage/pom.xml +++ b/fluss-test-coverage/pom.xml @@ -324,7 +324,7 @@ com.alibaba.fluss.flink.source.reader.RecordAndPos - com.alibaba.fluss.flink.sink.FlinkSink + com.alibaba.fluss.flink.sink.* com.alibaba.fluss.flink.metrics.* diff --git a/website/docs/engine-flink/options.md b/website/docs/engine-flink/options.md index 3075dd9e36..bfaa1f999a 100644 --- a/website/docs/engine-flink/options.md +++ b/website/docs/engine-flink/options.md @@ -94,14 +94,14 @@ ALTER TABLE log_table SET ('table.log.ttl' = '7d'); | table.log.tiered.local-segments | Integer | 2 | The number of log segments to retain in local for each table when log tiered storage is enabled. It must be greater that 0. The default is 2. | | table.datalake.enabled | Boolean | false | Whether enable lakehouse storage for the table. Disabled by default. When this option is set to ture and the datalake tiering service is up, the table will be tiered and compacted into datalake format stored on lakehouse storage. | | table.datalake.format | Enum | (None) | The data lake format of the table specifies the tiered Lakehouse storage format, such as Paimon, Iceberg, DeltaLake, or Hudi. Currently, only 'paimon' is supported. Once the `table.datalake.format` property is configured, Fluss adopts the key encoding and bucketing strategy used by the corresponding data lake format. This ensures consistency in key encoding and bucketing, enabling seamless **Union Read** functionality across Fluss and Lakehouse. The `table.datalake.format` can be pre-defined before enabling `table.datalake.enabled`. This allows the data lake feature to be dynamically enabled on the table without requiring table recreation. If `table.datalake.format` is not explicitly set during table creation, the table will default to the format specified by the `datalake.format` configuration in the Fluss cluster | -| table.merge-engine | Enum | (None) | Defines the merge engine for the primary key table. By default, primary key table doesn't have merge engine. 
The supported merge engines are 'first_row' and 'versioned'. The [first_row merge engine](table-design/table-types/pk-table/merge-engines/first-row.md) will keep the first row of the same primary key. The [versioned merge engine](table-design/table-types/pk-table/merge-engines/versioned.md) will keep the row with the largest version of the same primary key. | +| table.merge-engine | Enum | (None) | Defines the merge engine for the primary key table. By default, primary key table doesn't have merge engine. The supported merge engines are 'first_row' and 'versioned'. The [first_row merge engine](table-design/table-types/pk-table/merge-engines/first-row.md) will keep the first row of the same primary key. The [versioned merge engine](table-design/table-types/pk-table/merge-engines/versioned.md) will keep the row with the largest version of the same primary key. | | table.merge-engine.versioned.ver-column | String | (None) | The column name of the version column for the 'versioned' merge engine. If the merge engine is set to 'versioned', the version column must be set. | ## Read Options | Option | Type | Default | Description | |-----------------------------------------------------|------------|-------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| scan.startup.mode | Enum | full | The scan startup mode enables you to specify the starting point for data consumption. Fluss currently supports the following `scan.startup.mode` options: `full` (default), earliest, latest, timestamp. See the [Start Reading Position](engine-flink/reads.md#start-reading-position) for more details. | +| scan.startup.mode | Enum | full | The scan startup mode enables you to specify the starting point for data consumption. Fluss currently supports the following `scan.startup.mode` options: `full` (default), earliest, latest, timestamp. See the [Start Reading Position](engine-flink/reads.md#start-reading-position) for more details. | | scan.startup.timestamp | Long | (None) | The timestamp to start reading the data from. This option is only valid when `scan.startup.mode` is set to `timestamp`. The format is 'milli-second-since-epoch' or 'yyyy-MM-dd HH:mm:ss', like '1678883047356' or '2023-12-09 23:09:12'. | | scan.partition.discovery.interval | Duration | 10s | The time interval for the Fluss source to discover the new partitions for partitioned table while scanning. A non-positive value disables the partition discovery. | | client.scanner.log.check-crc | Boolean | true | Automatically check the CRC3 of the read records for LogScanner. 
This ensures no on-the-wire or on-disk corruption to the messages occurred. This check adds some overhead, so it may be disabled in cases seeking extreme performance. | @@ -116,38 +116,39 @@ ALTER TABLE log_table SET ('table.log.ttl' = '7d'); ## Lookup Options -| Option | Type | Default | Description | -|-----------------------------------------------------|------------|-------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| lookup.async | Boolean | true | Whether to use asynchronous lookup. Asynchronous lookup has better throughput performance than synchronous lookup. | -| lookup.cache | Enum | optional | NONE | The caching strategy for this lookup table, including NONE, PARTIAL. | -| lookup.max-retries | Integer | optional | 3 | The maximum allowed retries if a lookup operation fails. | -| lookup.partial-cache.expire-after-access | Duration | optional | (none) | Duration to expire an entry in the cache after accessing. | -| lookup.partial-cache.expire-after-write | Duration | optional | (none) | Duration to expire an entry in the cache after writing. | -| lookup.partial-cache.cache-missing-key | Boolean | optional | true | Whether to store an empty value into the cache if the lookup key doesn't match any rows in the table. | -| lookup.partial-cache.max-rows | Long | optional | true | The maximum number of rows to store in the cache. | -| client.lookup.queue-size | Integer | 25600 | The maximum number of pending lookup operations. | -| client.lookup.max-batch-size | Integer | 128 | The maximum batch size of merging lookup operations to one lookup request. | -| client.lookup.max-inflight-requests | Integer | 128 | The maximum number of unacknowledged lookup requests for lookup operations. | -| client.lookup.batch-timeout | Duration | 100ms | The maximum time to wait for the lookup batch to full, if this timeout is reached, the lookup batch will be closed to send. | +| Option | Type | Default | Description | +|------------------------------------------|------------|------------|-----------------------------------------------------------------------------------------------------------------------------| +| lookup.async | Boolean | true | Whether to use asynchronous lookup. Asynchronous lookup has better throughput performance than synchronous lookup. | +| lookup.cache | Enum | optional | NONE | The caching strategy for this lookup table, including NONE, PARTIAL. | +| lookup.max-retries | Integer | optional | 3 | The maximum allowed retries if a lookup operation fails. 
| +| lookup.partial-cache.expire-after-access | Duration | optional | (none) | Duration to expire an entry in the cache after accessing. | +| lookup.partial-cache.expire-after-write | Duration | optional | (none) | Duration to expire an entry in the cache after writing. | +| lookup.partial-cache.cache-missing-key | Boolean | optional | true | Whether to store an empty value into the cache if the lookup key doesn't match any rows in the table. | +| lookup.partial-cache.max-rows | Long | optional | true | The maximum number of rows to store in the cache. | +| client.lookup.queue-size | Integer | 25600 | The maximum number of pending lookup operations. | +| client.lookup.max-batch-size | Integer | 128 | The maximum batch size of merging lookup operations to one lookup request. | +| client.lookup.max-inflight-requests | Integer | 128 | The maximum number of unacknowledged lookup requests for lookup operations. | +| client.lookup.batch-timeout | Duration | 100ms | The maximum time to wait for the lookup batch to full, if this timeout is reached, the lookup batch will be closed to send. | ## Write Options -| Option | Type | Default | Description | -|-----------------------------------------------------|------------|-------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| sink.ignore-delete | Boolean | false | If set to true, the sink will ignore DELETE and UPDATE_BEFORE changelog events. | -| client.writer.buffer.memory-size | MemorySize | 64mb | The total bytes of memory the writer can use to buffer internal rows. | -| client.writer.buffer.page-size | MemorySize | 128kb | Size of every page in memory buffers ('client.writer.buffer.memory-size'). | -| client.writer.buffer.per-request-memory-size | MemorySize | 16mb | The minimum number of bytes that will be allocated by the writer rounded down to the closest multiple of client.writer.buffer.page-size. It must be greater than or equal to client.writer.buffer.page-size. This option allows to allocate memory in batches to have better CPU-cached friendliness due to contiguous segments. | -| client.writer.batch-size | MemorySize | 2mb | The writer or walBuilder will attempt to batch records together into one batch for the same bucket. This helps performance on both the client and the server. | -| client.writer.buffer.wait-timeout | Duration | 2^(63)-1ns | Defines how long the writer will block when waiting for segments to become available. | -| client.writer.batch-timeout | Duration | 100ms | The writer groups ay rows that arrive in between request sends into a single batched request. 
Normally this occurs only under load when rows arrive faster than they can be sent out. However in some circumstances the writer may want to reduce the number of requests even under moderate load. This setting accomplishes this by adding a small amount of artificial delay, that is, rather than immediately sending out a row, the writer will wait for up to the given delay to allow other records to be sent so that the sends can be batched together. This can be thought of as analogous to Nagle's algorithm in TCP. This setting gives the upper bound on the delay for batching: once we get client.writer.batch-size worth of rows for a bucket it will be sent immediately regardless of this setting, however if we have fewer than this many bytes accumulated for this bucket we will delay for the specified time waiting for more records to show up. | -| client.writer.bucket.no-key-assigner | Enum | STICKY | The bucket assigner for no key table. For table with bucket key or primary key, we choose a bucket based on a hash of the key. For these table without bucket key and primary key, we can use this option to specify bucket assigner, the candidate assigner is ROUND_ROBIN, STICKY, the default assigner is STICKY.
ROUND_ROBIN: this strategy will assign the bucket id for the input row by round robin.
STICKY: this strategy will assign new bucket id only if the batch changed in record accumulator, otherwise the bucket id will be the same as the front record. | -| client.writer.acks | String | all | The number of acknowledgments the writer requires the leader to have received before considering a request complete. This controls the durability of records that are sent. The following settings are allowed:
acks=0: If set to 0, then the writer will not wait for any acknowledgment from the server at all. No guarantee can be mode that the server has received the record in this case.
acks=1: This will mean the leader will write the record to its local log but will respond without awaiting full acknowledge the record but before the followers have replicated it then the record will be lost.
acks=-1 (all): This will mean the leader will wait for the full ser of in-sync replicas to acknowledge the record. This guarantees that the record will not be lost as long as at least one in-sync replica remains alive, This is the strongest available guarantee. | -| client.writer.request-max-size | MemorySize | 10mb | The maximum size of a request in bytes. This setting will limit the number of record batches the writer will send in a single request to avoid sending huge requests. Note that this retry is no different than if the writer resent the row upon receiving the error. | -| client.writer.retries | Integer | Integer.MAX_VALUE | Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error. | -| client.writer.enable-idempotence | Boolean | true | Writer idempotence is enabled by default if no conflicting config are set. If conflicting config are set and writer idempotence is not explicitly enabled, idempotence is disabled. If idempotence is explicitly enabled and conflicting config are set, a ConfigException is thrown | -| client.writer.max-inflight-requests-per-bucket | Integer | 5 | The maximum number of unacknowledged requests per bucket for writer. This configuration can work only if 'client.writer.enable-idempotence' is set to true. When the number of inflight requests per bucket exceeds this setting, the writer will wait for the inflight requests to complete before sending out new requests. | +| Option | Type | Default | Description | +|-----------------------------------------------------|------------|-------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| sink.ignore-delete | Boolean | false | If set to true, the sink will ignore DELETE and UPDATE_BEFORE changelog events. | +| sink.bucket-shuffle | Boolean | true | Whether to shuffle by bucket id before write to sink. Shuffling the data with the same bucket id to be processed by the same task can improve the efficiency of client processing and reduce resource consumption. For Log Table, bucket shuffle will only take effect when the 'bucket.key' is defined. For Primary Key table, it is enabled by default. | +| client.writer.buffer.memory-size | MemorySize | 64mb | The total bytes of memory the writer can use to buffer internal rows. | +| client.writer.buffer.page-size | MemorySize | 128kb | Size of every page in memory buffers ('client.writer.buffer.memory-size'). | +| client.writer.buffer.per-request-memory-size | MemorySize | 16mb | The minimum number of bytes that will be allocated by the writer rounded down to the closest multiple of client.writer.buffer.page-size. 
It must be greater than or equal to client.writer.buffer.page-size. This option allows allocating memory in batches to have better CPU-cache friendliness due to contiguous segments. | +| client.writer.batch-size | MemorySize | 2mb | The writer or walBuilder will attempt to batch records together into one batch for the same bucket. This helps performance on both the client and the server. | +| client.writer.buffer.wait-timeout | Duration | 2^(63)-1ns | Defines how long the writer will block when waiting for segments to become available. | +| client.writer.batch-timeout | Duration | 100ms | The writer groups any rows that arrive in between request sends into a single batched request. Normally this occurs only under load when rows arrive faster than they can be sent out. However in some circumstances the writer may want to reduce the number of requests even under moderate load. This setting accomplishes this by adding a small amount of artificial delay, that is, rather than immediately sending out a row, the writer will wait for up to the given delay to allow other records to be sent so that the sends can be batched together. This can be thought of as analogous to Nagle's algorithm in TCP. This setting gives the upper bound on the delay for batching: once we get client.writer.batch-size worth of rows for a bucket it will be sent immediately regardless of this setting, however if we have fewer than this many bytes accumulated for this bucket we will delay for the specified time waiting for more records to show up. | +| client.writer.bucket.no-key-assigner | Enum | STICKY | The bucket assigner for no key table. For tables with a bucket key or primary key, we choose a bucket based on a hash of the key. For tables without a bucket key and primary key, we can use this option to specify the bucket assigner; the candidate assigners are ROUND_ROBIN and STICKY, and the default assigner is STICKY.
ROUND_ROBIN: this strategy will assign the bucket id for the input row by round robin.
STICKY: this strategy will assign new bucket id only if the batch changed in record accumulator, otherwise the bucket id will be the same as the front record. | +| client.writer.acks | String | all | The number of acknowledgments the writer requires the leader to have received before considering a request complete. This controls the durability of records that are sent. The following settings are allowed:
acks=0: If set to 0, then the writer will not wait for any acknowledgment from the server at all. No guarantee can be made that the server has received the record in this case.
acks=1: This will mean the leader will write the record to its local log but will respond without awaiting full acknowledgement from all followers. If the leader fails immediately after acknowledging the record, but before the followers have replicated it, then the record will be lost.
acks=-1 (all): This will mean the leader will wait for the full set of in-sync replicas to acknowledge the record. This guarantees that the record will not be lost as long as at least one in-sync replica remains alive. This is the strongest available guarantee. | +| client.writer.request-max-size | MemorySize | 10mb | The maximum size of a request in bytes. This setting will limit the number of record batches the writer will send in a single request to avoid sending huge requests. Note that this retry is no different than if the writer resent the row upon receiving the error. | +| client.writer.retries | Integer | Integer.MAX_VALUE | Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error. | +| client.writer.enable-idempotence | Boolean | true | Writer idempotence is enabled by default if no conflicting configs are set. If conflicting configs are set and writer idempotence is not explicitly enabled, idempotence is disabled. If idempotence is explicitly enabled and conflicting configs are set, a ConfigException is thrown. | +| client.writer.max-inflight-requests-per-bucket | Integer | 5 | The maximum number of unacknowledged requests per bucket for the writer. This configuration can work only if 'client.writer.enable-idempotence' is set to true. When the number of inflight requests per bucket exceeds this setting, the writer will wait for the inflight requests to complete before sending out new requests. | ## Other Options
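For reviewers who want to see the shuffle arithmetic in isolation, here is a small standalone sketch (not part of the patch; the class name and sample values are invented for illustration). It replays the two `select` variants added in `ChannelComputer` above and shows why `FlinkRowDataChannelComputer` folds the partition name into the shuffle key only when the bucket number is not divisible by the number of sink channels.

```java
// Illustration only: mirrors the static select(...) helpers from ChannelComputer above.
public class BucketShuffleSketch {

    // select(bucket, numChannels): plain modulo, used when the partition name is not needed.
    static int select(int bucket, int numChannels) {
        return bucket % numChannels;
    }

    // select(partitionName, bucket, numChannels): offsets each partition's buckets by a
    // hash of the partition name before taking the modulo.
    static int select(String partitionName, int bucket, int numChannels) {
        int startChannel = Math.abs(partitionName.hashCode()) % numChannels;
        return (startChannel + bucket) % numChannels;
    }

    public static void main(String[] args) {
        int numBucket = 3;
        int numChannels = 2; // numBucket % numChannels != 0, the skew-prone case

        // Plain bucket % numChannels: buckets 0 and 2 of every partition go to channel 0,
        // bucket 1 of every partition goes to channel 1, so channel 0 carries twice the load.
        for (int bucket = 0; bucket < numBucket; bucket++) {
            System.out.printf("bucket %d -> channel %d%n", bucket, select(bucket, numChannels));
        }

        // With the partition name in the key, each partition's buckets are offset by the
        // partition name's hash, so the load evens out as more partitions are written.
        for (String partition : new String[] {"p20240101", "p20240102"}) {
            for (int bucket = 0; bucket < numBucket; bucket++) {
                System.out.printf(
                        "%s, bucket %d -> channel %d%n",
                        partition, bucket, select(partition, bucket, numChannels));
            }
        }
    }
}
```

As the ITCase above demonstrates, the shuffle can also be disabled per table by setting `'sink.bucket-shuffle' = 'false'` in the table options, in which case the sink keeps the FORWARD ship strategy.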