4 changes: 3 additions & 1 deletion ci/scripts/run-backfill-tests.sh
@@ -110,11 +110,13 @@ test_snapshot_and_upstream_read() {
# Provide snapshot
run_sql_file "$PARENT_PATH"/sql/backfill/basic/insert.sql

run_sql "alter table t1 set dml_rate_limit = 10"

# Provide updates ...
run_sql_file "$PARENT_PATH"/sql/backfill/basic/insert.sql &

# ... and concurrently create mv.
run_sql_file "$PARENT_PATH"/sql/backfill/basic/create_mv.sql &
run_sql_file "$PARENT_PATH"/sql/backfill/basic/create_mv.sql && run_sql "alter table t1 set dml_rate_limit = default" &

wait

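The rate limit keeps the second insert in flight while the mv is being created, so the backfill genuinely overlaps concurrent upstream updates; the limit is lifted once the mv exists. A minimal sketch of the pattern, using the table name and values from the diff above:

```sql
-- Throttle DML on t1 so the background insert outlives mv creation.
alter table t1 set dml_rate_limit = 10;

-- ... run the insert in the background and create the mv concurrently ...

-- Restore unthrottled ingestion once the mv is created.
alter table t1 set dml_rate_limit = default;
```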
2 changes: 1 addition & 1 deletion ci/scripts/sql/backfill/basic/insert.sql
@@ -2,5 +2,5 @@ insert into t1
SELECT
generate_series,
'{"orders": {"id": 1, "price": "2.30", "customer_id": 2}}'::jsonb
-FROM generate_series(1, 50000);
+FROM generate_series(1, 10000);
FLUSH;
9 changes: 0 additions & 9 deletions e2e_test/source_inline/kafka/shared_source.slt.serial
@@ -24,15 +24,6 @@ create materialized view mv_before_produce as select * from s_before_produce;

sleep 2s

-statement ok
-set STREAMING_USE_SNAPSHOT_BACKFILL TO true;

-statement error Not supported: Snapshot backfill with shared source backfill is not supported
-create materialized view mv_snapshot_backfill_shared_source as (select * from mv_before_produce) union (select * from s_before_produce);
Comment on lines -30 to -31
Contributor: What is the test case's expected behavior now?

Contributor Author: After this PR, even though snapshot backfill is enabled via the session variable, it will use arrangement backfill anyway and emit a notice to the user about the implicit switch.
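A minimal sketch of the resulting behavior (the notice wording below is assumed, not verbatim):

```sql
set STREAMING_USE_SNAPSHOT_BACKFILL TO true;

-- Previously raised "Not supported"; now plans successfully, implicitly
-- switching to arrangement backfill because the query reads a shared source.
create materialized view mv_snapshot_backfill_shared_source as
    (select * from mv_before_produce) union (select * from s_before_produce);
-- NOTICE (approximate): snapshot backfill is not supported for shared
-- source backfill; falling back to arrangement backfill.
```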


-statement ok
-set STREAMING_USE_SNAPSHOT_BACKFILL TO false;

# All partitions starts with backfill_info: NoDataToBackfill, so it finishes immediately.
system ok
internal_table.mjs --name mv_before_produce --type sourcebackfill
1 change: 1 addition & 0 deletions src/frontend/planner_test/src/lib.rs
@@ -883,6 +883,7 @@ impl TestCase {
None,
false,
None,
+true,
) {
Ok(sink_plan) => {
ret.sink_plan = Some(explain_plan(&sink_plan.into()));
11 changes: 11 additions & 0 deletions src/frontend/planner_test/tests/testdata/input/shared_source.yml
@@ -66,3 +66,14 @@
expected_outputs:
- batch_plan
- stream_plan
+- with_config_map:
+    streaming_use_shared_source: true
+  before:
+  - create_source
+  sql: |
+    CREATE TABLE t(x int,y int);
+    set streaming_use_snapshot_backfill = true;
+    select * from s union select * from t;
+  expected_outputs:
+  - logical_plan
+  - stream_plan
@@ -182,4 +182,4 @@
create table version(id2 int, a2 int, b2 int, primary key (id2));
select id1, a1, id2, v1 from stream left join version FOR SYSTEM_TIME AS OF PROCTIME() on id1 = id2;
expected_outputs:
-  - stream_error
+  - stream_plan
@@ -3,7 +3,7 @@
select * from s
logical_plan: |-
LogicalProject { exprs: [id, value] }
-└─LogicalSource { source: s, columns: [id, value, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] }
+└─LogicalSource { source: s, is_shared: true, columns: [id, value, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] }
batch_plan: |-
BatchExchange { order: [], dist: Single }
└─BatchProject { exprs: [id, value] }
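The remaining snapshot updates below all stem from one change: `LogicalSource` now reports an `is_shared` flag in its explain output. A sketch of what this looks like for a shared source (the `explain (logical)` option syntax here is an assumption; the plan shape matches the snapshot above):

```sql
-- For a shared Kafka source s, the planner now marks the scan as shared.
explain (logical) select * from s;
-- Expected shape of the output (abbreviated from the snapshot above):
-- LogicalProject { exprs: [id, value] }
-- └─LogicalSource { source: s, is_shared: true, columns: [id, value, ...] }
```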
@@ -405,7 +405,7 @@
│ └─LogicalProject { exprs: [window_start, auction] }
│ └─LogicalHopWindow { time_col: date_time, slide: 00:00:02, size: 00:00:10, output: all }
│ └─LogicalFilter { predicate: IsNotNull(date_time) }
-│ └─LogicalSource { source: bid, columns: [auction, bidder, price, channel, url, date_time, extra, _row_id] }
+│ └─LogicalSource { source: bid, is_shared: false, columns: [auction, bidder, price, channel, url, date_time, extra, _row_id] }
└─LogicalProject { exprs: [max(count), window_start] }
└─LogicalAgg { group_key: [window_start], aggs: [max(count)] }
└─LogicalProject { exprs: [window_start, count] }
@@ -414,7 +414,7 @@
└─LogicalProject { exprs: [auction, window_start] }
└─LogicalHopWindow { time_col: date_time, slide: 00:00:02, size: 00:00:10, output: all }
└─LogicalFilter { predicate: IsNotNull(date_time) }
-└─LogicalSource { source: bid, columns: [auction, bidder, price, channel, url, date_time, extra, _row_id] }
+└─LogicalSource { source: bid, is_shared: false, columns: [auction, bidder, price, channel, url, date_time, extra, _row_id] }
batch_plan: |-
BatchExchange { order: [], dist: Single }
└─BatchHashJoin { type: Inner, predicate: window_start = window_start AND (count >= max(count)), output: [auction, count] }
@@ -859,15 +859,15 @@
└─LogicalProject { exprs: [id, item_name, description, initial_bid, reserve, date_time, expires, seller, category, extra, _row_id, auction, bidder, price, channel, url, date_time, extra, _row_id] }
└─LogicalFilter { predicate: (id = auction) AND (date_time >= date_time) AND (date_time <= expires) }
└─LogicalJoin { type: Inner, on: true, output: all }
-├─LogicalSource { source: auction, columns: [id, item_name, description, initial_bid, reserve, date_time, expires, seller, category, extra, _row_id] }
-└─LogicalSource { source: bid, columns: [auction, bidder, price, channel, url, date_time, extra, _row_id] }
+├─LogicalSource { source: auction, is_shared: false, columns: [id, item_name, description, initial_bid, reserve, date_time, expires, seller, category, extra, _row_id] }
+└─LogicalSource { source: bid, is_shared: false, columns: [auction, bidder, price, channel, url, date_time, extra, _row_id] }
optimized_logical_plan_for_batch: |-
LogicalProject { exprs: [id, item_name, description, initial_bid, reserve, date_time, expires, seller, category, auction, bidder, price, date_time] }
└─LogicalFilter { predicate: (row_number <= 1:Int32) }
└─LogicalOverWindow { window_functions: [row_number() OVER(PARTITION BY id ORDER BY price DESC, date_time ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] }
└─LogicalJoin { type: Inner, on: (id = auction) AND (date_time >= date_time) AND (date_time <= expires), output: [id, item_name, description, initial_bid, reserve, date_time, expires, seller, category, auction, bidder, price, date_time] }
-├─LogicalSource { source: auction, columns: [id, item_name, description, initial_bid, reserve, date_time, expires, seller, category, extra, _row_id] }
-└─LogicalSource { source: bid, columns: [auction, bidder, price, channel, url, date_time, extra, _row_id] }
+├─LogicalSource { source: auction, is_shared: false, columns: [id, item_name, description, initial_bid, reserve, date_time, expires, seller, category, extra, _row_id] }
+└─LogicalSource { source: bid, is_shared: false, columns: [auction, bidder, price, channel, url, date_time, extra, _row_id] }
batch_plan: |-
BatchExchange { order: [], dist: Single }
└─BatchProject { exprs: [id, item_name, description, initial_bid, reserve, date_time, expires, seller, category, auction, bidder, price, date_time] }
@@ -1485,7 +1485,7 @@
└─LogicalProject { exprs: [auction, bidder, price, channel, url, date_time, extra, row_number] }
└─LogicalOverWindow { window_functions: [row_number() OVER(PARTITION BY bidder, auction ORDER BY date_time DESC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] }
└─LogicalProject { exprs: [auction, bidder, price, channel, url, date_time, extra, _row_id] }
-└─LogicalSource { source: bid, columns: [auction, bidder, price, channel, url, date_time, extra, _row_id] }
+└─LogicalSource { source: bid, is_shared: false, columns: [auction, bidder, price, channel, url, date_time, extra, _row_id] }
batch_plan: |-
BatchExchange { order: [], dist: Single }
└─BatchProject { exprs: [auction, bidder, price, channel, url, date_time, extra] }
@@ -1545,7 +1545,7 @@
└─LogicalProject { exprs: [auction, bidder, price, channel, url, date_time, extra, rank] }
└─LogicalOverWindow { window_functions: [rank() OVER(PARTITION BY bidder, auction ORDER BY date_time DESC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] }
└─LogicalProject { exprs: [auction, bidder, price, channel, url, date_time, extra, _row_id] }
-└─LogicalSource { source: bid, columns: [auction, bidder, price, channel, url, date_time, extra, _row_id] }
+└─LogicalSource { source: bid, is_shared: false, columns: [auction, bidder, price, channel, url, date_time, extra, _row_id] }
batch_plan: |-
BatchExchange { order: [], dist: Single }
└─BatchProject { exprs: [auction, bidder, price, channel, url, date_time, extra] }
@@ -1602,7 +1602,7 @@
└─LogicalProject { exprs: [auction, bidder, price, channel, url, date_time, extra, row_number] }
└─LogicalOverWindow { window_functions: [row_number() OVER(PARTITION BY auction ORDER BY price DESC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] }
└─LogicalProject { exprs: [auction, bidder, price, channel, url, date_time, extra, _row_id] }
-└─LogicalSource { source: bid, columns: [auction, bidder, price, channel, url, date_time, extra, _row_id] }
+└─LogicalSource { source: bid, is_shared: false, columns: [auction, bidder, price, channel, url, date_time, extra, _row_id] }
batch_plan: |-
BatchExchange { order: [], dist: Single }
└─BatchFilter { predicate: (row_number <= 10:Int32) }
@@ -384,7 +384,7 @@
│ └─LogicalProject { exprs: [window_start, auction] }
│ └─LogicalHopWindow { time_col: date_time, slide: 00:00:02, size: 00:00:10, output: all }
│ └─LogicalFilter { predicate: IsNotNull(date_time) }
-│ └─LogicalSource { source: bid, columns: [auction, bidder, price, channel, url, date_time, extra, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] }
+│ └─LogicalSource { source: bid, is_shared: true, columns: [auction, bidder, price, channel, url, date_time, extra, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] }
└─LogicalProject { exprs: [max(count), window_start] }
└─LogicalAgg { group_key: [window_start], aggs: [max(count)] }
└─LogicalProject { exprs: [window_start, count] }
@@ -393,7 +393,7 @@
└─LogicalProject { exprs: [auction, window_start] }
└─LogicalHopWindow { time_col: date_time, slide: 00:00:02, size: 00:00:10, output: all }
└─LogicalFilter { predicate: IsNotNull(date_time) }
-└─LogicalSource { source: bid, columns: [auction, bidder, price, channel, url, date_time, extra, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] }
+└─LogicalSource { source: bid, is_shared: true, columns: [auction, bidder, price, channel, url, date_time, extra, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] }
batch_plan: |-
BatchExchange { order: [], dist: Single }
└─BatchHashJoin { type: Inner, predicate: window_start = window_start AND (count >= max(count)), output: [auction, count] }
@@ -851,8 +851,8 @@
└─LogicalProject { exprs: [id, item_name, description, initial_bid, reserve, date_time, expires, seller, category, extra, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id, auction, bidder, price, channel, url, date_time, extra, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] }
└─LogicalFilter { predicate: (id = auction) AND (date_time >= date_time) AND (date_time <= expires) }
└─LogicalJoin { type: Inner, on: true, output: all }
-├─LogicalSource { source: auction, columns: [id, item_name, description, initial_bid, reserve, date_time, expires, seller, category, extra, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] }
-└─LogicalSource { source: bid, columns: [auction, bidder, price, channel, url, date_time, extra, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] }
+├─LogicalSource { source: auction, is_shared: true, columns: [id, item_name, description, initial_bid, reserve, date_time, expires, seller, category, extra, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] }
+└─LogicalSource { source: bid, is_shared: true, columns: [auction, bidder, price, channel, url, date_time, extra, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] }
optimized_logical_plan_for_batch: |-
LogicalProject { exprs: [id, item_name, description, initial_bid, reserve, date_time, expires, seller, category, auction, bidder, price, date_time] }
└─LogicalFilter { predicate: (row_number <= 1:Int32) }
@@ -1494,7 +1494,7 @@
└─LogicalProject { exprs: [auction, bidder, price, channel, url, date_time, extra, row_number] }
└─LogicalOverWindow { window_functions: [row_number() OVER(PARTITION BY bidder, auction ORDER BY date_time DESC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] }
└─LogicalProject { exprs: [auction, bidder, price, channel, url, date_time, extra, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] }
-└─LogicalSource { source: bid, columns: [auction, bidder, price, channel, url, date_time, extra, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] }
+└─LogicalSource { source: bid, is_shared: true, columns: [auction, bidder, price, channel, url, date_time, extra, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] }
batch_plan: |-
BatchExchange { order: [], dist: Single }
└─BatchProject { exprs: [auction, bidder, price, channel, url, date_time, extra] }
@@ -1556,7 +1556,7 @@
└─LogicalProject { exprs: [auction, bidder, price, channel, url, date_time, extra, rank] }
└─LogicalOverWindow { window_functions: [rank() OVER(PARTITION BY bidder, auction ORDER BY date_time DESC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] }
└─LogicalProject { exprs: [auction, bidder, price, channel, url, date_time, extra, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] }
-└─LogicalSource { source: bid, columns: [auction, bidder, price, channel, url, date_time, extra, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] }
+└─LogicalSource { source: bid, is_shared: true, columns: [auction, bidder, price, channel, url, date_time, extra, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] }
batch_plan: |-
BatchExchange { order: [], dist: Single }
└─BatchProject { exprs: [auction, bidder, price, channel, url, date_time, extra] }
@@ -1614,7 +1614,7 @@
└─LogicalProject { exprs: [auction, bidder, price, channel, url, date_time, extra, row_number] }
└─LogicalOverWindow { window_functions: [row_number() OVER(PARTITION BY auction ORDER BY price DESC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] }
└─LogicalProject { exprs: [auction, bidder, price, channel, url, date_time, extra, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] }
-└─LogicalSource { source: bid, columns: [auction, bidder, price, channel, url, date_time, extra, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] }
+└─LogicalSource { source: bid, is_shared: true, columns: [auction, bidder, price, channel, url, date_time, extra, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] }
batch_plan: |-
BatchExchange { order: [], dist: Single }
└─BatchFilter { predicate: (row_number <= 10:Int32) }