Skip to content

Commit 24af450

Browse files
committed
[flink] Fluss sink supports dynamic shuffle.
1 parent 34b7688 commit 24af450

34 files changed

+3276
-52
lines changed

fluss-flink/fluss-flink-1.18/src/main/java/org/apache/fluss/flink/adapter/RuntimeContextAdapter.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.fluss.flink.adapter;
1919

2020
import org.apache.flink.api.common.functions.RuntimeContext;
21+
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
2122

2223
/**
2324
* An adapter for Flink {@link RuntimeContext} class. The {@link RuntimeContext} class added the
@@ -31,4 +32,8 @@ public class RuntimeContextAdapter {
3132
public static int getAttemptNumber(RuntimeContext runtimeContext) {
3233
return runtimeContext.getAttemptNumber();
3334
}
35+
36+
/**
 * Returns the subtask index reported by the given streaming runtime context.
 *
 * <p>In this Flink 1.18 variant of the adapter the value is read by calling
 * {@code getIndexOfThisSubtask()} directly on the context.
 *
 * @param runtimeContext the streaming runtime context to query
 * @return the value of {@code runtimeContext.getIndexOfThisSubtask()}
 */
public static int getIndexOfThisSubtask(StreamingRuntimeContext runtimeContext) {
37+
return runtimeContext.getIndexOfThisSubtask();
38+
}
3439
}

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.fluss.flink;
1919

2020
import org.apache.fluss.config.FlussConfigUtils;
21+
import org.apache.fluss.flink.sink.shuffle.DistributionMode;
2122
import org.apache.fluss.flink.utils.FlinkConversions;
2223

2324
import org.apache.flink.configuration.ConfigOption;
@@ -125,6 +126,23 @@ public class FlinkConnectorOptions {
125126
+ BUCKET_KEY.key()
126127
+ "' is defined. For Primary Key table, it is enabled by default.");
127128

129+
/**
 * Connector option {@code sink.distribution-mode}: an enum-typed option over {@link
 * DistributionMode} whose declared default is {@link DistributionMode#BUCKET_SHUFFLE}. The
 * description below lists the modes it documents: NONE, BUCKET_SHUFFLE and DYNAMIC_SHUFFLE.
 */
public static final ConfigOption<DistributionMode> SINK_DISTRIBUTION_MODE =
130+
ConfigOptions.key("sink.distribution-mode")
131+
.enumType(DistributionMode.class)
132+
.defaultValue(DistributionMode.BUCKET_SHUFFLE)
133+
.withDescription(
134+
"Defines the distribution mode for writing data to the sink. Available options are: \n"
135+
+ "- NONE: No specific distribution strategy. Data is forwarded as is.\n"
136+
+ "- BUCKET_SHUFFLE: Shuffle data by bucket ID before writing to sink. "
137+
+ "Shuffling the data with the same bucket ID to be processed by the same task "
138+
+ "can improve the efficiency of client processing and reduce resource consumption. "
139+
+ "For Log Table, bucket shuffle will only take effect when the '"
140+
+ BUCKET_KEY.key()
141+
+ "' is defined. For Primary Key table, it is enabled by default.\n"
142+
+ "- DYNAMIC_SHUFFLE: Dynamically adjust shuffle strategy based on partition key traffic patterns. "
143+
+ "This mode monitors data distribution and adjusts the shuffle behavior to balance the load. "
144+
+ "It is only supported for partitioned tables.");
145+
128146
// --------------------------------------------------------------------------------------------
129147
// table storage specific options
130148
// --------------------------------------------------------------------------------------------

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/adapter/RuntimeContextAdapter.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.fluss.flink.adapter;
1919

2020
import org.apache.flink.api.common.functions.RuntimeContext;
21+
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
2122

2223
/**
2324
* An adapter for Flink {@link RuntimeContext} class. The {@link RuntimeContext} class added the
@@ -31,4 +32,8 @@ public class RuntimeContextAdapter {
3132
public static int getAttemptNumber(RuntimeContext runtimeContext) {
3233
return runtimeContext.getTaskInfo().getAttemptNumber();
3334
}
35+
36+
/**
 * Returns the subtask index reported by the given streaming runtime context.
 *
 * <p>In this common variant of the adapter the value is read through the context's task info,
 * i.e. {@code getTaskInfo().getIndexOfThisSubtask()}, matching how {@code getAttemptNumber}
 * in the same class reads the attempt number.
 *
 * @param runtimeContext the streaming runtime context to query
 * @return the value of {@code runtimeContext.getTaskInfo().getIndexOfThisSubtask()}
 */
public static int getIndexOfThisSubtask(StreamingRuntimeContext runtimeContext) {
37+
return runtimeContext.getTaskInfo().getIndexOfThisSubtask();
38+
}
3439
}

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.fluss.flink.lake.LakeFlinkCatalog;
2424
import org.apache.fluss.flink.lake.LakeTableFactory;
2525
import org.apache.fluss.flink.sink.FlinkTableSink;
26+
import org.apache.fluss.flink.sink.shuffle.DistributionMode;
2627
import org.apache.fluss.flink.source.FlinkTableSource;
2728
import org.apache.fluss.flink.utils.FlinkConnectorOptionsUtils;
2829
import org.apache.fluss.metadata.DataLakeFormat;
@@ -173,6 +174,15 @@ public DynamicTableSink createDynamicTableSink(Context context) {
173174
List<String> partitionKeys = resolvedCatalogTable.getPartitionKeys();
174175

175176
RowType rowType = (RowType) context.getPhysicalRowDataType().getLogicalType();
177+
DistributionMode distributionMode;
178+
if (tableOptions.getOptional(FlinkConnectorOptions.SINK_DISTRIBUTION_MODE).isPresent()) {
179+
distributionMode = tableOptions.get(FlinkConnectorOptions.SINK_DISTRIBUTION_MODE);
180+
} else {
181+
distributionMode =
182+
tableOptions.get(FlinkConnectorOptions.SINK_BUCKET_SHUFFLE)
183+
? DistributionMode.BUCKET_SHUFFLE
184+
: DistributionMode.NONE;
185+
}
176186

177187
return new FlinkTableSink(
178188
toFlussTablePath(context.getObjectIdentifier()),
@@ -188,7 +198,7 @@ public DynamicTableSink createDynamicTableSink(Context context) {
188198
tableOptions.get(toFlinkOption(TABLE_DELETE_BEHAVIOR)),
189199
tableOptions.get(FlinkConnectorOptions.BUCKET_NUMBER),
190200
getBucketKeys(tableOptions),
191-
tableOptions.get(FlinkConnectorOptions.SINK_BUCKET_SHUFFLE));
201+
distributionMode);
192202
}
193203

194204
@Override
@@ -214,6 +224,7 @@ public Set<ConfigOption<?>> optionalOptions() {
214224
FlinkConnectorOptions.LOOKUP_ASYNC,
215225
FlinkConnectorOptions.SINK_IGNORE_DELETE,
216226
FlinkConnectorOptions.SINK_BUCKET_SHUFFLE,
227+
FlinkConnectorOptions.SINK_DISTRIBUTION_MODE,
217228
LookupOptions.MAX_RETRIES,
218229
LookupOptions.CACHE_TYPE,
219230
LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_ACCESS,

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkSink.java

Lines changed: 98 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -21,18 +21,26 @@
2121
import org.apache.fluss.config.Configuration;
2222
import org.apache.fluss.flink.adapter.SinkAdapter;
2323
import org.apache.fluss.flink.sink.serializer.FlussSerializationSchema;
24+
import org.apache.fluss.flink.sink.shuffle.DataStatisticsOperatorFactory;
25+
import org.apache.fluss.flink.sink.shuffle.DistributionMode;
26+
import org.apache.fluss.flink.sink.shuffle.StatisticsOrRecord;
27+
import org.apache.fluss.flink.sink.shuffle.StatisticsOrRecordChannelComputer;
28+
import org.apache.fluss.flink.sink.shuffle.StatisticsOrRecordTypeInformation;
2429
import org.apache.fluss.flink.sink.writer.AppendSinkWriter;
2530
import org.apache.fluss.flink.sink.writer.FlinkSinkWriter;
2631
import org.apache.fluss.flink.sink.writer.UpsertSinkWriter;
2732
import org.apache.fluss.metadata.DataLakeFormat;
2833
import org.apache.fluss.metadata.TablePath;
2934

35+
import org.apache.flink.api.common.functions.FlatMapFunction;
3036
import org.apache.flink.api.common.operators.MailboxExecutor;
37+
import org.apache.flink.api.common.typeinfo.TypeInformation;
3138
import org.apache.flink.api.connector.sink2.SinkWriter;
3239
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
3340
import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup;
3441
import org.apache.flink.streaming.api.connector.sink2.SupportsPreWriteTopology;
3542
import org.apache.flink.streaming.api.datastream.DataStream;
43+
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
3644
import org.apache.flink.table.types.logical.RowType;
3745

3846
import javax.annotation.Nullable;
@@ -87,8 +95,9 @@ static class AppendSinkWriterBuilder<InputT>
8795
private final List<String> bucketKeys;
8896
private final List<String> partitionKeys;
8997
private final @Nullable DataLakeFormat lakeFormat;
90-
private final boolean shuffleByBucketId;
98+
private final DistributionMode shuffleMode;
9199
private final FlussSerializationSchema<InputT> flussSerializationSchema;
100+
private final @Nullable TypeInformation<InputT> rowTypeInformation;
92101

93102
public AppendSinkWriterBuilder(
94103
TablePath tablePath,
@@ -98,17 +107,19 @@ public AppendSinkWriterBuilder(
98107
List<String> bucketKeys,
99108
List<String> partitionKeys,
100109
@Nullable DataLakeFormat lakeFormat,
101-
boolean shuffleByBucketId,
102-
FlussSerializationSchema<InputT> flussSerializationSchema) {
110+
DistributionMode shuffleMode,
111+
FlussSerializationSchema<InputT> flussSerializationSchema,
112+
@Nullable TypeInformation<InputT> rowTypeInformation) {
103113
this.tablePath = tablePath;
104114
this.flussConfig = flussConfig;
105115
this.tableRowType = tableRowType;
106116
this.numBucket = numBucket;
107117
this.bucketKeys = bucketKeys;
108118
this.partitionKeys = partitionKeys;
109119
this.lakeFormat = lakeFormat;
110-
this.shuffleByBucketId = shuffleByBucketId;
120+
this.shuffleMode = shuffleMode;
111121
this.flussSerializationSchema = flussSerializationSchema;
122+
this.rowTypeInformation = rowTypeInformation;
112123
}
113124

114125
@Override
@@ -123,20 +134,76 @@ public AppendSinkWriter<InputT> createWriter(MailboxExecutor mailboxExecutor) {
123134

124135
@Override
125136
public DataStream<InputT> addPreWriteTopology(DataStream<InputT> input) {
126-
// For append only sink, we will do bucket shuffle only if bucket keys are not empty.
127-
if (!bucketKeys.isEmpty() && shuffleByBucketId) {
128-
return partition(
129-
input,
130-
new FlinkRowDataChannelComputer<>(
131-
toFlussRowType(tableRowType),
132-
bucketKeys,
133-
partitionKeys,
134-
lakeFormat,
135-
numBucket,
136-
flussSerializationSchema),
137-
input.getParallelism());
138-
} else {
139-
return input;
137+
switch (shuffleMode) {
138+
case BUCKET_SHUFFLE:
139+
if (!bucketKeys.isEmpty()) {
140+
return partition(
141+
input,
142+
new FlinkRowDataChannelComputer<>(
143+
toFlussRowType(tableRowType),
144+
bucketKeys,
145+
partitionKeys,
146+
lakeFormat,
147+
numBucket,
148+
flussSerializationSchema),
149+
input.getParallelism());
150+
}
151+
return input;
152+
case NONE:
153+
return input;
154+
case DYNAMIC_SHUFFLE:
155+
if (partitionKeys.isEmpty()) {
156+
throw new UnsupportedOperationException(
157+
"DYNAMIC_SHUFFLE is only supported for partition tables");
158+
}
159+
160+
if (rowTypeInformation == null) {
161+
throw new UnsupportedOperationException(
162+
"RowTypeInformation is required for DYNAMIC_SHUFFLE mode.");
163+
}
164+
TypeInformation<StatisticsOrRecord<InputT>> statisticsOrRecordTypeInformation =
165+
new StatisticsOrRecordTypeInformation<>(rowTypeInformation);
166+
SingleOutputStreamOperator<StatisticsOrRecord<InputT>> shuffleStream =
167+
input.transform(
168+
"Dynamic shuffle data statistics",
169+
statisticsOrRecordTypeInformation,
170+
new DataStatisticsOperatorFactory<>(
171+
toFlussRowType(tableRowType),
172+
partitionKeys,
173+
flussSerializationSchema))
174+
.uid("Dynamic shuffle data statistics" + tablePath)
175+
// Set the parallelism same as input operator to encourage
176+
// chaining
177+
.setParallelism(input.getParallelism());
178+
179+
return partition(
180+
shuffleStream,
181+
new StatisticsOrRecordChannelComputer<>(
182+
toFlussRowType(tableRowType),
183+
bucketKeys,
184+
partitionKeys,
185+
numBucket,
186+
lakeFormat,
187+
flussSerializationSchema),
188+
input.getParallelism())
189+
.flatMap(
190+
(FlatMapFunction<StatisticsOrRecord<InputT>, InputT>)
191+
(statisticsOrRecord, out) -> {
192+
if (statisticsOrRecord.hasRecord()) {
193+
out.collect(statisticsOrRecord.record());
194+
}
195+
})
196+
.uid("flat map" + tablePath)
197+
// To promote operator chaining with the downstream writer operator,
198+
// setting slot sharing group and the parallelism as default, {@link
199+
// SinkTransformationTranslator} will set the parallelism same as sink
200+
// transformation.
201+
.slotSharingGroup("shuffle-partition-custom-group")
202+
.returns(rowTypeInformation);
203+
204+
default:
205+
throw new UnsupportedOperationException(
206+
"Unsupported distribution mode: " + shuffleMode);
140207
}
141208
}
142209
}
@@ -155,7 +222,7 @@ static class UpsertSinkWriterBuilder<InputT>
155222
private final List<String> bucketKeys;
156223
private final List<String> partitionKeys;
157224
private final @Nullable DataLakeFormat lakeFormat;
158-
private final boolean shuffleByBucketId;
225+
private final DistributionMode shuffleMode;
159226
private final FlussSerializationSchema<InputT> flussSerializationSchema;
160227

161228
UpsertSinkWriterBuilder(
@@ -167,7 +234,7 @@ static class UpsertSinkWriterBuilder<InputT>
167234
List<String> bucketKeys,
168235
List<String> partitionKeys,
169236
@Nullable DataLakeFormat lakeFormat,
170-
boolean shuffleByBucketId,
237+
DistributionMode shuffleMode,
171238
FlussSerializationSchema<InputT> flussSerializationSchema) {
172239
this.tablePath = tablePath;
173240
this.flussConfig = flussConfig;
@@ -177,7 +244,7 @@ static class UpsertSinkWriterBuilder<InputT>
177244
this.bucketKeys = bucketKeys;
178245
this.partitionKeys = partitionKeys;
179246
this.lakeFormat = lakeFormat;
180-
this.shuffleByBucketId = shuffleByBucketId;
247+
this.shuffleMode = shuffleMode;
181248
this.flussSerializationSchema = flussSerializationSchema;
182249
}
183250

@@ -194,8 +261,9 @@ public UpsertSinkWriter<InputT> createWriter(MailboxExecutor mailboxExecutor) {
194261

195262
@Override
196263
public DataStream<InputT> addPreWriteTopology(DataStream<InputT> input) {
197-
return shuffleByBucketId
198-
? partition(
264+
switch (shuffleMode) {
265+
case BUCKET_SHUFFLE:
266+
return partition(
199267
input,
200268
new FlinkRowDataChannelComputer<>(
201269
toFlussRowType(tableRowType),
@@ -204,8 +272,13 @@ public DataStream<InputT> addPreWriteTopology(DataStream<InputT> input) {
204272
lakeFormat,
205273
numBucket,
206274
flussSerializationSchema),
207-
input.getParallelism())
208-
: input;
275+
input.getParallelism());
276+
case NONE:
277+
return input;
278+
default:
279+
throw new UnsupportedOperationException(
280+
"Unsupported distribution mode: " + shuffleMode);
281+
}
209282
}
210283
}
211284
}

0 commit comments

Comments
 (0)