Skip to content
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions datafusion/catalog/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,14 @@ pub trait TableProvider: Debug + Sync + Send {
) -> Result<Arc<dyn ExecutionPlan>> {
not_impl_err!("UPDATE not supported for {} table", self.table_type())
}

/// Remove all rows from the table.
///
/// Returns an [`ExecutionPlan`] producing a single row with `count` (UInt64),
/// representing the number of rows removed.
async fn truncate(&self, _state: &dyn Session) -> Result<Arc<dyn ExecutionPlan>> {
not_impl_err!("TRUNCATE not supported for {}", self.table_type())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: DELETE/UPDATE messages say "... for {} table" but this one drops "table"

}
}

/// Arguments for scanning a table with [`TableProvider::scan_with_args`].
Expand Down
24 changes: 24 additions & 0 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -655,6 +655,30 @@ impl DefaultPhysicalPlanner {
);
}
}
LogicalPlan::Dml(DmlStatement {
table_name,
target,
op: WriteOp::Truncate,
..
}) => {
if let Some(provider) =
target.as_any().downcast_ref::<DefaultTableSource>()
{
provider
.table_provider
.truncate(session_state)
.await
.map_err(|e| {
e.context(format!(
"TRUNCATE operation on table '{table_name}'"
))
})?
} else {
return exec_err!(
"Table source can't be downcasted to DefaultTableSource"
);
}
}
LogicalPlan::Window(Window { window_expr, .. }) => {
assert_or_internal_err!(
!window_expr.is_empty(),
Expand Down
94 changes: 93 additions & 1 deletion datafusion/core/tests/custom_sources_cases/dml_planning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

//! Tests for DELETE and UPDATE planning to verify filter and assignment extraction.
//! Tests for DELETE, UPDATE, and TRUNCATE planning to verify filter and assignment extraction.

use std::any::Any;
use std::sync::{Arc, Mutex};
Expand Down Expand Up @@ -165,6 +165,66 @@ impl TableProvider for CaptureUpdateProvider {
}
}

/// A TableProvider that captures whether truncate() was called.
struct CaptureTruncateProvider {
schema: SchemaRef,
truncate_called: Arc<Mutex<bool>>,
}

impl CaptureTruncateProvider {
fn new(schema: SchemaRef) -> Self {
Self {
schema,
truncate_called: Arc::new(Mutex::new(false)),
}
}

fn was_truncated(&self) -> bool {
*self.truncate_called.lock().unwrap()
}
}

impl std::fmt::Debug for CaptureTruncateProvider {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("CaptureTruncateProvider")
.field("schema", &self.schema)
.finish()
}
}

#[async_trait]
impl TableProvider for CaptureTruncateProvider {
fn as_any(&self) -> &dyn Any {
self
}

fn schema(&self) -> SchemaRef {
Arc::clone(&self.schema)
}

fn table_type(&self) -> TableType {
TableType::Base
}

async fn scan(
&self,
_state: &dyn Session,
_projection: Option<&Vec<usize>>,
_filters: &[Expr],
_limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
Ok(Arc::new(EmptyExec::new(Arc::clone(&self.schema))))
}

async fn truncate(&self, _state: &dyn Session) -> Result<Arc<dyn ExecutionPlan>> {
*self.truncate_called.lock().unwrap() = true;

Ok(Arc::new(EmptyExec::new(Arc::new(Schema::new(vec![
Field::new("count", DataType::UInt64, false),
])))))
}
}
Comment on lines +220 to +227
Copy link
Contributor

@ethan-tyler ethan-tyler Jan 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TableProvider::truncate docs say it returns an ExecutionPlan producing {count: UInt64}. This test returns EmptyExec which produces zero rows, it validates the hook was called, but not the contract.

Could tighten this to return an actual row and assert on it:

Suggested change
async fn truncate(&self, _state: &dyn Session) -> Result<Arc<dyn ExecutionPlan>> {
*self.truncate_called.lock().unwrap() = true;
Ok(Arc::new(EmptyExec::new(Arc::new(Schema::new(vec![
Field::new("count", DataType::UInt64, false),
])))))
}
}
// Add these imports:
use arrow::array::UInt64Array;
use arrow::record_batch::RecordBatch;
use datafusion_physical_plan::test::TestMemoryExec;
// Then update the truncate impl:
async fn truncate(&self, _state: &dyn Session) -> Result<Arc<dyn ExecutionPlan>> {
*self.truncate_called.lock().unwrap() = true;
let schema = Arc::new(Schema::new(vec![
Field::new("count", DataType::UInt64, false),
]));
let batch = RecordBatch::try_new(
Arc::clone(&schema),
vec![Arc::new(UInt64Array::from(vec![0u64]))],
)?;
Ok(Arc::new(TestMemoryExec::try_new(&[vec![batch]], schema, None)?))
}


fn test_schema() -> SchemaRef {
Arc::new(Schema::new(vec![
Field::new("id", DataType::Int32, false),
Expand Down Expand Up @@ -269,6 +329,23 @@ async fn test_update_assignments() -> Result<()> {
Ok(())
}

#[tokio::test]
async fn test_truncate_calls_provider() -> Result<()> {
let provider = Arc::new(CaptureTruncateProvider::new(test_schema()));
let ctx = SessionContext::new();

ctx.register_table("t", Arc::clone(&provider) as Arc<dyn TableProvider>)?;

ctx.sql("TRUNCATE TABLE t").await?.collect().await?;

assert!(
provider.was_truncated(),
"truncate() should be called on the TableProvider"
);

Ok(())
}

#[tokio::test]
async fn test_unsupported_table_delete() -> Result<()> {
let schema = test_schema();
Expand All @@ -295,3 +372,18 @@ async fn test_unsupported_table_update() -> Result<()> {
assert!(result.is_err() || result.unwrap().collect().await.is_err());
Ok(())
}

#[tokio::test]
async fn test_unsupported_table_truncate() -> Result<()> {
let schema = test_schema();
let ctx = SessionContext::new();

let empty_table = datafusion::datasource::empty::EmptyTable::new(schema);
ctx.register_table("empty_t", Arc::new(empty_table))?;

let result = ctx.sql("TRUNCATE TABLE empty_t").await;

assert!(result.is_err() || result.unwrap().collect().await.is_err());

Ok(())
}
3 changes: 3 additions & 0 deletions datafusion/expr/src/logical_plan/dml.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,8 @@ pub enum WriteOp {
Update,
/// `CREATE TABLE AS SELECT` operation
Ctas,
/// `TRUNCATE` operation
Truncate,
}

impl WriteOp {
Expand All @@ -247,6 +249,7 @@ impl WriteOp {
WriteOp::Delete => "Delete",
WriteOp::Update => "Update",
WriteOp::Ctas => "Ctas",
WriteOp::Truncate => "Truncate",
}
}
}
Expand Down
1 change: 1 addition & 0 deletions datafusion/proto/proto/datafusion.proto
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@ message DmlNode{
INSERT_APPEND = 3;
INSERT_OVERWRITE = 4;
INSERT_REPLACE = 5;
TRUNCATE = 6;
}
Type dml_type = 1;
LogicalPlanNode input = 2;
Expand Down
3 changes: 3 additions & 0 deletions datafusion/proto/src/generated/pbjson.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions datafusion/proto/src/generated/prost.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions datafusion/proto/src/logical_plan/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,7 @@ impl From<protobuf::dml_node::Type> for WriteOp {
}
protobuf::dml_node::Type::InsertReplace => WriteOp::Insert(InsertOp::Replace),
protobuf::dml_node::Type::Ctas => WriteOp::Ctas,
protobuf::dml_node::Type::Truncate => WriteOp::Truncate,
}
}
}
Expand Down
1 change: 1 addition & 0 deletions datafusion/proto/src/logical_plan/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -728,6 +728,7 @@ impl From<&WriteOp> for protobuf::dml_node::Type {
WriteOp::Delete => protobuf::dml_node::Type::Delete,
WriteOp::Update => protobuf::dml_node::Type::Update,
WriteOp::Ctas => protobuf::dml_node::Type::Ctas,
WriteOp::Truncate => protobuf::dml_node::Type::Truncate,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You added the proto enum and conversions and worth adding a roundtrip test to lock it in.

In datafusion/proto/tests/cases/roundtrip_logical_plan.rs, the roundtrip_logical_plan_dml test covers INSERT, DELETE, UPDATE, CTAS.

Adding TRUNCATE there would catch any future regressions:

// In the queries array:
"TRUNCATE TABLE test_table",

}
}
}
Expand Down
22 changes: 22 additions & 0 deletions datafusion/sql/src/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1362,6 +1362,28 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
exec_err!("Function name not provided")
}
}
Statement::Truncate { table_names, .. } => {
if table_names.len() != 1 {
return not_impl_err!(
"TRUNCATE with multiple tables is not supported"
);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The .. here silently drops partitions, identity, cascade, on_cluster, and target.only. If someone writes TRUNCATE TABLE t CASCADE expecting cascade behavior, they'd get a normal truncate with no indication the option was ignored.
Worth rejecting these explicitly? Something like:

Suggested change
Statement::Truncate { table_names, .. } => {
if table_names.len() != 1 {
return not_impl_err!(
"TRUNCATE with multiple tables is not supported"
);
}
Statement::Truncate { table_names, partitions, identity, cascade, on_cluster, .. } => {
if table_names.len() != 1 {
return not_impl_err!("TRUNCATE with multiple tables is not supported");
}
let target = &table_names[0];
if target.only {
return not_impl_err!("TRUNCATE with ONLY is not supported");
}
if partitions.is_some() {
return not_impl_err!("TRUNCATE with PARTITION is not supported");
}
if identity.is_some() {
return not_impl_err!("TRUNCATE with RESTART/CONTINUE IDENTITY is not supported");
}
if cascade.is_some() {
return not_impl_err!("TRUNCATE with CASCADE/RESTRICT is not supported");
}
if on_cluster.is_some() {
return not_impl_err!("TRUNCATE with ON CLUSTER is not supported");
}
// ... rest
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you add the option rejection, some negative tests would round it out:

statement error TRUNCATE with CASCADE/RESTRICT is not supported
TRUNCATE TABLE t1 CASCADE;

statement error TRUNCATE with multiple tables is not supported
TRUNCATE TABLE t1, t2;


let target = &table_names[0]; // TruncateTableTarget
let table = self.object_name_to_table_reference(target.name.clone())?;
let source = self.context_provider.get_table_source(table.clone())?;

Ok(LogicalPlan::Dml(DmlStatement {
table_name: table.clone(),
target: source,
op: WriteOp::Truncate,
input: Arc::new(LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: DFSchemaRef::new(DFSchema::empty()),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

/// The relation that determines the tuples to add/remove/modify the schema must match with table_schema
says the schema must match with table_schema. Is this important here ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unlike DELETE or UPDATE, TRUNCATE is a table-level operation with no filters, assignments. The input relation is only a placeholder to satisfy the DML plan shape and is never inspected or executed for row data. Because no rows or columns are read, the usual “input schema must match table schema” invariant does not apply here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I verified this and you're right that TRUNCATE doesn't use the input.

Physical planner shows:

  • DELETE/UPDATE: extract filters/assignments from input
  • TRUNCATE: ignores input entirely, just calls provider.truncate()

So functionally the empty schema is fine. The doc comment is aspirational. Could either keep as-is and update the DmlStatement docs to note TRUNCATE is special, or use the table schema for consistency (extra code, no functional benefit). Former seems fine.

})),
output_schema: DFSchemaRef::new(DFSchema::empty()),
}))
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should use DmlStatement::new() rather than constructing the struct directly. The constructor sets output_schema: make_count_schema() which gives you the {count: UInt64} that DML ops return.
Also noticed this causes a proto roundtrip mismatch: encode uses the struct directly but decode uses the constructor (at mod.rs:978), so you'd get different plans before/after serialization.

Something like:

Suggested change
Ok(LogicalPlan::Dml(DmlStatement {
table_name: table.clone(),
target: source,
op: WriteOp::Truncate,
input: Arc::new(LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: DFSchemaRef::new(DFSchema::empty()),
})),
output_schema: DFSchemaRef::new(DFSchema::empty()),
}))
}
Ok(LogicalPlan::Dml(DmlStatement::new(
table.clone(),
source,
WriteOp::Truncate,
Arc::new(LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: DFSchemaRef::new(DFSchema::empty()),
})),
)))

Statement::CreateIndex(CreateIndex {
name,
table_name,
Expand Down
56 changes: 56 additions & 0 deletions datafusion/sqllogictest/test_files/truncate.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at

# http://www.apache.org/licenses/LICENSE-2.0

# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

##########
## Truncate Tests
##########

statement ok
create table t1(a int, b varchar, c double, d int);

statement ok
insert into t1 values (1, 'abc', 3.14, 4), (2, 'def', 2.71, 5);

# Truncate all rows from table
query TT
explain truncate table t1;
----
logical_plan
01)Dml: op=[Truncate] table=[t1]
02)--EmptyRelation: rows=0
physical_plan_error
01)TRUNCATE operation on table 't1'
02)caused by
03)This feature is not implemented: TRUNCATE not supported for Base

# Test TRUNCATE with fully qualified table name
statement ok
create schema test_schema;

statement ok
create table test_schema.t5(a int);

query TT
explain truncate table test_schema.t5;
----
logical_plan
01)Dml: op=[Truncate] table=[test_schema.t5]
02)--EmptyRelation: rows=0
physical_plan_error
01)TRUNCATE operation on table 'test_schema.t5'
02)caused by
03)This feature is not implemented: TRUNCATE not supported for Base