
Commit cb28ec4

[flink] Flink sink support hash by bucket key for PrimaryKey Table
1 parent aef53f2 commit cb28ec4

File tree

11 files changed: +146 -11 lines

fluss-common/src/main/java/com/alibaba/fluss/bucketing/BucketingFunction.java

Lines changed: 3 additions & 1 deletion
```diff
@@ -20,8 +20,10 @@
 
 import javax.annotation.Nullable;
 
+import java.io.Serializable;
+
 /** An interface to assign a bucket according to the bucket key byte array. */
-public interface BucketingFunction {
+public interface BucketingFunction extends Serializable {
 
     /**
      * Assign a bucket according to the bucket key byte array.
```

fluss-common/src/main/java/com/alibaba/fluss/memory/MemorySegment.java

Lines changed: 2 additions & 1 deletion
```diff
@@ -23,6 +23,7 @@
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.io.Serializable;
 import java.nio.BufferOverflowException;
 import java.nio.BufferUnderflowException;
 import java.nio.ByteBuffer;
@@ -64,7 +65,7 @@
  * implementations on invocations of abstract methods.
  */
 @Internal
-public final class MemorySegment {
+public final class MemorySegment implements Serializable {
 
     /** The unsafe handle for transparent memory copied (heap / off-heap). */
     private static final sun.misc.Unsafe UNSAFE = MemoryUtils.UNSAFE;
```

fluss-common/src/main/java/com/alibaba/fluss/row/compacted/CompactedRowWriter.java

Lines changed: 1 addition & 1 deletion
```diff
@@ -64,7 +64,7 @@
  * the positive integer is doubled. We assume that the probability of general integers being
  * positive is higher, so sacrifice the negative number to promote the positive number.
  */
-public class CompactedRowWriter {
+public class CompactedRowWriter implements Serializable {
 
     private final int headerSizeInBytes;
 
```

fluss-common/src/main/java/com/alibaba/fluss/row/encode/KeyEncoder.java

Lines changed: 2 additions & 1 deletion
```diff
@@ -23,10 +23,11 @@
 
 import javax.annotation.Nullable;
 
+import java.io.Serializable;
 import java.util.List;
 
 /** An interface for encoding key of row into bytes. */
-public interface KeyEncoder {
+public interface KeyEncoder extends Serializable {
 
     /** Encode the key of given row to byte array. */
     byte[] encodeKey(InternalRow row);
```

fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/FlinkConnectorOptions.java

Lines changed: 9 additions & 0 deletions
```diff
@@ -106,6 +106,15 @@ public class FlinkConnectorOptions {
                     .defaultValue(false)
                     .withDescription("Whether to ignore retract(-U/-D) record.");
 
+    public static final ConfigOption<Boolean> SINK_PRE_HASH_BY_BUCKET_KEY =
+            ConfigOptions.key("sink.pre-hash-by-bucket-key")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Whether to pre-hash records by bucket key before writing to the sink. "
+                                    + "Routing records with the same bucket key to the same task can improve"
+                                    + " the efficiency of client processing and reduce resource consumption.");
+
     // --------------------------------------------------------------------------------------------
     // table storage specific options
     // --------------------------------------------------------------------------------------------
```
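
For context, a minimal sketch of where the new option goes when writing to a Fluss primary-key table from Flink SQL. The table name, schema, and server address are made up for illustration, and the other option keys ('bucket.num', 'bootstrap.servers') follow the existing Fluss connector options referenced elsewhere in this diff; in practice the table would typically live in a Fluss catalog:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class PreHashOptionExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Hypothetical primary-key table. 'sink.pre-hash-by-bucket-key' is the
        // option added by this commit; it defaults to false.
        tEnv.executeSql(
                "CREATE TABLE orders ("
                        + "  order_id BIGINT,"
                        + "  amount DOUBLE,"
                        + "  PRIMARY KEY (order_id) NOT ENFORCED"
                        + ") WITH ("
                        + "  'connector' = 'fluss',"
                        + "  'bootstrap.servers' = 'localhost:9123',"
                        + "  'bucket.num' = '4',"
                        + "  'sink.pre-hash-by-bucket-key' = 'true'"
                        + ")");
    }
}
```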

fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/catalog/FlinkTableFactory.java

Lines changed: 6 additions & 1 deletion
```diff
@@ -52,6 +52,7 @@
 import java.util.Set;
 
 import static com.alibaba.fluss.connector.flink.catalog.FlinkCatalog.LAKE_TABLE_SPLITTER;
+import static com.alibaba.fluss.connector.flink.utils.FlinkConnectorOptionsUtils.getBucketKeys;
 import static com.alibaba.fluss.connector.flink.utils.FlinkConversions.toFlinkOption;
 
 /** Factory to create table source and table sink for Fluss. */
@@ -151,7 +152,10 @@ public DynamicTableSink createDynamicTableSink(Context context) {
                 context.getPrimaryKeyIndexes(),
                 isStreamingMode,
                 tableOptions.get(toFlinkOption(ConfigOptions.TABLE_MERGE_ENGINE)),
-                tableOptions.get(FlinkConnectorOptions.SINK_IGNORE_DELETE));
+                tableOptions.get(FlinkConnectorOptions.SINK_IGNORE_DELETE),
+                tableOptions.get(FlinkConnectorOptions.BUCKET_NUMBER),
+                getBucketKeys(tableOptions),
+                tableOptions.get(FlinkConnectorOptions.SINK_PRE_HASH_BY_BUCKET_KEY));
     }
 
     @Override
@@ -176,6 +180,7 @@ public Set<ConfigOption<?>> optionalOptions() {
                 FlinkConnectorOptions.SCAN_PARTITION_DISCOVERY_INTERVAL,
                 FlinkConnectorOptions.LOOKUP_ASYNC,
                 FlinkConnectorOptions.SINK_IGNORE_DELETE,
+                FlinkConnectorOptions.SINK_PRE_HASH_BY_BUCKET_KEY,
                 LookupOptions.MAX_RETRIES,
                 LookupOptions.CACHE_TYPE,
                 LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_ACCESS,
```

fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/sink/FlinkSink.java

Lines changed: 40 additions & 3 deletions
```diff
@@ -22,11 +22,14 @@
 import com.alibaba.fluss.connector.flink.sink.writer.FlinkSinkWriter;
 import com.alibaba.fluss.connector.flink.sink.writer.UpsertSinkWriter;
 import com.alibaba.fluss.metadata.TablePath;
+import com.alibaba.fluss.row.encode.KeyEncoder;
 
 import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.api.connector.sink2.SinkWriter;
 import org.apache.flink.api.connector.sink2.WriterInitContext;
 import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup;
+import org.apache.flink.streaming.api.connector.sink2.WithPreWriteTopology;
+import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.RowType;
@@ -35,8 +38,12 @@
 import java.io.IOException;
 import java.io.Serializable;
 
-/** Flink sink for Fluss. */
-class FlinkSink implements Sink<RowData> {
+/**
+ * Flink sink for Fluss.
+ *
+ * <p>TODO: WithPreWriteTopology needs to be changed to supportsPreWriteTopology in Flink 1.20
+ */
+class FlinkSink implements Sink<RowData>, WithPreWriteTopology<RowData> {
 
     private static final long serialVersionUID = 1L;
 
@@ -60,9 +67,16 @@ public SinkWriter<RowData> createWriter(WriterInitContext context) throws IOExce
         return flinkSinkWriter;
     }
 
+    @Override
+    public DataStream<RowData> addPreWriteTopology(DataStream<RowData> input) {
+        return builder.addPreWriteTopology(input);
+    }
+
     @Internal
     interface SinkWriterBuilder<W extends FlinkSinkWriter> extends Serializable {
         W createWriter();
+
+        DataStream<RowData> addPreWriteTopology(DataStream<RowData> input);
     }
 
     @Internal
@@ -90,6 +104,11 @@ public AppendSinkWriterBuilder(
         public AppendSinkWriter createWriter() {
             return new AppendSinkWriter(tablePath, flussConfig, tableRowType, ignoreDelete);
         }
+
+        @Override
+        public DataStream<RowData> addPreWriteTopology(DataStream<RowData> input) {
+            return input;
+        }
     }
 
     @Internal
@@ -102,24 +121,42 @@ static class UpsertSinkWriterBuilder implements SinkWriterBuilder<UpsertSinkWrit
         private final RowType tableRowType;
         private final @Nullable int[] targetColumnIndexes;
         private final boolean ignoreDelete;
+        private final int numBucket;
+        private final KeyEncoder bucketKeyEncoder;
+        private final boolean sinkReHash;
 
         UpsertSinkWriterBuilder(
                 TablePath tablePath,
                 Configuration flussConfig,
                 RowType tableRowType,
                 @Nullable int[] targetColumnIndexes,
-                boolean ignoreDelete) {
+                boolean ignoreDelete,
+                int numBucket,
+                KeyEncoder bucketKeyEncoder,
+                boolean sinkReHash) {
            this.tablePath = tablePath;
            this.flussConfig = flussConfig;
            this.tableRowType = tableRowType;
            this.targetColumnIndexes = targetColumnIndexes;
            this.ignoreDelete = ignoreDelete;
+            this.numBucket = numBucket;
+            this.bucketKeyEncoder = bucketKeyEncoder;
+            this.sinkReHash = sinkReHash;
        }
 
        @Override
        public UpsertSinkWriter createWriter() {
            return new UpsertSinkWriter(
                    tablePath, flussConfig, tableRowType, targetColumnIndexes, ignoreDelete);
        }
+
+        @Override
+        public DataStream<RowData> addPreWriteTopology(DataStream<RowData> input) {
+            return sinkReHash
+                    ? input.partitionCustom(
+                            (bucketId, numPartitions) -> bucketId % numPartitions,
+                            new RowDataKeySelector(bucketKeyEncoder, numBucket))
+                    : input;
+        }
    }
 }
```
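
The wiring above relies on Flink's `DataStream#partitionCustom`, which pairs a `Partitioner` with a `KeySelector`: the selector computes a Fluss bucket id per record, and the modulo partitioner pins each bucket id to one writer subtask, so all records of a bucket are batched by the same writer. A self-contained sketch of the same routing idea, using plain longs instead of `RowData` (the bucket derivation below is a stand-in for the real bucket-key hash, not the Fluss code path):

```java
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PreWriteTopologySketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Long> records = env.fromSequence(0, 99);

        final int numBucket = 4; // stands in for the table's bucket count

        // Stand-in for RowDataKeySelector: map each record to a bucket id.
        // (The real selector hashes the encoded bucket key.)
        KeySelector<Long, Integer> bucketOf =
                new KeySelector<Long, Integer>() {
                    @Override
                    public Integer getKey(Long value) {
                        return (int) (value % numBucket);
                    }
                };

        // The same modulo routing the commit uses: records that share a
        // bucket id always land on the same writer subtask.
        Partitioner<Integer> byBucket =
                (bucketId, numPartitions) -> bucketId % numPartitions;

        records.partitionCustom(byBucket, bucketOf).print();
        env.execute("pre-write partition sketch");
    }
}
```

Note that `AppendSinkWriterBuilder` returns the input unchanged: log tables have no bucket key by default, so there is nothing to pre-hash on the append path.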

fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/sink/FlinkTableSink.java

Lines changed: 21 additions & 3 deletions
```diff
@@ -24,6 +24,7 @@
 import com.alibaba.fluss.metadata.MergeEngineType;
 import com.alibaba.fluss.metadata.TablePath;
 import com.alibaba.fluss.row.GenericRow;
+import com.alibaba.fluss.row.encode.KeyEncoder;
 
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.Column;
@@ -52,6 +53,8 @@
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import static com.alibaba.fluss.connector.flink.utils.FlinkConversions.toFlussRowType;
+
 /** A Flink {@link DynamicTableSink}. */
 public class FlinkTableSink
         implements DynamicTableSink,
@@ -67,6 +70,9 @@ public class FlinkTableSink
     private final boolean streaming;
     @Nullable private final MergeEngineType mergeEngineType;
     private final boolean ignoreDelete;
+    private final int numBucket;
+    private final List<String> bucketKeys;
+    private final boolean sinkReHash;
 
     private boolean appliedUpdates = false;
     @Nullable private GenericRow deleteRow;
@@ -78,14 +84,20 @@ public FlinkTableSink(
             int[] primaryKeyIndexes,
             boolean streaming,
             @Nullable MergeEngineType mergeEngineType,
-            boolean ignoreDelete) {
+            boolean ignoreDelete,
+            int numBucket,
+            List<String> bucketKeys,
+            boolean sinkReHash) {
         this.tablePath = tablePath;
         this.flussConfig = flussConfig;
         this.tableRowType = tableRowType;
         this.primaryKeyIndexes = primaryKeyIndexes;
         this.streaming = streaming;
         this.mergeEngineType = mergeEngineType;
         this.ignoreDelete = ignoreDelete;
+        this.numBucket = numBucket;
+        this.bucketKeys = bucketKeys;
+        this.sinkReHash = sinkReHash;
     }
 
     @Override
@@ -168,7 +180,10 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
                         flussConfig,
                         tableRowType,
                         targetColumnIndexes,
-                        ignoreDelete)
+                        ignoreDelete,
+                        numBucket,
+                        KeyEncoder.of(toFlussRowType(tableRowType), bucketKeys, null),
+                        sinkReHash)
                 : new FlinkSink.AppendSinkWriterBuilder(
                         tablePath, flussConfig, tableRowType, ignoreDelete);
@@ -195,7 +210,10 @@ public DynamicTableSink copy() {
                         primaryKeyIndexes,
                         streaming,
                         mergeEngineType,
-                        ignoreDelete);
+                        ignoreDelete,
+                        numBucket,
+                        bucketKeys,
+                        sinkReHash);
         sink.appliedUpdates = appliedUpdates;
         sink.deleteRow = deleteRow;
         return sink;
```
fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/sink/RowDataKeySelector.java

Lines changed: 44 additions & 0 deletions

```diff
@@ -0,0 +1,44 @@
+/*
+ * Copyright (c) 2025 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.connector.flink.sink;
+
+import com.alibaba.fluss.bucketing.FlussBucketingFunction;
+import com.alibaba.fluss.connector.flink.row.FlinkAsFlussRow;
+import com.alibaba.fluss.row.encode.KeyEncoder;
+
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.table.data.RowData;
+
+/** {@link KeySelector} to get bucket id. */
+public class RowDataKeySelector implements KeySelector<RowData, Integer> {
+
+    private final FlussBucketingFunction flussBucketingFunction;
+    private final KeyEncoder bucketKeyEncoder;
+    private final int numBucket;
+
+    public RowDataKeySelector(KeyEncoder bucketKeyEncoder, int numBucket) {
+        this.bucketKeyEncoder = bucketKeyEncoder;
+        this.flussBucketingFunction = new FlussBucketingFunction();
+        this.numBucket = numBucket;
+    }
+
+    @Override
+    public Integer getKey(RowData rowData) throws Exception {
+        FlinkAsFlussRow row = new FlinkAsFlussRow().replace(rowData);
+        return flussBucketingFunction.bucketing(bucketKeyEncoder.encodeKey(row), numBucket);
+    }
+}
```
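
`RowDataKeySelector` travels inside the serialized Flink job graph along with the `KeyEncoder` it holds, which appears to be why this commit also marks `BucketingFunction`, `KeyEncoder`, `MemorySegment`, and `CompactedRowWriter` as `Serializable`. A quick round-trip check of that property (a test sketch; the no-op encoder lambda is hypothetical and assumes `encodeKey` is the interface's only abstract method):

```java
import com.alibaba.fluss.connector.flink.sink.RowDataKeySelector;
import com.alibaba.fluss.row.encode.KeyEncoder;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

public class KeySelectorSerializationCheck {

    /** Round-trips an object through Java serialization, as the job graph does. */
    @SuppressWarnings("unchecked")
    static <T> T roundTrip(T obj) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        // No-op encoder stub; real code would use KeyEncoder.of(rowType, bucketKeys, null).
        // The lambda is serializable only because KeyEncoder now extends Serializable.
        KeyEncoder stub = row -> new byte[0];

        // Would fail with NotSerializableException without the changes above.
        RowDataKeySelector selector = roundTrip(new RowDataKeySelector(stub, 4));
        System.out.println("round-trip ok: " + selector);
    }
}
```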

fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/utils/FlinkConnectorOptionsUtils.java

Lines changed: 15 additions & 0 deletions
```diff
@@ -27,7 +27,11 @@
 import java.time.LocalDateTime;
 import java.time.ZoneId;
 import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
 import java.util.Optional;
+import java.util.stream.Collectors;
 
 import static com.alibaba.fluss.connector.flink.FlinkConnectorOptions.SCAN_STARTUP_MODE;
 import static com.alibaba.fluss.connector.flink.FlinkConnectorOptions.SCAN_STARTUP_TIMESTAMP;
@@ -63,6 +67,17 @@ public static StartupOptions getStartupOptions(ReadableConfig tableOptions, Zone
         return options;
     }
 
+    public static List<String> getBucketKeys(ReadableConfig tableOptions) {
+        Optional<String> bucketKey = tableOptions.getOptional(FlinkConnectorOptions.BUCKET_KEY);
+        if (!bucketKey.isPresent()) {
+            // log tables don't have bucket key by default
+            return new ArrayList<>();
+        }
+
+        String[] keys = bucketKey.get().split(",");
+        return Arrays.stream(keys).collect(Collectors.toList());
+    }
+
     public static int[] getBucketKeyIndexes(ReadableConfig tableOptions, RowType schema) {
         Optional<String> bucketKey = tableOptions.getOptional(FlinkConnectorOptions.BUCKET_KEY);
         if (!bucketKey.isPresent()) {
```