Skip to content

Commit 2492744

Browse files
committed
forbid for iceberg table engine
1 parent 00f52a5 commit 2492744

File tree

3 files changed

+11
-2
lines changed

3 files changed

+11
-2
lines changed

src/frontend/planner_test/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -883,6 +883,7 @@ impl TestCase {
883883
None,
884884
false,
885885
None,
886+
true,
886887
) {
887888
Ok(sink_plan) => {
888889
ret.sink_plan = Some(explain_plan(&sink_plan.into()));

src/frontend/src/handler/create_sink.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -387,6 +387,8 @@ pub async fn gen_sink_plan(
387387
}
388388
}
389389

390+
let allow_snapshot_backfill = target_table_catalog.is_none() && !is_iceberg_engine_internal;
391+
390392
let sink_plan = plan_root.gen_sink_plan(
391393
sink_table_name,
392394
definition,
@@ -400,6 +402,7 @@ pub async fn gen_sink_plan(
400402
partition_info,
401403
user_specified_columns,
402404
auto_refresh_schema_from_table,
405+
allow_snapshot_backfill,
403406
)?;
404407

405408
let sink_desc = sink_plan.sink_desc().clone();

src/frontend/src/optimizer/mod.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1121,7 +1121,7 @@ impl LogicalPlanRoot {
11211121
}
11221122

11231123
/// Optimize and generate a create sink plan.
1124-
#[allow(clippy::too_many_arguments)]
1124+
#[expect(clippy::too_many_arguments)]
11251125
pub fn gen_sink_plan(
11261126
self,
11271127
sink_name: String,
@@ -1136,10 +1136,15 @@ impl LogicalPlanRoot {
11361136
partition_info: Option<PartitionComputeInfo>,
11371137
user_specified_columns: bool,
11381138
auto_refresh_schema_from_table: Option<Arc<TableCatalog>>,
1139+
allow_snapshot_backfill: bool,
11391140
) -> Result<StreamSink> {
11401141
let backfill_type = if without_backfill {
11411142
BackfillType::UpstreamOnly
1142-
} else if target_table.is_none() && self.should_use_snapshot_backfill() {
1143+
} else if allow_snapshot_backfill && self.should_use_snapshot_backfill() {
1144+
assert!(
1145+
target_table.is_none(),
1146+
"should not allow snapshot backfill for sink-into-table"
1147+
);
11431148
// Snapshot backfill on sink-into-table is not allowed
11441149
BackfillType::SnapshotBackfill
11451150
} else if self.should_use_arrangement_backfill() {

0 commit comments

Comments
 (0)