Skip to content

Commit 0a5bc51

Browse files
committed
fix(fuse/mutation): avoid panic when filtering by _block_name
Mutation planning passed planner metadata column indexes (which include internal columns such as `_block_name`) where Fuse table schema field indexes were expected, so `TableSchema::project()` could index out of bounds during DELETE. This commit converts mutation read columns to base `column_position` values (schema `FieldIndex`), carries internal columns separately, and generates their values from `block_meta_index` at execution time so the filter can be evaluated.
1 parent 7e6ddc5 commit 0a5bc51

File tree

5 files changed

+179
-7
lines changed

5 files changed

+179
-7
lines changed

src/query/service/src/interpreters/interpreter_mutation.rs

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ use databend_common_expression::SendableDataBlockStream;
2828
use databend_common_expression::types::UInt64Type;
2929
use databend_common_pipeline::core::ProcessorPtr;
3030
use databend_common_pipeline::sinks::EmptySink;
31+
use databend_common_sql::ColumnEntry;
3132
use databend_common_sql::binder::MutationStrategy;
3233
use databend_common_sql::binder::MutationType;
3334
use databend_common_sql::executor::physical_plans::MutationKind;
@@ -278,6 +279,21 @@ async fn mutation_source_partitions(
278279
(None, vec![])
279280
};
280281

282+
let mut read_column_positions = Vec::new();
283+
if !filter_used_columns.is_empty() {
284+
let metadata = mutation.metadata.read();
285+
for column_index in &filter_used_columns {
286+
if let ColumnEntry::BaseTableColumn(base) = metadata.column(*column_index) {
287+
if let Some(pos) = base.column_position {
288+
// column_position is 1-based
289+
read_column_positions.push(pos - 1);
290+
}
291+
}
292+
}
293+
read_column_positions.sort_unstable();
294+
read_column_positions.dedup();
295+
}
296+
281297
let (is_lazy, is_delete) = if mutation.mutation_type == MutationType::Delete {
282298
let cluster = ctx.get_cluster();
283299
let is_lazy = fuse_table.is_column_oriented()
@@ -292,7 +308,7 @@ async fn mutation_source_partitions(
292308
.mutation_read_partitions(
293309
ctx,
294310
table_snapshot,
295-
filter_used_columns,
311+
read_column_positions,
296312
filters,
297313
is_lazy,
298314
is_delete,

src/query/service/src/physical_plans/physical_mutation_source.rs

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,8 @@ pub struct MutationSource {
6868
pub output_schema: DataSchemaRef,
6969
pub input_type: MutationType,
7070
pub read_partition_columns: ColumnSet,
71+
pub read_column_positions: Vec<usize>,
72+
pub internal_columns: Vec<databend_common_catalog::plan::InternalColumn>,
7173
pub truncate_table: bool,
7274

7375
pub partitions: Partitions,
@@ -110,6 +112,8 @@ impl IPhysicalPlan for MutationSource {
110112
output_schema: self.output_schema.clone(),
111113
input_type: self.input_type.clone(),
112114
read_partition_columns: self.read_partition_columns.clone(),
115+
read_column_positions: self.read_column_positions.clone(),
116+
internal_columns: self.internal_columns.clone(),
113117
truncate_table: self.truncate_table,
114118
partitions: self.partitions.clone(),
115119
statistics: self.statistics.clone(),
@@ -143,8 +147,7 @@ impl IPhysicalPlan for MutationSource {
143147
);
144148
}
145149

146-
let read_partition_columns: Vec<usize> =
147-
self.read_partition_columns.clone().into_iter().collect();
150+
let read_partition_columns = self.read_column_positions.clone();
148151

149152
let is_lazy = self.partitions.partitions_type() == PartInfoType::LazyLevel && is_delete;
150153
if is_lazy {
@@ -191,13 +194,14 @@ impl IPhysicalPlan for MutationSource {
191194
} else {
192195
MutationAction::Update
193196
};
194-
let col_indices = self.read_partition_columns.clone().into_iter().collect();
197+
let col_indices = self.read_column_positions.clone();
195198
let update_mutation_with_filter =
196199
self.input_type == MutationType::Update && filter.is_some();
197200
table.add_mutation_source(
198201
builder.ctx.clone(),
199202
filter,
200203
col_indices,
204+
self.internal_columns.clone(),
201205
&mut builder.main_pipeline,
202206
mutation_action,
203207
)?;
@@ -262,6 +266,25 @@ impl PhysicalPlanBuilder {
262266
}
263267
let output_schema = DataSchemaRefExt::create(fields);
264268

269+
let mut read_column_positions = Vec::new();
270+
let mut internal_columns = Vec::new();
271+
for column_index in mutation_source.read_partition_columns.iter() {
272+
match metadata.column(*column_index) {
273+
databend_common_sql::ColumnEntry::BaseTableColumn(base) => {
274+
if let Some(pos) = base.column_position {
275+
// column_position is 1-based
276+
read_column_positions.push(pos - 1);
277+
}
278+
}
279+
databend_common_sql::ColumnEntry::InternalColumn(internal) => {
280+
internal_columns.push(internal.internal_column.clone());
281+
}
282+
_ => {}
283+
}
284+
}
285+
read_column_positions.sort_unstable();
286+
read_column_positions.dedup();
287+
265288
let truncate_table =
266289
mutation_source.mutation_type == MutationType::Delete && filters.is_none();
267290
Ok(PhysicalPlan::new(MutationSource {
@@ -271,6 +294,8 @@ impl PhysicalPlanBuilder {
271294
filters,
272295
input_type: mutation_source.mutation_type.clone(),
273296
read_partition_columns: mutation_source.read_partition_columns.clone(),
297+
read_column_positions,
298+
internal_columns,
274299
truncate_table,
275300
meta: PhysicalPlanMeta::new("MutationSource"),
276301
partitions: mutation_info.partitions.clone(),

src/query/service/tests/it/storages/fuse/operations/mutation/deletion.rs

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,12 @@
1313
// limitations under the License.
1414

1515
use databend_common_base::base::tokio;
16+
use databend_common_exception::ErrorCode;
1617
use databend_common_exception::Result;
18+
use databend_common_expression::ScalarRef;
19+
use databend_common_expression::types::NumberScalar;
1720
use databend_query::test_kits::*;
21+
use futures::TryStreamExt;
1822

1923
#[tokio::test(flavor = "multi_thread")]
2024
async fn test_deletion_mutator_multiple_empty_segments() -> Result<()> {
@@ -56,3 +60,63 @@ async fn test_deletion_mutator_multiple_empty_segments() -> Result<()> {
5660

5761
Ok(())
5862
}
63+
64+
#[tokio::test(flavor = "multi_thread")]
65+
async fn test_delete_by_block_name() -> Result<()> {
66+
let fixture = TestFixture::setup().await?;
67+
let db = fixture.default_db_name();
68+
let tbl = fixture.default_table_name();
69+
70+
fixture.create_default_database().await?;
71+
fixture.create_default_table().await?;
72+
73+
fixture
74+
.execute_command(&format!("insert into {db}.{tbl} values(1, (2, 3))"))
75+
.await?;
76+
77+
let blocks = fixture
78+
.execute_query(&format!("select _block_name from {db}.{tbl} limit 1"))
79+
.await?
80+
.try_collect::<Vec<_>>()
81+
.await?;
82+
let block_name = match blocks.get(0).and_then(|b| b.get_by_offset(0).index(0)) {
83+
Some(ScalarRef::String(s)) => s.to_string(),
84+
Some(other) => {
85+
return Err(ErrorCode::Internal(format!(
86+
"unexpected _block_name scalar: {other:?}"
87+
)));
88+
}
89+
None => {
90+
return Err(ErrorCode::Internal(
91+
"failed to fetch _block_name".to_string(),
92+
));
93+
}
94+
};
95+
96+
fixture
97+
.execute_command(&format!(
98+
"delete from {db}.{tbl} where _block_name = '{block_name}'"
99+
))
100+
.await?;
101+
102+
let count_blocks = fixture
103+
.execute_query(&format!("select count() from {db}.{tbl}"))
104+
.await?
105+
.try_collect::<Vec<_>>()
106+
.await?;
107+
let count = match count_blocks
108+
.get(0)
109+
.and_then(|b| b.get_by_offset(0).index(0))
110+
{
111+
Some(ScalarRef::Number(NumberScalar::UInt64(v))) => v,
112+
Some(other) => {
113+
return Err(ErrorCode::Internal(format!(
114+
"unexpected count() scalar: {other:?}"
115+
)));
116+
}
117+
None => return Err(ErrorCode::Internal("failed to fetch count()".to_string())),
118+
};
119+
assert_eq!(count, 0);
120+
121+
Ok(())
122+
}

src/query/storages/fuse/src/operations/mutation/processors/mutation_source.rs

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ use std::ops::Not;
1717
use std::sync::Arc;
1818

1919
use databend_common_base::base::ProgressValues;
20+
use databend_common_catalog::plan::InternalColumn;
21+
use databend_common_catalog::plan::InternalColumnMeta;
2022
use databend_common_catalog::plan::PartInfoPtr;
2123
use databend_common_catalog::plan::build_origin_block_row_num;
2224
use databend_common_catalog::plan::gen_mutation_stream_meta;
@@ -82,6 +84,7 @@ pub struct MutationSource {
8284
filter: Arc<Option<Expr>>,
8385
block_reader: Arc<BlockReader>,
8486
remain_reader: Arc<Option<BlockReader>>,
87+
internal_columns: Vec<InternalColumn>,
8588
operators: Vec<BlockOperator>,
8689
storage_format: FuseStorageFormat,
8790
action: MutationAction,
@@ -99,6 +102,7 @@ impl MutationSource {
99102
filter: Arc<Option<Expr>>,
100103
block_reader: Arc<BlockReader>,
101104
remain_reader: Arc<Option<BlockReader>>,
105+
internal_columns: Vec<InternalColumn>,
102106
operators: Vec<BlockOperator>,
103107
storage_format: FuseStorageFormat,
104108
) -> Result<ProcessorPtr> {
@@ -109,6 +113,7 @@ impl MutationSource {
109113
filter,
110114
block_reader,
111115
remain_reader,
116+
internal_columns,
112117
operators,
113118
storage_format,
114119
action,
@@ -186,6 +191,11 @@ impl Processor for MutationSource {
186191
if let Some(filter) = self.filter.as_ref() {
187192
assert_eq!(filter.data_type(), &DataType::Boolean);
188193

194+
let base_cols_len = data_block.num_columns();
195+
if !self.internal_columns.is_empty() {
196+
data_block = self.add_internal_columns(data_block, fuse_part)?;
197+
}
198+
189199
let func_ctx = self.ctx.get_function_context()?;
190200
let evaluator = Evaluator::new(&data_block, &func_ctx, &BUILTIN_FUNCTIONS);
191201

@@ -195,6 +205,12 @@ impl Processor for MutationSource {
195205
.try_downcast::<BooleanType>()
196206
.unwrap();
197207

208+
if data_block.num_columns() != base_cols_len {
209+
let projections: std::collections::BTreeSet<usize> =
210+
(0..base_cols_len).collect();
211+
data_block = data_block.project(&projections);
212+
}
213+
198214
let affect_rows = match &predicates {
199215
Value::Scalar(v) => {
200216
if *v {
@@ -417,6 +433,39 @@ impl Processor for MutationSource {
417433
}
418434

419435
impl MutationSource {
436+
fn add_internal_columns(
437+
&self,
438+
mut data_block: DataBlock,
439+
part: &FuseBlockPartInfo,
440+
) -> Result<DataBlock> {
441+
let Some(block_meta_index) = part.block_meta_index.as_ref() else {
442+
return Err(ErrorCode::Internal(
443+
"block_meta_index is missing for internal columns".to_string(),
444+
));
445+
};
446+
447+
let num_rows = data_block.num_rows();
448+
let internal_column_meta = InternalColumnMeta {
449+
segment_idx: block_meta_index.segment_idx,
450+
block_id: block_meta_index.block_id,
451+
block_location: block_meta_index.block_location.clone(),
452+
segment_location: block_meta_index.segment_location.clone(),
453+
snapshot_location: block_meta_index.snapshot_location.clone(),
454+
offsets: None,
455+
base_block_ids: None,
456+
inner: None,
457+
matched_rows: block_meta_index.matched_rows.clone(),
458+
matched_scores: block_meta_index.matched_scores.clone(),
459+
vector_scores: block_meta_index.vector_scores.clone(),
460+
};
461+
462+
for internal_column in &self.internal_columns {
463+
let entry = internal_column.generate_column_values(&internal_column_meta, num_rows);
464+
data_block.add_entry(entry);
465+
}
466+
Ok(data_block)
467+
}
468+
420469
fn update_mutation_status(&self, num_rows: usize) {
421470
let (update_rows, deleted_rows) = if self.action == MutationAction::Update {
422471
(num_rows as u64, 0)

src/query/storages/fuse/src/operations/mutation_source.rs

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
use std::sync::Arc;
1616

1717
use databend_common_catalog::plan::Filters;
18+
use databend_common_catalog::plan::InternalColumn;
1819
use databend_common_catalog::plan::PartInfoPtr;
1920
use databend_common_catalog::plan::PartStatistics;
2021
use databend_common_catalog::plan::Partitions;
@@ -53,6 +54,7 @@ impl FuseTable {
5354
ctx: Arc<dyn TableContext>,
5455
filter: Option<RemoteExpr<String>>,
5556
col_indices: Vec<usize>,
57+
internal_columns: Vec<InternalColumn>,
5658
pipeline: &mut Pipeline,
5759
mutation_action: MutationAction,
5860
) -> Result<()> {
@@ -65,10 +67,25 @@ impl FuseTable {
6567
};
6668
let projection = Projection::Columns(col_indices.clone());
6769
let update_stream_columns = self.change_tracking_enabled();
68-
let block_reader =
69-
self.create_block_reader(ctx.clone(), projection, false, update_stream_columns, false)?;
70+
let query_internal_columns = !internal_columns.is_empty();
71+
let block_reader = self.create_block_reader(
72+
ctx.clone(),
73+
projection,
74+
query_internal_columns,
75+
update_stream_columns,
76+
false,
77+
)?;
7078

71-
let schema = block_reader.schema().as_ref().clone();
79+
let mut schema = block_reader.schema().as_ref().clone();
80+
if query_internal_columns {
81+
for internal_column in &internal_columns {
82+
schema.add_internal_field(
83+
internal_column.column_name(),
84+
internal_column.table_data_type(),
85+
internal_column.column_id(),
86+
);
87+
}
88+
}
7289
let filter_expr = Arc::new(filter.map(|v| {
7390
v.as_expr(&BUILTIN_FUNCTIONS)
7491
.project_column_ref(|name| schema.index_of(name))
@@ -120,6 +137,7 @@ impl FuseTable {
120137
filter_expr.clone(),
121138
block_reader.clone(),
122139
remain_reader.clone(),
140+
internal_columns.clone(),
123141
ops.clone(),
124142
self.storage_format,
125143
)

0 commit comments

Comments
 (0)