Skip to content

Commit af3b367

Browse files
committed
[flink] Implement SupportsWatermarkPushDown for FlinkTableSource
1 parent c49b848 commit af3b367

File tree

4 files changed

+237
-2
lines changed

4 files changed

+237
-2
lines changed

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,12 @@
4343
import org.apache.fluss.types.RowType;
4444

4545
import org.apache.flink.annotation.VisibleForTesting;
46+
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
4647
import org.apache.flink.api.common.typeinfo.TypeInformation;
48+
import org.apache.flink.api.connector.source.Boundedness;
4749
import org.apache.flink.api.connector.source.Source;
4850
import org.apache.flink.streaming.api.datastream.DataStream;
51+
import org.apache.flink.streaming.api.datastream.DataStreamSource;
4952
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
5053
import org.apache.flink.table.connector.ChangelogMode;
5154
import org.apache.flink.table.connector.ProviderContext;
@@ -60,6 +63,7 @@
6063
import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
6164
import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
6265
import org.apache.flink.table.connector.source.abilities.SupportsRowLevelModificationScan;
66+
import org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown;
6367
import org.apache.flink.table.connector.source.lookup.AsyncLookupFunctionProvider;
6468
import org.apache.flink.table.connector.source.lookup.LookupFunctionProvider;
6569
import org.apache.flink.table.connector.source.lookup.PartialCachingAsyncLookupProvider;
@@ -105,10 +109,13 @@ public class FlinkTableSource
105109
LookupTableSource,
106110
SupportsRowLevelModificationScan,
107111
SupportsLimitPushDown,
108-
SupportsAggregatePushDown {
112+
SupportsAggregatePushDown,
113+
SupportsWatermarkPushDown {
109114

110115
public static final Logger LOG = LoggerFactory.getLogger(FlinkTableSource.class);
111116

117+
private static final String FLUSS_TRANSFORMATION = "fluss";
118+
112119
private final TablePath tablePath;
113120
private final Configuration flussConfig;
114121
// output type before projection pushdown
@@ -155,6 +162,9 @@ public class FlinkTableSource
155162

156163
@Nullable private LakeSource<LakeSplit> lakeSource;
157164

165+
/** Watermark strategy that is pushed down by the Flink optimizer. */
166+
@Nullable private WatermarkStrategy<RowData> watermarkStrategy;
167+
158168
public FlinkTableSource(
159169
TablePath tablePath,
160170
Configuration flussConfig,
@@ -373,7 +383,25 @@ public boolean isBounded() {
373383
}
374384
};
375385
} else {
376-
return SourceProvider.of(source);
386+
return new DataStreamScanProvider() {
387+
@Override
388+
public DataStream<RowData> produceDataStream(
389+
ProviderContext providerContext, StreamExecutionEnvironment execEnv) {
390+
WatermarkStrategy<RowData> strategy =
391+
watermarkStrategy != null
392+
? watermarkStrategy
393+
: WatermarkStrategy.noWatermarks();
394+
DataStreamSource<RowData> sourceStream =
395+
execEnv.fromSource(source, strategy, "FlussSource-" + tablePath);
396+
providerContext.generateUid(FLUSS_TRANSFORMATION).ifPresent(sourceStream::uid);
397+
return sourceStream;
398+
}
399+
400+
@Override
401+
public boolean isBounded() {
402+
return source.getBoundedness() == Boundedness.BOUNDED;
403+
}
404+
};
377405
}
378406
}
379407

@@ -444,6 +472,7 @@ public DynamicTableSource copy() {
444472
source.modificationScanType = modificationScanType;
445473
source.partitionFilters = partitionFilters;
446474
source.lakeSource = lakeSource;
475+
source.watermarkStrategy = watermarkStrategy;
447476
return source;
448477
}
449478

@@ -466,6 +495,11 @@ public void applyProjection(int[][] projectedFields, DataType producedDataType)
466495
}
467496
}
468497

498+
@Override
499+
public void applyWatermark(WatermarkStrategy<RowData> watermarkStrategy) {
500+
this.watermarkStrategy = watermarkStrategy;
501+
}
502+
469503
@Override
470504
public Result applyFilters(List<ResolvedExpression> filters) {
471505

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1150,6 +1150,52 @@ void testStreamingReadSinglePartitionPushDown() throws Exception {
11501150
assertResultsIgnoreOrder(rowIter, expectedRowValues, true);
11511151
}
11521152

1153+
@Test
1154+
void testStreamingReadPartitionPushDownWithWatermark() throws Exception {
1155+
tEnv.executeSql(
1156+
"create table watermark_partitioned_table"
1157+
+ " (a int not null, b varchar, ts timestamp(3),"
1158+
+ " c string,"
1159+
+ " primary key (a, c) NOT ENFORCED,"
1160+
+ " WATERMARK FOR ts AS ts - INTERVAL '5' SECOND)"
1161+
+ " partitioned by (c) ");
1162+
TablePath tablePath = TablePath.of(DEFAULT_DB, "watermark_partitioned_table");
1163+
tEnv.executeSql("alter table watermark_partitioned_table add partition (c=2025)");
1164+
tEnv.executeSql("alter table watermark_partitioned_table add partition (c=2026)");
1165+
1166+
// write rows with 4 columns (a, b, ts, c); ts is nullable and is written as null here
1167+
List<InternalRow> rows = new ArrayList<>();
1168+
List<String> expectedRowValues = new ArrayList<>();
1169+
for (String partition : Arrays.asList("2025", "2026")) {
1170+
for (int i = 0; i < 10; i++) {
1171+
rows.add(row(i, "v1", null, partition));
1172+
if (partition.equals("2025")) {
1173+
expectedRowValues.add(String.format("+I[%d, v1, %s]", i, partition));
1174+
}
1175+
}
1176+
}
1177+
writeRows(conn, tablePath, rows, false);
1178+
FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(tablePath);
1179+
1180+
// verify partition filter is pushed down in the execution plan
1181+
String plan =
1182+
tEnv.explainSql("select a, b, c from watermark_partitioned_table where c ='2025'");
1183+
assertThat(plan)
1184+
.contains(
1185+
"TableSourceScan(table=[[testcatalog, defaultdb, watermark_partitioned_table, "
1186+
+ "watermark=[-(ts, 5000:INTERVAL SECOND)], "
1187+
+ "watermarkEmitStrategy=[on-periodic], "
1188+
+ "filter=[=(c, _UTF-16LE'2025':VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\")]]], "
1189+
+ "fields=[a, b, ts, c])");
1190+
1191+
// verify query results only contain data from the matching partition
1192+
org.apache.flink.util.CloseableIterator<Row> rowIter =
1193+
tEnv.executeSql("select a, b, c from watermark_partitioned_table where c ='2025'")
1194+
.collect();
1195+
1196+
assertResultsIgnoreOrder(rowIter, expectedRowValues, true);
1197+
}
1198+
11531199
@Test
11541200
void testStreamingReadAllPartitionTypePushDown() throws Exception {
11551201
tEnv.executeSql(

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlussSourceITCase.java

Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,15 +33,25 @@
3333
import org.apache.fluss.metadata.TablePath;
3434
import org.apache.fluss.row.GenericRow;
3535
import org.apache.fluss.row.InternalRow;
36+
import org.apache.fluss.types.DataTypes;
3637
import org.apache.fluss.types.RowType;
3738

39+
import org.apache.flink.api.common.eventtime.Watermark;
40+
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
41+
import org.apache.flink.api.common.eventtime.WatermarkOutput;
3842
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
43+
import org.apache.flink.api.common.functions.MapFunction;
44+
import org.apache.flink.api.common.typeinfo.TypeInformation;
3945
import org.apache.flink.streaming.api.datastream.DataStreamSource;
4046
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
47+
import org.apache.flink.streaming.api.functions.ProcessFunction;
48+
import org.apache.flink.streaming.api.operators.StreamMap;
49+
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
4150
import org.apache.flink.table.data.GenericRowData;
4251
import org.apache.flink.table.data.RowData;
4352
import org.apache.flink.table.data.StringData;
4453
import org.apache.flink.types.RowKind;
54+
import org.apache.flink.util.Collector;
4555
import org.junit.jupiter.api.BeforeEach;
4656
import org.junit.jupiter.api.Test;
4757

@@ -266,6 +276,149 @@ public void testTableLogSourceWithProjectionPushdown() throws Exception {
266276
assertThat(collectedElements).hasSameElementsAs(expectedOutput);
267277
}
268278

279+
/** Verifies that event-time timestamps are correctly assigned via WatermarkStrategy. */
280+
@Test
281+
void testTimestamp() throws Exception {
282+
// 1. Create Fluss log table
283+
String tableName = "wm_timestamp_test";
284+
TablePath tablePath = TablePath.of(DEFAULT_DB, tableName);
285+
Schema schema =
286+
Schema.newBuilder()
287+
.column("id", DataTypes.INT())
288+
.column("name", DataTypes.STRING())
289+
.column("event_time", DataTypes.BIGINT())
290+
.build();
291+
createTable(tablePath, TableDescriptor.builder().schema(schema).distributedBy(1).build());
292+
293+
// 2. Write 3 records with known event_time values
294+
final long currentTimestamp = System.currentTimeMillis();
295+
List<InternalRow> rows =
296+
Arrays.asList(
297+
row(1, "name1", currentTimestamp + 1L),
298+
row(2, "name2", currentTimestamp + 2L),
299+
row(3, "name3", currentTimestamp + 3L));
300+
writeRows(conn, tablePath, rows, true);
301+
302+
// 3. Build FlussSource and apply WatermarkStrategy with TimestampAssigner
303+
FlussSource<RowData> source =
304+
FlussSource.<RowData>builder()
305+
.setBootstrapServers(bootstrapServers)
306+
.setDatabase(DEFAULT_DB)
307+
.setTable(tableName)
308+
.setStartingOffsets(OffsetsInitializer.earliest())
309+
.setDeserializationSchema(new RowDataDeserializationSchema())
310+
.build();
311+
312+
env.setParallelism(1);
313+
DataStreamSource<RowData> stream =
314+
env.fromSource(
315+
source,
316+
WatermarkStrategy.<RowData>noWatermarks()
317+
.withTimestampAssigner(
318+
(rowData, ts) -> rowData.getLong(2)), // event_time column
319+
"testTimestamp");
320+
321+
// Verify that each record carries the event-time timestamp assigned by the WatermarkStrategy.
322+
List<Long> result =
323+
stream.transform(
324+
"timestampVerifier",
325+
TypeInformation.of(Long.class),
326+
new WatermarkVerifyingOperator(v -> v.getLong(2)))
327+
.executeAndCollect(3);
328+
assertThat(result)
329+
.containsExactlyInAnyOrder(
330+
currentTimestamp + 1L, currentTimestamp + 2L, currentTimestamp + 3L);
331+
}
332+
333+
/** Verifies per-bucket (per-split) watermark multiplexing correctness. */
334+
@Test
335+
void testPerBucketWatermark() throws Exception {
336+
// 1. Create 2-bucket Fluss log table
337+
String tableName = "wm_per_bucket_test";
338+
TablePath tablePath = TablePath.of(DEFAULT_DB, tableName);
339+
Schema schema =
340+
Schema.newBuilder()
341+
.column("id", DataTypes.INT())
342+
.column("name", DataTypes.STRING())
343+
.column("ts", DataTypes.BIGINT())
344+
.build();
345+
createTable(tablePath, TableDescriptor.builder().schema(schema).distributedBy(2).build());
346+
347+
// 2. Write 6 records with interleaved timestamps
348+
List<InternalRow> rows =
349+
Arrays.asList(
350+
row(1, "a", 100L),
351+
row(2, "b", 150L),
352+
row(3, "c", 200L),
353+
row(4, "d", 250L),
354+
row(5, "e", 300L),
355+
row(6, "f", 350L));
356+
writeRows(conn, tablePath, rows, true);
357+
358+
// 3. Build FlussSource and apply per-split WatermarkStrategy
359+
FlussSource<RowData> source =
360+
FlussSource.<RowData>builder()
361+
.setBootstrapServers(bootstrapServers)
362+
.setDatabase(DEFAULT_DB)
363+
.setTable(tableName)
364+
.setStartingOffsets(OffsetsInitializer.earliest())
365+
.setDeserializationSchema(new RowDataDeserializationSchema())
366+
.build();
367+
368+
env.setParallelism(1);
369+
370+
// 4. Assert in a ProcessFunction that no record's timestamp is behind the current watermark
371+
env.fromSource(
372+
source,
373+
WatermarkStrategy.forGenerator(ctx -> new OnEventWatermarkGenerator())
374+
.withTimestampAssigner(
375+
(rowData, ts) -> rowData.getLong(2)), // ts column
376+
"testPerPartitionWatermark")
377+
.process(
378+
new ProcessFunction<RowData, Object>() {
379+
@Override
380+
public void processElement(
381+
RowData value,
382+
ProcessFunction<RowData, Object>.Context ctx,
383+
Collector<Object> out) {
384+
assertThat(ctx.timestamp())
385+
.as(
386+
"Event time should never behind watermark "
387+
+ "because of per-split watermark multiplexing logic")
388+
.isGreaterThanOrEqualTo(
389+
ctx.timerService().currentWatermark());
390+
out.collect(ctx.timestamp());
391+
}
392+
})
393+
.executeAndCollect(6);
394+
}
395+
396+
/** A StreamMap that emits each record's timestamp so the test can verify timestamp assignment. */
397+
private static class WatermarkVerifyingOperator extends StreamMap<RowData, Long> {
398+
399+
private static final long serialVersionUID = 1L;
400+
401+
public WatermarkVerifyingOperator(MapFunction<RowData, Long> mapper) {
402+
super(mapper);
403+
}
404+
405+
@Override
406+
public void processElement(StreamRecord<RowData> element) {
407+
output.collect(new StreamRecord<>(element.getTimestamp()));
408+
}
409+
}
410+
411+
/** A WatermarkGenerator that emits a watermark equal to the event timestamp on each event. */
412+
private static class OnEventWatermarkGenerator implements WatermarkGenerator<RowData> {
413+
@Override
414+
public void onEvent(RowData event, long eventTimestamp, WatermarkOutput output) {
415+
output.emitWatermark(new Watermark(eventTimestamp));
416+
}
417+
418+
@Override
419+
public void onPeriodicEmit(WatermarkOutput output) {}
420+
}
421+
269422
private static RowData createRowData(
270423
Long orderId, Long itemId, Integer amount, String address, RowKind rowKind) {
271424
GenericRowData row = new GenericRowData(4);

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkTestBase.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
import java.util.Optional;
5858
import java.util.Set;
5959

60+
import static org.apache.fluss.config.ConfigOptions.NoKeyAssigner.ROUND_ROBIN;
6061
import static org.apache.fluss.server.utils.TableAssignmentUtils.generateAssignment;
6162
import static org.apache.fluss.testutils.DataTestUtils.row;
6263
import static org.apache.fluss.testutils.common.CommonTestUtils.waitValue;
@@ -149,6 +150,7 @@ public class FlinkTestBase extends AbstractTestBase {
149150
@BeforeAll
150151
protected static void beforeAll() {
151152
clientConf = FLUSS_CLUSTER_EXTENSION.getClientConfig();
153+
clientConf.set(ConfigOptions.CLIENT_WRITER_BUCKET_NO_KEY_ASSIGNER, ROUND_ROBIN);
152154
bootstrapServers = FLUSS_CLUSTER_EXTENSION.getBootstrapServers();
153155
conn = ConnectionFactory.createConnection(clientConf);
154156
admin = conn.getAdmin();

0 commit comments

Comments
 (0)