Skip to content

Commit c2adadc

Browse files
committed
feat(frontend): auto disable snapshot backfill in some cases
1 parent 35a761d commit c2adadc

File tree

17 files changed

+240
-152
lines changed

17 files changed

+240
-152
lines changed

e2e_test/source_inline/kafka/shared_source.slt.serial

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,6 @@ create materialized view mv_before_produce as select * from s_before_produce;
2424

2525
sleep 2s
2626

27-
statement ok
28-
set STREAMING_USE_SNAPSHOT_BACKFILL TO true;
29-
30-
statement error Not supported: Snapshot backfill with shared source backfill is not supported
31-
create materialized view mv_snapshot_backfill_shared_source as (select * from mv_before_produce) union (select * from s_before_produce);
32-
33-
statement ok
34-
set STREAMING_USE_SNAPSHOT_BACKFILL TO false;
35-
3627
# All partitions start with backfill_info: NoDataToBackfill, so it finishes immediately.
3728
system ok
3829
internal_table.mjs --name mv_before_produce --type sourcebackfill

src/frontend/planner_test/tests/testdata/input/shared_source.yml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,3 +66,14 @@
6666
expected_outputs:
6767
- batch_plan
6868
- stream_plan
69+
- with_config_map:
70+
streaming_use_shared_source: true
71+
before:
72+
- create_source
73+
sql: |
74+
CREATE TABLE t(x int,y int);
75+
set streaming_use_snapshot_backfill = true;
76+
select * from s union select * from t;
77+
expected_outputs:
78+
- logical_plan
79+
- stream_plan

src/frontend/planner_test/tests/testdata/input/temporal_join.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -182,4 +182,4 @@
182182
create table version(id2 int, a2 int, b2 int, primary key (id2));
183183
select id1, a1, id2, v1 from stream left join version FOR SYSTEM_TIME AS OF PROCTIME() on id1 = id2;
184184
expected_outputs:
185-
- stream_error
185+
- stream_plan

src/frontend/planner_test/tests/testdata/output/batch_source.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
select * from s
44
logical_plan: |-
55
LogicalProject { exprs: [id, value] }
6-
└─LogicalSource { source: s, columns: [id, value, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] }
6+
└─LogicalSource { source: s, is_shared: true, columns: [id, value, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] }
77
batch_plan: |-
88
BatchExchange { order: [], dist: Single }
99
└─BatchProject { exprs: [id, value] }

src/frontend/planner_test/tests/testdata/output/nexmark_source.yaml

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -405,7 +405,7 @@
405405
│ └─LogicalProject { exprs: [window_start, auction] }
406406
│ └─LogicalHopWindow { time_col: date_time, slide: 00:00:02, size: 00:00:10, output: all }
407407
│ └─LogicalFilter { predicate: IsNotNull(date_time) }
408-
│ └─LogicalSource { source: bid, columns: [auction, bidder, price, channel, url, date_time, extra, _row_id] }
408+
│ └─LogicalSource { source: bid, is_shared: false, columns: [auction, bidder, price, channel, url, date_time, extra, _row_id] }
409409
└─LogicalProject { exprs: [max(count), window_start] }
410410
└─LogicalAgg { group_key: [window_start], aggs: [max(count)] }
411411
└─LogicalProject { exprs: [window_start, count] }
@@ -414,7 +414,7 @@
414414
└─LogicalProject { exprs: [auction, window_start] }
415415
└─LogicalHopWindow { time_col: date_time, slide: 00:00:02, size: 00:00:10, output: all }
416416
└─LogicalFilter { predicate: IsNotNull(date_time) }
417-
└─LogicalSource { source: bid, columns: [auction, bidder, price, channel, url, date_time, extra, _row_id] }
417+
└─LogicalSource { source: bid, is_shared: false, columns: [auction, bidder, price, channel, url, date_time, extra, _row_id] }
418418
batch_plan: |-
419419
BatchExchange { order: [], dist: Single }
420420
└─BatchHashJoin { type: Inner, predicate: window_start = window_start AND (count >= max(count)), output: [auction, count] }
@@ -859,13 +859,13 @@
859859
└─LogicalProject { exprs: [id, item_name, description, initial_bid, reserve, date_time, expires, seller, category, extra, _row_id, auction, bidder, price, channel, url, date_time, extra, _row_id] }
860860
└─LogicalFilter { predicate: (id = auction) AND (date_time >= date_time) AND (date_time <= expires) }
861861
└─LogicalJoin { type: Inner, on: true, output: all }
862-
├─LogicalSource { source: auction, columns: [id, item_name, description, initial_bid, reserve, date_time, expires, seller, category, extra, _row_id] }
863-
└─LogicalSource { source: bid, columns: [auction, bidder, price, channel, url, date_time, extra, _row_id] }
862+
├─LogicalSource { source: auction, is_shared: false, columns: [id, item_name, description, initial_bid, reserve, date_time, expires, seller, category, extra, _row_id] }
863+
└─LogicalSource { source: bid, is_shared: false, columns: [auction, bidder, price, channel, url, date_time, extra, _row_id] }
864864
optimized_logical_plan_for_batch: |-
865865
LogicalTopN { order: [price DESC, date_time ASC], limit: 1, offset: 0, group_key: [id] }
866866
└─LogicalJoin { type: Inner, on: (id = auction) AND (date_time >= date_time) AND (date_time <= expires), output: [id, item_name, description, initial_bid, reserve, date_time, expires, seller, category, auction, bidder, price, date_time] }
867-
├─LogicalSource { source: auction, columns: [id, item_name, description, initial_bid, reserve, date_time, expires, seller, category, extra, _row_id] }
868-
└─LogicalSource { source: bid, columns: [auction, bidder, price, channel, url, date_time, extra, _row_id] }
867+
├─LogicalSource { source: auction, is_shared: false, columns: [id, item_name, description, initial_bid, reserve, date_time, expires, seller, category, extra, _row_id] }
868+
└─LogicalSource { source: bid, is_shared: false, columns: [auction, bidder, price, channel, url, date_time, extra, _row_id] }
869869
batch_plan: |-
870870
BatchExchange { order: [], dist: Single }
871871
└─BatchGroupTopN { order: [price DESC, date_time ASC], limit: 1, offset: 0, group_key: [id] }
@@ -1480,7 +1480,7 @@
14801480
└─LogicalProject { exprs: [auction, bidder, price, channel, url, date_time, extra, row_number] }
14811481
└─LogicalOverWindow { window_functions: [row_number() OVER(PARTITION BY bidder, auction ORDER BY date_time DESC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] }
14821482
└─LogicalProject { exprs: [auction, bidder, price, channel, url, date_time, extra, _row_id] }
1483-
└─LogicalSource { source: bid, columns: [auction, bidder, price, channel, url, date_time, extra, _row_id] }
1483+
└─LogicalSource { source: bid, is_shared: false, columns: [auction, bidder, price, channel, url, date_time, extra, _row_id] }
14841484
batch_plan: |-
14851485
BatchExchange { order: [], dist: Single }
14861486
└─BatchGroupTopN { order: [date_time DESC], limit: 1, offset: 0, group_key: [bidder, auction] }
@@ -1537,7 +1537,7 @@
15371537
└─LogicalProject { exprs: [auction, bidder, price, channel, url, date_time, extra, rank] }
15381538
└─LogicalOverWindow { window_functions: [rank() OVER(PARTITION BY bidder, auction ORDER BY date_time DESC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] }
15391539
└─LogicalProject { exprs: [auction, bidder, price, channel, url, date_time, extra, _row_id] }
1540-
└─LogicalSource { source: bid, columns: [auction, bidder, price, channel, url, date_time, extra, _row_id] }
1540+
└─LogicalSource { source: bid, is_shared: false, columns: [auction, bidder, price, channel, url, date_time, extra, _row_id] }
15411541
batch_plan: |-
15421542
BatchExchange { order: [], dist: Single }
15431543
└─BatchGroupTopN { order: [date_time DESC], limit: 1, offset: 0, with_ties: true, group_key: [bidder, auction] }
@@ -1591,7 +1591,7 @@
15911591
└─LogicalProject { exprs: [auction, bidder, price, channel, url, date_time, extra, row_number] }
15921592
└─LogicalOverWindow { window_functions: [row_number() OVER(PARTITION BY auction ORDER BY price DESC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] }
15931593
└─LogicalProject { exprs: [auction, bidder, price, channel, url, date_time, extra, _row_id] }
1594-
└─LogicalSource { source: bid, columns: [auction, bidder, price, channel, url, date_time, extra, _row_id] }
1594+
└─LogicalSource { source: bid, is_shared: false, columns: [auction, bidder, price, channel, url, date_time, extra, _row_id] }
15951595
batch_plan: |-
15961596
BatchExchange { order: [], dist: Single }
15971597
└─BatchOverWindow { window_functions: [row_number() OVER(PARTITION BY auction ORDER BY price DESC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] }

src/frontend/planner_test/tests/testdata/output/nexmark_source_kafka.yaml

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -384,7 +384,7 @@
384384
│ └─LogicalProject { exprs: [window_start, auction] }
385385
│ └─LogicalHopWindow { time_col: date_time, slide: 00:00:02, size: 00:00:10, output: all }
386386
│ └─LogicalFilter { predicate: IsNotNull(date_time) }
387-
│ └─LogicalSource { source: bid, columns: [auction, bidder, price, channel, url, date_time, extra, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] }
387+
│ └─LogicalSource { source: bid, is_shared: true, columns: [auction, bidder, price, channel, url, date_time, extra, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] }
388388
└─LogicalProject { exprs: [max(count), window_start] }
389389
└─LogicalAgg { group_key: [window_start], aggs: [max(count)] }
390390
└─LogicalProject { exprs: [window_start, count] }
@@ -393,7 +393,7 @@
393393
└─LogicalProject { exprs: [auction, window_start] }
394394
└─LogicalHopWindow { time_col: date_time, slide: 00:00:02, size: 00:00:10, output: all }
395395
└─LogicalFilter { predicate: IsNotNull(date_time) }
396-
└─LogicalSource { source: bid, columns: [auction, bidder, price, channel, url, date_time, extra, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] }
396+
└─LogicalSource { source: bid, is_shared: true, columns: [auction, bidder, price, channel, url, date_time, extra, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] }
397397
batch_plan: |-
398398
BatchExchange { order: [], dist: Single }
399399
└─BatchHashJoin { type: Inner, predicate: window_start = window_start AND (count >= max(count)), output: [auction, count] }
@@ -851,8 +851,8 @@
851851
└─LogicalProject { exprs: [id, item_name, description, initial_bid, reserve, date_time, expires, seller, category, extra, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id, auction, bidder, price, channel, url, date_time, extra, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] }
852852
└─LogicalFilter { predicate: (id = auction) AND (date_time >= date_time) AND (date_time <= expires) }
853853
└─LogicalJoin { type: Inner, on: true, output: all }
854-
├─LogicalSource { source: auction, columns: [id, item_name, description, initial_bid, reserve, date_time, expires, seller, category, extra, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] }
855-
└─LogicalSource { source: bid, columns: [auction, bidder, price, channel, url, date_time, extra, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] }
854+
├─LogicalSource { source: auction, is_shared: true, columns: [id, item_name, description, initial_bid, reserve, date_time, expires, seller, category, extra, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] }
855+
└─LogicalSource { source: bid, is_shared: true, columns: [auction, bidder, price, channel, url, date_time, extra, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] }
856856
optimized_logical_plan_for_batch: |-
857857
LogicalTopN { order: [price DESC, date_time ASC], limit: 1, offset: 0, group_key: [id] }
858858
└─LogicalJoin { type: Inner, on: (id = auction) AND (date_time >= date_time) AND (date_time <= expires), output: [id, item_name, description, initial_bid, reserve, date_time, expires, seller, category, auction, bidder, price, date_time] }
@@ -1489,7 +1489,7 @@
14891489
└─LogicalProject { exprs: [auction, bidder, price, channel, url, date_time, extra, row_number] }
14901490
└─LogicalOverWindow { window_functions: [row_number() OVER(PARTITION BY bidder, auction ORDER BY date_time DESC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] }
14911491
└─LogicalProject { exprs: [auction, bidder, price, channel, url, date_time, extra, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] }
1492-
└─LogicalSource { source: bid, columns: [auction, bidder, price, channel, url, date_time, extra, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] }
1492+
└─LogicalSource { source: bid, is_shared: true, columns: [auction, bidder, price, channel, url, date_time, extra, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] }
14931493
batch_plan: |-
14941494
BatchExchange { order: [], dist: Single }
14951495
└─BatchGroupTopN { order: [date_time DESC], limit: 1, offset: 0, group_key: [bidder, auction] }
@@ -1548,7 +1548,7 @@
15481548
└─LogicalProject { exprs: [auction, bidder, price, channel, url, date_time, extra, rank] }
15491549
└─LogicalOverWindow { window_functions: [rank() OVER(PARTITION BY bidder, auction ORDER BY date_time DESC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] }
15501550
└─LogicalProject { exprs: [auction, bidder, price, channel, url, date_time, extra, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] }
1551-
└─LogicalSource { source: bid, columns: [auction, bidder, price, channel, url, date_time, extra, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] }
1551+
└─LogicalSource { source: bid, is_shared: true, columns: [auction, bidder, price, channel, url, date_time, extra, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] }
15521552
batch_plan: |-
15531553
BatchExchange { order: [], dist: Single }
15541554
└─BatchGroupTopN { order: [date_time DESC], limit: 1, offset: 0, with_ties: true, group_key: [bidder, auction] }
@@ -1603,7 +1603,7 @@
16031603
└─LogicalProject { exprs: [auction, bidder, price, channel, url, date_time, extra, row_number] }
16041604
└─LogicalOverWindow { window_functions: [row_number() OVER(PARTITION BY auction ORDER BY price DESC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] }
16051605
└─LogicalProject { exprs: [auction, bidder, price, channel, url, date_time, extra, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] }
1606-
└─LogicalSource { source: bid, columns: [auction, bidder, price, channel, url, date_time, extra, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] }
1606+
└─LogicalSource { source: bid, is_shared: true, columns: [auction, bidder, price, channel, url, date_time, extra, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] }
16071607
batch_plan: |-
16081608
BatchExchange { order: [], dist: Single }
16091609
└─BatchOverWindow { window_functions: [row_number() OVER(PARTITION BY auction ORDER BY price DESC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] }

0 commit comments

Comments
 (0)