@@ -106,6 +106,18 @@ public class FlinkConnectorOptions {
.defaultValue(false)
.withDescription("Whether to ignore retract(-U/-D) record.");

public static final ConfigOption<Boolean> SINK_BUCKET_SHUFFLE =
ConfigOptions.key("sink.bucket-shuffle")
.booleanType()
.defaultValue(true)
.withDescription(
"Whether to shuffle by bucket id before write to sink. Shuffling the data with the same "
+ "bucket id to be processed by the same task can improve the efficiency of client "
+ "processing and reduce resource consumption. For Log Table, bucket shuffle will "
+ "only take effect when the '"
+ BUCKET_KEY.key()
+ "' is defined. For Primary Key table, it is enabled by default.");

// --------------------------------------------------------------------------------------------
// table storage specific options
// --------------------------------------------------------------------------------------------
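A minimal sketch of how the new 'sink.bucket-shuffle' option above could be overridden per query via Flink SQL dynamic table options (assuming dynamic table options are enabled). The catalog, database, table, and source names here are hypothetical; only the option key itself comes from this change:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class BucketShuffleOptionExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // Disable bucket shuffle for this insert only; it is enabled by default.
        tEnv.executeSql(
                "INSERT INTO fluss_catalog.fluss.orders /*+ OPTIONS('sink.bucket-shuffle' = 'false') */ "
                        + "SELECT * FROM source_orders");
    }
}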
@@ -49,9 +49,12 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import static com.alibaba.fluss.flink.catalog.FlinkCatalog.LAKE_TABLE_SPLITTER;
import static com.alibaba.fluss.flink.utils.FlinkConnectorOptionsUtils.getBucketKeyIndexes;
import static com.alibaba.fluss.flink.utils.FlinkConnectorOptionsUtils.getBucketKeys;
import static com.alibaba.fluss.flink.utils.FlinkConversions.toFlinkOption;

/** Factory to create table source and table sink for Fluss. */
@@ -94,8 +97,7 @@ public DynamicTableSource createDynamicTableSource(Context context) {
resolvedCatalogTable.getPartitionKeys().stream()
.mapToInt(tableOutputType::getFieldIndex)
.toArray();
int[] bucketKeyIndexes =
FlinkConnectorOptionsUtils.getBucketKeyIndexes(tableOptions, tableOutputType);
int[] bucketKeyIndexes = getBucketKeyIndexes(tableOptions, tableOutputType);

// options for lookup
LookupCache cache = null;
@@ -141,6 +143,9 @@ public DynamicTableSink createDynamicTableSink(Context context) {
context.getConfiguration().get(ExecutionOptions.RUNTIME_MODE)
== RuntimeExecutionMode.STREAMING;

ResolvedCatalogTable resolvedCatalogTable = context.getCatalogTable();
List<String> partitionKeys = resolvedCatalogTable.getPartitionKeys();

RowType rowType = (RowType) context.getPhysicalRowDataType().getLogicalType();
final ReadableConfig tableOptions = helper.getOptions();

@@ -149,9 +154,14 @@ public DynamicTableSink createDynamicTableSink(Context context) {
toFlussClientConfig(tableOptions, context.getConfiguration()),
rowType,
context.getPrimaryKeyIndexes(),
partitionKeys,
isStreamingMode,
tableOptions.get(toFlinkOption(ConfigOptions.TABLE_MERGE_ENGINE)),
tableOptions.get(FlinkConnectorOptions.SINK_IGNORE_DELETE));
tableOptions.get(toFlinkOption(ConfigOptions.TABLE_DATALAKE_FORMAT)),
tableOptions.get(FlinkConnectorOptions.SINK_IGNORE_DELETE),
tableOptions.get(FlinkConnectorOptions.BUCKET_NUMBER),
getBucketKeys(tableOptions),
tableOptions.get(FlinkConnectorOptions.SINK_BUCKET_SHUFFLE));
}

@Override
@@ -176,6 +186,7 @@ public Set<ConfigOption<?>> optionalOptions() {
FlinkConnectorOptions.SCAN_PARTITION_DISCOVERY_INTERVAL,
FlinkConnectorOptions.LOOKUP_ASYNC,
FlinkConnectorOptions.SINK_IGNORE_DELETE,
FlinkConnectorOptions.SINK_BUCKET_SHUFFLE,
LookupOptions.MAX_RETRIES,
LookupOptions.CACHE_TYPE,
LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_ACCESS,
@@ -0,0 +1,40 @@
/*
* Copyright (c) 2025 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.fluss.flink.sink;

import java.io.Serializable;

/**
* Computes which downstream channel a given record should be sent to before it reaches the
* Flink sink.
*
* @param <T> type of record
*/
public interface ChannelComputer<T> extends Serializable {
void setup(int numChannels);

int channel(T record);

static int select(String partitionName, int bucket, int numChannels) {
int startChannel = Math.abs(partitionName.hashCode()) % numChannels;
return (startChannel + bucket) % numChannels;
}

static int select(int bucket, int numChannels) {
return bucket % numChannels;
}
}
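As a reading aid, a small hypothetical snippet showing how the two select() overloads above map buckets to channels; the channel count and partition names are made-up values:

import com.alibaba.fluss.flink.sink.ChannelComputer;

public class ChannelComputerSelectExample {
    public static void main(String[] args) {
        int numChannels = 2;
        // Without a partition name: plain modulo, so bucket 0 -> channel 0,
        // bucket 1 -> channel 1, bucket 2 -> channel 0.
        for (int bucket = 0; bucket < 3; bucket++) {
            System.out.println("bucket " + bucket + " -> channel "
                    + ChannelComputer.select(bucket, numChannels));
        }
        // With a partition name: the partition's hash picks the starting channel, so the same
        // bucket id can land on different channels for different partitions.
        for (String partition : new String[] {"20250101", "20250102"}) {
            for (int bucket = 0; bucket < 3; bucket++) {
                System.out.println(partition + ", bucket " + bucket + " -> channel "
                        + ChannelComputer.select(partition, bucket, numChannels));
            }
        }
    }
}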
@@ -0,0 +1,110 @@
/*
* Copyright (c) 2025 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.fluss.flink.sink;

import com.alibaba.fluss.annotation.VisibleForTesting;
import com.alibaba.fluss.bucketing.BucketingFunction;
import com.alibaba.fluss.client.table.getter.PartitionGetter;
import com.alibaba.fluss.flink.row.FlinkAsFlussRow;
import com.alibaba.fluss.metadata.DataLakeFormat;
import com.alibaba.fluss.row.encode.KeyEncoder;
import com.alibaba.fluss.types.RowType;

import org.apache.flink.table.data.RowData;

import javax.annotation.Nullable;

import java.util.List;

import static com.alibaba.fluss.utils.Preconditions.checkNotNull;

/** {@link ChannelComputer} for Flink {@link RowData}. */
public class FlinkRowDataChannelComputer implements ChannelComputer<RowData> {

private static final long serialVersionUID = 1L;

private final @Nullable DataLakeFormat lakeFormat;
private final int numBucket;
private final RowType flussRowType;
private final List<String> bucketKeys;
private final List<String> partitionKeys;

private transient int numChannels;
private transient BucketingFunction bucketingFunction;
private transient KeyEncoder bucketKeyEncoder;
private transient boolean combineShuffleWithPartitionName;
private transient @Nullable PartitionGetter partitionGetter;

public FlinkRowDataChannelComputer(
RowType flussRowType,
List<String> bucketKeys,
List<String> partitionKeys,
@Nullable DataLakeFormat lakeFormat,
int numBucket) {
this.flussRowType = flussRowType;
this.bucketKeys = bucketKeys;
this.partitionKeys = partitionKeys;
this.lakeFormat = lakeFormat;
this.numBucket = numBucket;
}

@Override
public void setup(int numChannels) {
this.numChannels = numChannels;
this.bucketingFunction = BucketingFunction.of(lakeFormat);
this.bucketKeyEncoder = KeyEncoder.of(flussRowType, bucketKeys, lakeFormat);
if (partitionKeys.isEmpty()) {
this.partitionGetter = null;
} else {
this.partitionGetter = new PartitionGetter(flussRowType, partitionKeys);
}

// The partition name is included as part of the shuffle key only when partition keys
// exist and the bucket number is not evenly divisible by the task parallelism. This
// helps avoid data skew. For example, with 3 buckets and a parallelism of 2, a
// bucket-only shuffle is uneven: task1 would get 'partition0-bucket0',
// 'partition1-bucket0', 'partition0-bucket2', and 'partition1-bucket2', while task2
// would only get 'partition0-bucket1' and 'partition1-bucket1'. As the number of
// partitions grows, the skew becomes even more severe. (See the worked example after
// this class.)
this.combineShuffleWithPartitionName =
partitionGetter != null && numBucket % numChannels != 0;
}

@Override
public int channel(RowData record) {
FlinkAsFlussRow row = new FlinkAsFlussRow().replace(record);
int bucketId = bucketingFunction.bucketing(bucketKeyEncoder.encodeKey(row), numBucket);
if (!combineShuffleWithPartitionName) {
return ChannelComputer.select(bucketId, numChannels);
} else {
checkNotNull(partitionGetter, "partitionGetter is null");
String partitionName = partitionGetter.getPartition(row);
return ChannelComputer.select(partitionName, bucketId, numChannels);
}
}

@Override
public String toString() {
return "BUCKET_SHUFFLE";
}

@VisibleForTesting
boolean isCombineShuffleWithPartitionName() {
return combineShuffleWithPartitionName;
}
}
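To make the skew argument in setup() concrete, a small sketch (made-up partition names, 3 buckets and 2 channels as in the comment above) that counts how many partition-bucket pairs each channel receives with and without the partition name in the shuffle key:

import com.alibaba.fluss.flink.sink.ChannelComputer;

import java.util.Arrays;

public class BucketShuffleSkewExample {
    public static void main(String[] args) {
        int numBucket = 3;
        int numChannels = 2; // numBucket % numChannels != 0, so bucket-only shuffle can skew
        String[] partitions = {"p0", "p1", "p2", "p3"};

        int[] bucketOnly = new int[numChannels];
        int[] withPartition = new int[numChannels];
        for (String partition : partitions) {
            for (int bucket = 0; bucket < numBucket; bucket++) {
                // Bucket-only shuffle: every partition sends buckets 0 and 2 to channel 0.
                bucketOnly[ChannelComputer.select(bucket, numChannels)]++;
                // Partition-aware shuffle: the partition hash rotates the starting channel.
                withPartition[ChannelComputer.select(partition, bucket, numChannels)]++;
            }
        }
        // Bucket-only gives [8, 4] here; the partition-aware variant spreads the surplus
        // bucket across channels (the exact split depends on the partition names' hash codes).
        System.out.println(Arrays.toString(bucketOnly));
        System.out.println(Arrays.toString(withPartition));
    }
}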
@@ -21,22 +21,34 @@
import com.alibaba.fluss.flink.sink.writer.AppendSinkWriter;
import com.alibaba.fluss.flink.sink.writer.FlinkSinkWriter;
import com.alibaba.fluss.flink.sink.writer.UpsertSinkWriter;
import com.alibaba.fluss.metadata.DataLakeFormat;
import com.alibaba.fluss.metadata.TablePath;

import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup;
import org.apache.flink.streaming.api.connector.sink2.WithPreWriteTopology;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;

import javax.annotation.Nullable;

import java.io.IOException;
import java.io.Serializable;
import java.util.List;

/** Flink sink for Fluss. */
class FlinkSink implements Sink<RowData> {
import static com.alibaba.fluss.flink.sink.FlinkStreamPartitioner.partition;
import static com.alibaba.fluss.flink.utils.FlinkConversions.toFlussRowType;

/**
* Flink sink for Fluss.
*
* <p>TODO: WithPreWriteTopology needs to be changed to supportsPreWriteTopology in Flink 1.20.
* Tracked by https://github.com/alibaba/fluss/issues/622.
*/
class FlinkSink implements Sink<RowData>, WithPreWriteTopology<RowData> {

private static final long serialVersionUID = 1L;

@@ -61,9 +73,16 @@ public SinkWriter<RowData> createWriter(WriterInitContext context) throws IOException
return flinkSinkWriter;
}

@Override
public DataStream<RowData> addPreWriteTopology(DataStream<RowData> input) {
return builder.addPreWriteTopology(input);
}

@Internal
interface SinkWriterBuilder<W extends FlinkSinkWriter> extends Serializable {
W createWriter();

DataStream<RowData> addPreWriteTopology(DataStream<RowData> input);
}

@Internal
@@ -75,22 +94,55 @@ static class AppendSinkWriterBuilder implements SinkWriterBuilder<AppendSinkWriter>
private final Configuration flussConfig;
private final RowType tableRowType;
private final boolean ignoreDelete;
private final int numBucket;
private final List<String> bucketKeys;
private final List<String> partitionKeys;
private final @Nullable DataLakeFormat lakeFormat;
private final boolean shuffleByBucketId;

public AppendSinkWriterBuilder(
TablePath tablePath,
Configuration flussConfig,
RowType tableRowType,
boolean ignoreDelete) {
boolean ignoreDelete,
int numBucket,
List<String> bucketKeys,
List<String> partitionKeys,
@Nullable DataLakeFormat lakeFormat,
boolean shuffleByBucketId) {
this.tablePath = tablePath;
this.flussConfig = flussConfig;
this.tableRowType = tableRowType;
this.ignoreDelete = ignoreDelete;
this.numBucket = numBucket;
this.bucketKeys = bucketKeys;
this.partitionKeys = partitionKeys;
this.lakeFormat = lakeFormat;
this.shuffleByBucketId = shuffleByBucketId;
}

@Override
public AppendSinkWriter createWriter() {
return new AppendSinkWriter(tablePath, flussConfig, tableRowType, ignoreDelete);
}

@Override
public DataStream<RowData> addPreWriteTopology(DataStream<RowData> input) {
// For an append-only sink, we only shuffle by bucket when bucket keys are not empty.
if (!bucketKeys.isEmpty() && shuffleByBucketId) {
return partition(
input,
new FlinkRowDataChannelComputer(
toFlussRowType(tableRowType),
bucketKeys,
partitionKeys,
lakeFormat,
numBucket),
input.getParallelism());
} else {
return input;
}
}
}

@Internal
@@ -103,24 +155,54 @@ static class UpsertSinkWriterBuilder implements SinkWriterBuilder<UpsertSinkWriter>
private final RowType tableRowType;
private final @Nullable int[] targetColumnIndexes;
private final boolean ignoreDelete;
private final int numBucket;
private final List<String> bucketKeys;
private final List<String> partitionKeys;
private final @Nullable DataLakeFormat lakeFormat;
private final boolean shuffleByBucketId;

UpsertSinkWriterBuilder(
TablePath tablePath,
Configuration flussConfig,
RowType tableRowType,
@Nullable int[] targetColumnIndexes,
boolean ignoreDelete) {
boolean ignoreDelete,
int numBucket,
List<String> bucketKeys,
List<String> partitionKeys,
@Nullable DataLakeFormat lakeFormat,
boolean shuffleByBucketId) {
this.tablePath = tablePath;
this.flussConfig = flussConfig;
this.tableRowType = tableRowType;
this.targetColumnIndexes = targetColumnIndexes;
this.ignoreDelete = ignoreDelete;
this.numBucket = numBucket;
this.bucketKeys = bucketKeys;
this.partitionKeys = partitionKeys;
this.lakeFormat = lakeFormat;
this.shuffleByBucketId = shuffleByBucketId;
}

@Override
public UpsertSinkWriter createWriter() {
return new UpsertSinkWriter(
tablePath, flussConfig, tableRowType, targetColumnIndexes, ignoreDelete);
}

@Override
public DataStream<RowData> addPreWriteTopology(DataStream<RowData> input) {
return shuffleByBucketId
? partition(
input,
new FlinkRowDataChannelComputer(
toFlussRowType(tableRowType),
bucketKeys,
partitionKeys,
lakeFormat,
numBucket),
input.getParallelism())
: input;
}
}
}