Commit d13d891

ethan-tyler and alamb authored
feat: Add DELETE/UPDATE hooks to TableProvider trait and to MemTable implementation (#19142)
Add infrastructure for row-level DML operations (DELETE/UPDATE) to the TableProvider trait, enabling storage engines to implement SQL-based mutations.

Changes:
- Add TableProvider::delete_from() method for DELETE operations
- Add TableProvider::update() method for UPDATE operations
- Wire the physical planner to route DML operations to the TableProvider
- Implement DELETE/UPDATE for MemTable as a reference implementation
- Add comprehensive sqllogictest coverage

This provides the API surface for downstream projects (iceberg-rust, delta-rs) to implement DML without custom query planners.

## Which issue does this PR close?

- Closes #16959
- Related to #12406

## Rationale for this change

DataFusion parses DELETE/UPDATE but returns NotImplemented("Unsupported logical plan: Dml(Delete)") at physical planning. Downstream projects (iceberg-rust, delta-rs) must implement custom planners to work around this.

## What changes are included in this PR?

Adds TableProvider hooks for row-level DML:
- delete_from(state, filters) - deletes rows matching the filter predicates
- update(state, assignments, filters) - updates matching rows with new values

The physical planner routes WriteOp::Delete and WriteOp::Update to these methods. Tables that don't support DML return NotImplemented (the default behavior).

The MemTable reference implementation demonstrates:
- Filter evaluation with SQL three-valued logic (NULL predicates don't match)
- Multi-column updates with expression evaluation
- Proper row count reporting

## Are these changes tested?

Yes:
- Unit tests for physical planning: cargo test -p datafusion --test custom_sources_cases
- SQL logic tests: dml_delete.slt and dml_update.slt with comprehensive coverage

## Are there any user-facing changes?

New trait methods on TableProvider:

```rust
async fn delete_from(
    &self,
    state: &dyn Session,
    filters: Vec<Expr>,
) -> Result<Arc<dyn ExecutionPlan>>;

async fn update(
    &self,
    state: &dyn Session,
    assignments: Vec<(String, Expr)>,
    filters: Vec<Expr>,
) -> Result<Arc<dyn ExecutionPlan>>;
```

Fully backward compatible. Default implementations return NotImplemented.

Co-authored-by: Andrew Lamb <[email protected]>
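To see the new hooks end to end, here is a hedged sketch of driving MemTable DML through SQL via the umbrella `datafusion` crate (the table name, values, and `tokio` runtime setup are illustrative, not part of this commit; `CREATE TABLE ... AS VALUES` is backed by a MemTable):

```rust
use datafusion::error::Result;
use datafusion::prelude::SessionContext;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();

    // CREATE TABLE ... AS VALUES is backed by a MemTable.
    ctx.sql("CREATE TABLE t (a INT, b INT) AS VALUES (1, 10), (2, NULL), (3, 30)")
        .await?
        .collect()
        .await?;

    // `b > 15` evaluates to NULL for (2, NULL), so only (3, 30) should be
    // deleted (three-valued logic); the result is a single-row `count` batch.
    let batches = ctx.sql("DELETE FROM t WHERE b > 15").await?.collect().await?;
    println!("{batches:?}");

    // UPDATE routes through TableProvider::update the same way.
    ctx.sql("UPDATE t SET b = b + 1 WHERE a = 1").await?.collect().await?;
    Ok(())
}
```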
1 parent 34addca commit d13d891

File tree

11 files changed: +1350 -20 lines changed

datafusion/catalog/src/memory/table.rs

Lines changed: 346 additions & 3 deletions
```diff
@@ -24,7 +24,12 @@ use std::sync::Arc;
 
 use crate::TableProvider;
 
-use arrow::datatypes::SchemaRef;
+use arrow::array::{
+    Array, ArrayRef, BooleanArray, RecordBatch as ArrowRecordBatch, UInt64Array,
+};
+use arrow::compute::kernels::zip::zip;
+use arrow::compute::{and, filter_record_batch};
+use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
 use arrow::record_batch::RecordBatch;
 use datafusion_common::error::Result;
 use datafusion_common::{Constraints, DFSchema, SchemaExt, not_impl_err, plan_err};
```
```diff
@@ -34,10 +39,14 @@ use datafusion_datasource::sink::DataSinkExec;
 use datafusion_datasource::source::DataSourceExec;
 use datafusion_expr::dml::InsertOp;
 use datafusion_expr::{Expr, SortExpr, TableType};
-use datafusion_physical_expr::{LexOrdering, create_physical_sort_exprs};
+use datafusion_physical_expr::{
+    LexOrdering, create_physical_expr, create_physical_sort_exprs,
+};
 use datafusion_physical_plan::repartition::RepartitionExec;
+use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
 use datafusion_physical_plan::{
-    ExecutionPlan, ExecutionPlanProperties, Partitioning, common,
+    DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, Partitioning,
+    PlanProperties, common,
 };
 use datafusion_session::Session;
 
```
```diff
@@ -295,4 +304,338 @@ impl TableProvider for MemTable {
     fn get_column_default(&self, column: &str) -> Option<&Expr> {
         self.column_defaults.get(column)
     }
+
+    async fn delete_from(
+        &self,
+        state: &dyn Session,
+        filters: Vec<Expr>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        // Early exit if table has no partitions
+        if self.batches.is_empty() {
+            return Ok(Arc::new(DmlResultExec::new(0)));
+        }
+
+        *self.sort_order.lock() = vec![];
+
+        let mut total_deleted: u64 = 0;
+        let df_schema = DFSchema::try_from(Arc::clone(&self.schema))?;
+
+        for partition_data in &self.batches {
+            let mut partition = partition_data.write().await;
+            let mut new_batches = Vec::with_capacity(partition.len());
+
+            for batch in partition.iter() {
+                if batch.num_rows() == 0 {
+                    continue;
+                }
+
+                // Evaluate filters - None means "match all rows"
+                let filter_mask = evaluate_filters_to_mask(
+                    &filters,
+                    batch,
+                    &df_schema,
+                    state.execution_props(),
+                )?;
+
+                let (delete_count, keep_mask) = match filter_mask {
+                    Some(mask) => {
+                        // Count rows where mask is true (will be deleted)
+                        let count = mask.iter().filter(|v| v == &Some(true)).count();
+                        // Keep rows where predicate is false or NULL (SQL three-valued logic)
+                        let keep: BooleanArray =
+                            mask.iter().map(|v| Some(v != Some(true))).collect();
+                        (count, keep)
+                    }
+                    None => {
+                        // No filters = delete all rows
+                        (
+                            batch.num_rows(),
+                            BooleanArray::from(vec![false; batch.num_rows()]),
+                        )
+                    }
+                };
+
+                total_deleted += delete_count as u64;
+
+                let filtered_batch = filter_record_batch(batch, &keep_mask)?;
+                if filtered_batch.num_rows() > 0 {
+                    new_batches.push(filtered_batch);
+                }
+            }
+
+            *partition = new_batches;
+        }
+
+        Ok(Arc::new(DmlResultExec::new(total_deleted)))
+    }
```
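The keep-mask above is where SQL three-valued logic shows up in DELETE: a row is removed only when the predicate is strictly true, so both false and NULL results survive. A minimal standalone sketch of that inversion (the mask values are illustrative), using only the `arrow` crate:

```rust
use arrow::array::BooleanArray;

fn main() {
    // Predicate results for three rows: true, false, and NULL.
    let mask = BooleanArray::from(vec![Some(true), Some(false), None]);

    // Same mapping as delete_from above: keep rows that are not strictly true.
    let keep: BooleanArray = mask.iter().map(|v| Some(v != Some(true))).collect();

    // Only the first row (predicate true) is dropped.
    assert_eq!(keep, BooleanArray::from(vec![false, true, true]));
}
```

The diff continues with the UPDATE hook: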
```diff
+
+    async fn update(
+        &self,
+        state: &dyn Session,
+        assignments: Vec<(String, Expr)>,
+        filters: Vec<Expr>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        // Early exit if table has no partitions
+        if self.batches.is_empty() {
+            return Ok(Arc::new(DmlResultExec::new(0)));
+        }
+
+        // Validate column names upfront with clear error messages
+        let available_columns: Vec<&str> = self
+            .schema
+            .fields()
+            .iter()
+            .map(|f| f.name().as_str())
+            .collect();
+        for (column_name, _) in &assignments {
+            if self.schema.field_with_name(column_name).is_err() {
+                return plan_err!(
+                    "UPDATE failed: column '{}' does not exist. Available columns: {}",
+                    column_name,
+                    available_columns.join(", ")
+                );
+            }
+        }
+
+        let df_schema = DFSchema::try_from(Arc::clone(&self.schema))?;
+
+        // Create physical expressions for assignments upfront (outside batch loop)
+        let physical_assignments: HashMap<
+            String,
+            Arc<dyn datafusion_physical_plan::PhysicalExpr>,
+        > = assignments
+            .iter()
+            .map(|(name, expr)| {
+                let physical_expr =
+                    create_physical_expr(expr, &df_schema, state.execution_props())?;
+                Ok((name.clone(), physical_expr))
+            })
+            .collect::<Result<_>>()?;
+
+        *self.sort_order.lock() = vec![];
+
+        let mut total_updated: u64 = 0;
+
+        for partition_data in &self.batches {
+            let mut partition = partition_data.write().await;
+            let mut new_batches = Vec::with_capacity(partition.len());
+
+            for batch in partition.iter() {
+                if batch.num_rows() == 0 {
+                    continue;
+                }
+
+                // Evaluate filters - None means "match all rows"
+                let filter_mask = evaluate_filters_to_mask(
+                    &filters,
+                    batch,
+                    &df_schema,
+                    state.execution_props(),
+                )?;
+
+                let (update_count, update_mask) = match filter_mask {
+                    Some(mask) => {
+                        // Count rows where mask is true (will be updated)
+                        let count = mask.iter().filter(|v| v == &Some(true)).count();
+                        // Normalize mask: only true (not NULL) triggers update
+                        let normalized: BooleanArray =
+                            mask.iter().map(|v| Some(v == Some(true))).collect();
+                        (count, normalized)
+                    }
+                    None => {
+                        // No filters = update all rows
+                        (
+                            batch.num_rows(),
+                            BooleanArray::from(vec![true; batch.num_rows()]),
+                        )
+                    }
+                };
+
+                total_updated += update_count as u64;
+
+                if update_count == 0 {
+                    new_batches.push(batch.clone());
+                    continue;
+                }
+
+                let mut new_columns: Vec<ArrayRef> =
+                    Vec::with_capacity(batch.num_columns());
+
+                for field in self.schema.fields() {
+                    let column_name = field.name();
+                    let original_column =
+                        batch.column_by_name(column_name).ok_or_else(|| {
+                            datafusion_common::DataFusionError::Internal(format!(
+                                "Column '{column_name}' not found in batch"
+                            ))
+                        })?;
+
+                    let new_column = if let Some(physical_expr) =
+                        physical_assignments.get(column_name.as_str())
+                    {
+                        // Use evaluate_selection to only evaluate on matching rows.
+                        // This avoids errors (e.g., divide-by-zero) on rows that won't
+                        // be updated. The result is scattered back with nulls for
+                        // non-matching rows, which zip() will replace with originals.
+                        let new_values =
+                            physical_expr.evaluate_selection(batch, &update_mask)?;
+                        let new_array = new_values.into_array(batch.num_rows())?;
+
+                        // Convert to &dyn Array which implements Datum
+                        let new_arr: &dyn Array = new_array.as_ref();
+                        let orig_arr: &dyn Array = original_column.as_ref();
+                        zip(&update_mask, &new_arr, &orig_arr)?
+                    } else {
+                        Arc::clone(original_column)
+                    };
+
+                    new_columns.push(new_column);
+                }
+
+                let updated_batch =
+                    ArrowRecordBatch::try_new(Arc::clone(&self.schema), new_columns)?;
+                new_batches.push(updated_batch);
+            }
+
+            *partition = new_batches;
+        }
+
+        Ok(Arc::new(DmlResultExec::new(total_updated)))
+    }
+}
```
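The per-column merge in update() pairs `evaluate_selection` with arrow's `zip` kernel: assignment expressions are evaluated only on matching rows, then scattered back over the original values. A standalone sketch of the `zip` step (the mask and values are made up for illustration; the `&dyn Array` coercion mirrors the code above):

```rust
use arrow::array::{Array, BooleanArray, Int32Array};
use arrow::compute::kernels::zip::zip;
use arrow::error::ArrowError;

fn main() -> Result<(), ArrowError> {
    // Rows 0 and 2 match the UPDATE filter; row 1 keeps its original value.
    let update_mask = BooleanArray::from(vec![true, false, true]);
    let new_values = Int32Array::from(vec![Some(100), None, Some(300)]);
    let originals = Int32Array::from(vec![1, 2, 3]);

    // As in update() above: coerce to &dyn Array, which implements Datum.
    let new_arr: &dyn Array = &new_values;
    let orig_arr: &dyn Array = &originals;
    let merged = zip(&update_mask, &new_arr, &orig_arr)?;

    // New values land where the mask is true; originals elsewhere.
    let merged = merged.as_any().downcast_ref::<Int32Array>().unwrap();
    assert_eq!(merged, &Int32Array::from(vec![100, 2, 300]));
    Ok(())
}
```

The diff continues with the shared filter-evaluation helper: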
```diff
+
+/// Evaluate filter expressions against a batch and return a combined boolean mask.
+/// Returns None if filters is empty (meaning "match all rows").
+/// The returned mask has true for rows that match the filter predicates.
+fn evaluate_filters_to_mask(
+    filters: &[Expr],
+    batch: &RecordBatch,
+    df_schema: &DFSchema,
+    execution_props: &datafusion_expr::execution_props::ExecutionProps,
+) -> Result<Option<BooleanArray>> {
+    if filters.is_empty() {
+        return Ok(None);
+    }
+
+    let mut combined_mask: Option<BooleanArray> = None;
+
+    for filter_expr in filters {
+        let physical_expr =
+            create_physical_expr(filter_expr, df_schema, execution_props)?;
+
+        let result = physical_expr.evaluate(batch)?;
+        let array = result.into_array(batch.num_rows())?;
+        let bool_array = array
+            .as_any()
+            .downcast_ref::<BooleanArray>()
+            .ok_or_else(|| {
+                datafusion_common::DataFusionError::Internal(
+                    "Filter did not evaluate to boolean".to_string(),
+                )
+            })?
+            .clone();
+
+        combined_mask = Some(match combined_mask {
+            Some(existing) => and(&existing, &bool_array)?,
+            None => bool_array,
+        });
+    }
+
+    Ok(combined_mask)
+}
```
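evaluate_filters_to_mask folds multiple predicates together with arrow's null-propagating `and` kernel. Strictly, SQL's Kleene AND would yield false for NULL AND false where this yields NULL, but both count as "not strictly true" to the callers, so the observable match set is the same. A standalone sketch (the inputs are illustrative):

```rust
use arrow::array::BooleanArray;
use arrow::compute::and;
use arrow::error::ArrowError;

fn main() -> Result<(), ArrowError> {
    // Per-row results of two filter predicates.
    let first = BooleanArray::from(vec![Some(true), Some(true), None]);
    let second = BooleanArray::from(vec![Some(true), Some(false), Some(true)]);

    // `and` propagates NULL, so the last row stays NULL and will not
    // match in delete_from/update above.
    let combined = and(&first, &second)?;
    assert_eq!(
        combined,
        BooleanArray::from(vec![Some(true), Some(false), None])
    );
    Ok(())
}
```

The diff concludes with the plan node that reports the affected-row count: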
```diff
+
+/// Returns a single row with the count of affected rows.
+#[derive(Debug)]
+struct DmlResultExec {
+    rows_affected: u64,
+    schema: SchemaRef,
+    properties: PlanProperties,
+}
+
+impl DmlResultExec {
+    fn new(rows_affected: u64) -> Self {
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "count",
+            DataType::UInt64,
+            false,
+        )]));
+
+        let properties = PlanProperties::new(
+            datafusion_physical_expr::EquivalenceProperties::new(Arc::clone(&schema)),
+            Partitioning::UnknownPartitioning(1),
+            datafusion_physical_plan::execution_plan::EmissionType::Final,
+            datafusion_physical_plan::execution_plan::Boundedness::Bounded,
+        );
+
+        Self {
+            rows_affected,
+            schema,
+            properties,
+        }
+    }
+}
+
+impl DisplayAs for DmlResultExec {
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default
+            | DisplayFormatType::Verbose
+            | DisplayFormatType::TreeRender => {
+                write!(f, "DmlResultExec: rows_affected={}", self.rows_affected)
+            }
+        }
+    }
+}
+
+impl ExecutionPlan for DmlResultExec {
+    fn name(&self) -> &str {
+        "DmlResultExec"
+    }
+
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        Arc::clone(&self.schema)
+    }
+
+    fn properties(&self) -> &PlanProperties {
+        &self.properties
+    }
+
+    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+        vec![]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        _children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        Ok(self)
+    }
+
+    fn execute(
+        &self,
+        _partition: usize,
+        _context: Arc<datafusion_execution::TaskContext>,
+    ) -> Result<datafusion_execution::SendableRecordBatchStream> {
+        // Create a single batch with the count
+        let count_array = UInt64Array::from(vec![self.rows_affected]);
+        let batch = ArrowRecordBatch::try_new(
+            Arc::clone(&self.schema),
+            vec![Arc::new(count_array) as ArrayRef],
+        )?;
+
+        // Create a stream that yields just this one batch
+        let stream = futures::stream::iter(vec![Ok(batch)]);
+        Ok(Box::pin(RecordBatchStreamAdapter::new(
+            Arc::clone(&self.schema),
+            stream,
+        )))
+    }
 }
```
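For reference, the single-row result shape that DmlResultExec::execute emits can be reproduced standalone with just the `arrow` crate (the count value 42 is illustrative):

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, UInt64Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

fn main() -> Result<(), ArrowError> {
    // One non-nullable UInt64 "count" column, one row: rows_affected.
    let schema = Arc::new(Schema::new(vec![Field::new(
        "count",
        DataType::UInt64,
        false,
    )]));
    let batch = RecordBatch::try_new(
        Arc::clone(&schema),
        vec![Arc::new(UInt64Array::from(vec![42u64])) as ArrayRef],
    )?;

    assert_eq!(batch.num_rows(), 1);
    let counts = batch.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
    assert_eq!(counts.value(0), 42);
    Ok(())
}
```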
