Skip to content

Commit a1ccee5

Browse files
committed
[flink] Flink sink supports dynamic Fluss sink supports dynamic shuffle.
1 parent 515244d commit a1ccee5

File tree

12 files changed

+36
-93
lines changed

12 files changed

+36
-93
lines changed

fluss-flink/fluss-flink-1.18/src/main/java/org/apache/fluss/flink/adapter/FlinkCompatibilityUtil.java

Lines changed: 0 additions & 34 deletions
This file was deleted.

fluss-flink/fluss-flink-1.18/src/main/java/org/apache/fluss/flink/adapter/RuntimeContextAdapter.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.fluss.flink.adapter;
1919

2020
import org.apache.flink.api.common.functions.RuntimeContext;
21+
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
2122

2223
/**
2324
* An adapter for Flink {@link RuntimeContext} class. The {@link RuntimeContext} class added the
@@ -31,4 +32,8 @@ public class RuntimeContextAdapter {
3132
public static int getAttemptNumber(RuntimeContext runtimeContext) {
3233
return runtimeContext.getAttemptNumber();
3334
}
35+
36+
public static int getIndexOfThisSubtask(StreamingRuntimeContext runtimeContext) {
37+
return runtimeContext.getIndexOfThisSubtask();
38+
}
3439
}

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/adapter/FlinkCompatibilityUtil.java

Lines changed: 0 additions & 34 deletions
This file was deleted.

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/adapter/RuntimeContextAdapter.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.fluss.flink.adapter;
1919

2020
import org.apache.flink.api.common.functions.RuntimeContext;
21+
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
2122

2223
/**
2324
* An adapter for Flink {@link RuntimeContext} class. The {@link RuntimeContext} class added the
@@ -31,4 +32,8 @@ public class RuntimeContextAdapter {
3132
public static int getAttemptNumber(RuntimeContext runtimeContext) {
3233
return runtimeContext.getTaskInfo().getAttemptNumber();
3334
}
35+
36+
public static int getIndexOfThisSubtask(StreamingRuntimeContext runtimeContext) {
37+
return runtimeContext.getTaskInfo().getIndexOfThisSubtask();
38+
}
3439
}

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkSink.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import org.apache.fluss.annotation.Internal;
2121
import org.apache.fluss.config.Configuration;
22+
import org.apache.fluss.flink.adapter.SinkAdapter;
2223
import org.apache.fluss.flink.sink.serializer.FlussSerializationSchema;
2324
import org.apache.fluss.flink.sink.shuffle.DataStatisticsOperatorFactory;
2425
import org.apache.fluss.flink.sink.shuffle.DistributionMode;
@@ -34,7 +35,6 @@
3435
import org.apache.flink.api.common.functions.FlatMapFunction;
3536
import org.apache.flink.api.common.operators.MailboxExecutor;
3637
import org.apache.flink.api.common.typeinfo.TypeInformation;
37-
import org.apache.flink.api.connector.sink2.Sink;
3838
import org.apache.flink.api.connector.sink2.SinkWriter;
3939
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
4040
import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup;
@@ -52,7 +52,7 @@
5252
import static org.apache.fluss.flink.utils.FlinkConversions.toFlussRowType;
5353

5454
/** Flink sink for Fluss. */
55-
class FlinkSink<InputT> implements Sink<InputT>, SupportsPreWriteTopology<InputT> {
55+
class FlinkSink<InputT> extends SinkAdapter<InputT> implements SupportsPreWriteTopology<InputT> {
5656

5757
private static final long serialVersionUID = 1L;
5858

@@ -165,13 +165,13 @@ public DataStream<InputT> addPreWriteTopology(DataStream<InputT> input) {
165165
new StatisticsOrRecordTypeInformation<>(rowTypeInformation);
166166
SingleOutputStreamOperator<StatisticsOrRecord<InputT>> shuffleStream =
167167
input.transform(
168-
"Range shuffle Collector",
168+
"Dynamic shuffle data statistics",
169169
statisticsOrRecordTypeInformation,
170170
new DataStatisticsOperatorFactory<>(
171171
toFlussRowType(tableRowType),
172172
partitionKeys,
173173
flussSerializationSchema))
174-
.uid("Range shuffle Collector" + tablePath)
174+
.uid("Dynamic shuffle data statistics" + tablePath)
175175
// Set the parallelism same as input operator to encourage
176176
// chaining
177177
.setParallelism(input.getParallelism());

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkTableSink.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -199,14 +199,13 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
199199
@Override
200200
public DataStreamSink<?> consumeDataStream(
201201
ProviderContext providerContext, DataStream<RowData> dataStream) {
202-
// todo: add prefix to the sink to avoid conflict in same flink job.
203-
String defaultSuffix =
202+
String sinkName =
204203
"sink"
205204
+ "-"
206205
+ tablePath.getTableName()
207206
+ "-"
208207
+ tablePath.getDatabaseName();
209-
return dataStream.sinkTo(flinkSink).uid(defaultSuffix).name(defaultSuffix);
208+
return dataStream.sinkTo(flinkSink).uid(sinkName).name(sinkName);
210209
}
211210
};
212211
}

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlussSinkBuilder.java

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ public class FlussSinkBuilder<InputT> {
7474
private String tableName;
7575
private final Map<String, String> configOptions = new HashMap<>();
7676
private FlussSerializationSchema<InputT> serializationSchema;
77-
private DistributionMode shuffleMode = DistributionMode.BUCKET_SHUFFLE;
77+
private DistributionMode distributionMode = DistributionMode.BUCKET_SHUFFLE;
7878
private TypeInformation<InputT> rowTypeInformation;
7979

8080
/** Set the bootstrap server for the sink. */
@@ -97,17 +97,17 @@ public FlussSinkBuilder<InputT> setTable(String table) {
9797

9898
/**
9999
* Set shuffle by bucket id. Deprecated use {@link
100-
* FlussSinkBuilder#setShuffleMode(DistributionMode) } instead.
100+
* FlussSinkBuilder#setDistributionMode(DistributionMode) } instead.
101101
*/
102102
@Deprecated
103103
public FlussSinkBuilder<InputT> setShuffleByBucketId(boolean shuffleByBucketId) {
104-
this.shuffleMode =
104+
this.distributionMode =
105105
shuffleByBucketId ? DistributionMode.BUCKET_SHUFFLE : DistributionMode.NONE;
106106
return this;
107107
}
108108

109-
public FlussSinkBuilder<InputT> setShuffleMode(DistributionMode shuffleByBucketId) {
110-
this.shuffleMode = shuffleByBucketId;
109+
public FlussSinkBuilder<InputT> setDistributionMode(DistributionMode distributionMode) {
110+
this.distributionMode = distributionMode;
111111
return this;
112112
}
113113

@@ -145,9 +145,7 @@ public FlussSink<InputT> build() {
145145
FlinkSink.SinkWriterBuilder<? extends FlinkSinkWriter<InputT>, InputT> writerBuilder;
146146

147147
TablePath tablePath = new TablePath(database, tableName);
148-
if (bootstrapServers != null) {
149-
flussConfig.setString(ConfigOptions.BOOTSTRAP_SERVERS.key(), bootstrapServers);
150-
}
148+
flussConfig.setString(ConfigOptions.BOOTSTRAP_SERVERS.key(), bootstrapServers);
151149

152150
TableInfo tableInfo;
153151
try (Connection connection = ConnectionFactory.createConnection(flussConfig);
@@ -184,7 +182,7 @@ public FlussSink<InputT> build() {
184182
bucketKeys,
185183
partitionKeys,
186184
lakeFormat,
187-
shuffleMode,
185+
distributionMode,
188186
serializationSchema);
189187
} else {
190188
LOG.info("Initializing Fluss append sink writer ...");
@@ -197,7 +195,7 @@ public FlussSink<InputT> build() {
197195
bucketKeys,
198196
partitionKeys,
199197
lakeFormat,
200-
shuffleMode,
198+
distributionMode,
201199
serializationSchema,
202200
rowTypeInformation);
203201
}

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/serializer/RowDataSerializationSchema.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -210,12 +210,12 @@ public long size(RowData value, RowType rowType) {
210210
size += 2;
211211
break;
212212
case INTEGER:
213+
case FLOAT:
213214
case DATE:
214215
case TIME_WITHOUT_TIME_ZONE:
215216
size += 4;
216217
break;
217218
case BIGINT:
218-
case FLOAT:
219219
case DOUBLE:
220220
case TIMESTAMP_WITHOUT_TIME_ZONE:
221221
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/shuffle/DataStatisticsOperator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@
4040

4141
import java.util.List;
4242

43-
import static org.apache.fluss.flink.adapter.FlinkCompatibilityUtil.getIndexOfThisSubtask;
43+
import static org.apache.fluss.flink.adapter.RuntimeContextAdapter.getIndexOfThisSubtask;
4444
import static org.apache.fluss.utils.Preconditions.checkArgument;
4545

4646
/**

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/shuffle/StatisticsOrRecordChannelComputer.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,10 @@ public void setup(int numChannels) {
108108

109109
@Override
110110
public int channel(StatisticsOrRecord<InputT> wrapper) {
111+
if (wrapper == null) {
112+
throw new FlussRuntimeException("StatisticsOrRecord wrapper must not be null");
113+
}
114+
111115
try {
112116
if (wrapper.hasStatistics()) {
113117
this.delegatePartitioner = delegatePartitioner(wrapper.statistics());

0 commit comments

Comments
 (0)