diff --git a/datafusion/catalog/src/table.rs b/datafusion/catalog/src/table.rs index 1f223852c2b9d..c10c0e26203bc 100644 --- a/datafusion/catalog/src/table.rs +++ b/datafusion/catalog/src/table.rs @@ -353,6 +353,14 @@ pub trait TableProvider: Debug + Sync + Send { ) -> Result> { not_impl_err!("UPDATE not supported for {} table", self.table_type()) } + + /// Remove all rows from the table. + /// + /// Returns an [`ExecutionPlan`] producing a single row with `count` (UInt64), + /// representing the number of rows removed. + async fn truncate(&self, _state: &dyn Session) -> Result> { + not_impl_err!("TRUNCATE not supported for {} table", self.table_type()) + } } /// Arguments for scanning a table with [`TableProvider::scan_with_args`]. diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index cc7d534776d7e..7ca6636d6ac47 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -655,6 +655,30 @@ impl DefaultPhysicalPlanner { ); } } + LogicalPlan::Dml(DmlStatement { + table_name, + target, + op: WriteOp::Truncate, + .. + }) => { + if let Some(provider) = + target.as_any().downcast_ref::() + { + provider + .table_provider + .truncate(session_state) + .await + .map_err(|e| { + e.context(format!( + "TRUNCATE operation on table '{table_name}'" + )) + })? + } else { + return exec_err!( + "Table source can't be downcasted to DefaultTableSource" + ); + } + } LogicalPlan::Window(Window { window_expr, .. }) => { assert_or_internal_err!( !window_expr.is_empty(), diff --git a/datafusion/core/tests/custom_sources_cases/dml_planning.rs b/datafusion/core/tests/custom_sources_cases/dml_planning.rs index 84cf97710a902..a4033e445c213 100644 --- a/datafusion/core/tests/custom_sources_cases/dml_planning.rs +++ b/datafusion/core/tests/custom_sources_cases/dml_planning.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -//! Tests for DELETE and UPDATE planning to verify filter and assignment extraction. +//! Tests for DELETE, UPDATE, and TRUNCATE planning to verify filter and assignment extraction. use std::any::Any; use std::sync::{Arc, Mutex}; @@ -24,9 +24,10 @@ use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use async_trait::async_trait; use datafusion::datasource::{TableProvider, TableType}; use datafusion::error::Result; -use datafusion::execution::context::SessionContext; +use datafusion::execution::context::{SessionConfig, SessionContext}; use datafusion::logical_expr::Expr; use datafusion_catalog::Session; +use datafusion_common::ScalarValue; use datafusion_physical_plan::ExecutionPlan; use datafusion_physical_plan::empty::EmptyExec; @@ -165,6 +166,66 @@ impl TableProvider for CaptureUpdateProvider { } } +/// A TableProvider that captures whether truncate() was called. +struct CaptureTruncateProvider { + schema: SchemaRef, + truncate_called: Arc>, +} + +impl CaptureTruncateProvider { + fn new(schema: SchemaRef) -> Self { + Self { + schema, + truncate_called: Arc::new(Mutex::new(false)), + } + } + + fn was_truncated(&self) -> bool { + *self.truncate_called.lock().unwrap() + } +} + +impl std::fmt::Debug for CaptureTruncateProvider { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("CaptureTruncateProvider") + .field("schema", &self.schema) + .finish() + } +} + +#[async_trait] +impl TableProvider for CaptureTruncateProvider { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + Arc::clone(&self.schema) + } + + fn table_type(&self) -> TableType { + TableType::Base + } + + async fn scan( + &self, + _state: &dyn Session, + _projection: Option<&Vec>, + _filters: &[Expr], + _limit: Option, + ) -> Result> { + Ok(Arc::new(EmptyExec::new(Arc::clone(&self.schema)))) + } + + async fn truncate(&self, _state: &dyn Session) -> Result> { + *self.truncate_called.lock().unwrap() = true; + + Ok(Arc::new(EmptyExec::new(Arc::new(Schema::new(vec![ + Field::new("count", DataType::UInt64, false), + ]))))) + } +} + fn test_schema() -> SchemaRef { Arc::new(Schema::new(vec![ Field::new("id", DataType::Int32, false), @@ -269,6 +330,28 @@ async fn test_update_assignments() -> Result<()> { Ok(()) } +#[tokio::test] +async fn test_truncate_calls_provider() -> Result<()> { + let provider = Arc::new(CaptureTruncateProvider::new(test_schema())); + let config = SessionConfig::new().set( + "datafusion.optimizer.max_passes", + &ScalarValue::UInt64(Some(0)), + ); + + let ctx = SessionContext::new_with_config(config); + + ctx.register_table("t", Arc::clone(&provider) as Arc)?; + + ctx.sql("TRUNCATE TABLE t").await?.collect().await?; + + assert!( + provider.was_truncated(), + "truncate() should be called on the TableProvider" + ); + + Ok(()) +} + #[tokio::test] async fn test_unsupported_table_delete() -> Result<()> { let schema = test_schema(); @@ -295,3 +378,18 @@ async fn test_unsupported_table_update() -> Result<()> { assert!(result.is_err() || result.unwrap().collect().await.is_err()); Ok(()) } + +#[tokio::test] +async fn test_unsupported_table_truncate() -> Result<()> { + let schema = test_schema(); + let ctx = SessionContext::new(); + + let empty_table = datafusion::datasource::empty::EmptyTable::new(schema); + ctx.register_table("empty_t", Arc::new(empty_table))?; + + let result = ctx.sql("TRUNCATE TABLE empty_t").await; + + assert!(result.is_err() || result.unwrap().collect().await.is_err()); + + Ok(()) +} diff --git a/datafusion/expr/src/logical_plan/dml.rs b/datafusion/expr/src/logical_plan/dml.rs index 6ac3b309aa0c7..b668cbfe2cc35 100644 --- a/datafusion/expr/src/logical_plan/dml.rs +++ b/datafusion/expr/src/logical_plan/dml.rs @@ -237,6 +237,8 @@ pub enum WriteOp { Update, /// `CREATE TABLE AS SELECT` operation Ctas, + /// `TRUNCATE` operation + Truncate, } impl WriteOp { @@ -247,6 +249,7 @@ impl WriteOp { WriteOp::Delete => "Delete", WriteOp::Update => "Update", WriteOp::Ctas => "Ctas", + WriteOp::Truncate => "Truncate", } } } diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index bd7dd3a6aff3c..4b5f349ee1d09 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -278,6 +278,7 @@ message DmlNode{ INSERT_APPEND = 3; INSERT_OVERWRITE = 4; INSERT_REPLACE = 5; + TRUNCATE = 6; } Type dml_type = 1; LogicalPlanNode input = 2; diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index e269606d163a3..de2851b712ef8 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -5236,6 +5236,7 @@ impl serde::Serialize for dml_node::Type { Self::InsertAppend => "INSERT_APPEND", Self::InsertOverwrite => "INSERT_OVERWRITE", Self::InsertReplace => "INSERT_REPLACE", + Self::Truncate => "TRUNCATE", }; serializer.serialize_str(variant) } @@ -5253,6 +5254,7 @@ impl<'de> serde::Deserialize<'de> for dml_node::Type { "INSERT_APPEND", "INSERT_OVERWRITE", "INSERT_REPLACE", + "TRUNCATE", ]; struct GeneratedVisitor; @@ -5299,6 +5301,7 @@ impl<'de> serde::Deserialize<'de> for dml_node::Type { "INSERT_APPEND" => Ok(dml_node::Type::InsertAppend), "INSERT_OVERWRITE" => Ok(dml_node::Type::InsertOverwrite), "INSERT_REPLACE" => Ok(dml_node::Type::InsertReplace), + "TRUNCATE" => Ok(dml_node::Type::Truncate), _ => Err(serde::de::Error::unknown_variant(value, FIELDS)), } } diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index cf343e0258d0b..ab88067cc13ef 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -444,6 +444,7 @@ pub mod dml_node { InsertAppend = 3, InsertOverwrite = 4, InsertReplace = 5, + Truncate = 6, } impl Type { /// String value of the enum field names used in the ProtoBuf definition. @@ -458,6 +459,7 @@ pub mod dml_node { Self::InsertAppend => "INSERT_APPEND", Self::InsertOverwrite => "INSERT_OVERWRITE", Self::InsertReplace => "INSERT_REPLACE", + Self::Truncate => "TRUNCATE", } } /// Creates an enum from field names used in the ProtoBuf definition. @@ -469,6 +471,7 @@ pub mod dml_node { "INSERT_APPEND" => Some(Self::InsertAppend), "INSERT_OVERWRITE" => Some(Self::InsertOverwrite), "INSERT_REPLACE" => Some(Self::InsertReplace), + "TRUNCATE" => Some(Self::Truncate), _ => None, } } diff --git a/datafusion/proto/src/logical_plan/from_proto.rs b/datafusion/proto/src/logical_plan/from_proto.rs index 179fe8bb7d7fe..a653f517b7275 100644 --- a/datafusion/proto/src/logical_plan/from_proto.rs +++ b/datafusion/proto/src/logical_plan/from_proto.rs @@ -239,6 +239,7 @@ impl From for WriteOp { } protobuf::dml_node::Type::InsertReplace => WriteOp::Insert(InsertOp::Replace), protobuf::dml_node::Type::Ctas => WriteOp::Ctas, + protobuf::dml_node::Type::Truncate => WriteOp::Truncate, } } } diff --git a/datafusion/proto/src/logical_plan/to_proto.rs b/datafusion/proto/src/logical_plan/to_proto.rs index 6e4e5d0b6eea4..b5f2e823cbcc7 100644 --- a/datafusion/proto/src/logical_plan/to_proto.rs +++ b/datafusion/proto/src/logical_plan/to_proto.rs @@ -728,6 +728,7 @@ impl From<&WriteOp> for protobuf::dml_node::Type { WriteOp::Delete => protobuf::dml_node::Type::Delete, WriteOp::Update => protobuf::dml_node::Type::Update, WriteOp::Ctas => protobuf::dml_node::Type::Ctas, + WriteOp::Truncate => protobuf::dml_node::Type::Truncate, } } } diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs index bcfda648b53e5..ae84d385cf321 100644 --- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs @@ -413,6 +413,7 @@ async fn roundtrip_logical_plan_dml() -> Result<()> { "DELETE FROM T1", "UPDATE T1 SET a = 1", "CREATE TABLE T2 AS SELECT * FROM T1", + "TRUNCATE TABLE T1", ]; for query in queries { let plan = ctx.sql(query).await?.into_optimized_plan()?; diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs index 1acbcc92dfe19..2c4a52a275797 100644 --- a/datafusion/sql/src/statement.rs +++ b/datafusion/sql/src/statement.rs @@ -1362,6 +1362,53 @@ impl SqlToRel<'_, S> { exec_err!("Function name not provided") } } + Statement::Truncate { + table_names, + partitions, + identity, + cascade, + on_cluster, + .. + } => { + if table_names.len() != 1 { + return not_impl_err!( + "TRUNCATE with multiple tables is not supported" + ); + } + + let target = &table_names[0]; + if target.only { + return not_impl_err!("TRUNCATE with ONLY is not supported"); + } + if partitions.is_some() { + return not_impl_err!("TRUNCATE with PARTITION is not supported"); + } + if identity.is_some() { + return not_impl_err!( + "TRUNCATE with RESTART/CONTINUE IDENTITY is not supported" + ); + } + if cascade.is_some() { + return not_impl_err!( + "TRUNCATE with CASCADE/RESTRICT is not supported" + ); + } + if on_cluster.is_some() { + return not_impl_err!("TRUNCATE with ON CLUSTER is not supported"); + } + let table = self.object_name_to_table_reference(target.name.clone())?; + let source = self.context_provider.get_table_source(table.clone())?; + + Ok(LogicalPlan::Dml(DmlStatement::new( + table.clone(), + source, + WriteOp::Truncate, + Arc::new(LogicalPlan::EmptyRelation(EmptyRelation { + produce_one_row: false, + schema: DFSchemaRef::new(DFSchema::empty()), + })), + ))) + } Statement::CreateIndex(CreateIndex { name, table_name, diff --git a/datafusion/sqllogictest/test_files/truncate.slt b/datafusion/sqllogictest/test_files/truncate.slt new file mode 100644 index 0000000000000..a33c74ace5be3 --- /dev/null +++ b/datafusion/sqllogictest/test_files/truncate.slt @@ -0,0 +1,73 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +########## +## Truncate Tests +########## + +statement ok +create table t1(a int, b varchar, c double, d int); + +statement ok +insert into t1 values (1, 'abc', 3.14, 4), (2, 'def', 2.71, 5); + +# Truncate all rows from table +query TT +explain truncate table t1; +---- +logical_plan +01)Dml: op=[Truncate] table=[t1] +02)--EmptyRelation: rows=0 +physical_plan_error +01)TRUNCATE operation on table 't1' +02)caused by +03)This feature is not implemented: TRUNCATE not supported for Base table + +# Test TRUNCATE with fully qualified table name +statement ok +create schema test_schema; + +statement ok +create table test_schema.t5(a int); + +query TT +explain truncate table test_schema.t5; +---- +logical_plan +01)Dml: op=[Truncate] table=[test_schema.t5] +02)--EmptyRelation: rows=0 +physical_plan_error +01)TRUNCATE operation on table 'test_schema.t5' +02)caused by +03)This feature is not implemented: TRUNCATE not supported for Base table + +# Test TRUNCATE with CASCADE option +statement error TRUNCATE with CASCADE/RESTRICT is not supported +TRUNCATE TABLE t1 CASCADE; + +# Test TRUNCATE with multiple tables +statement error TRUNCATE with multiple tables is not supported +TRUNCATE TABLE t1, t2; + +statement error TRUNCATE with PARTITION is not supported +TRUNCATE TABLE t1 PARTITION (p1); + +statement error TRUNCATE with ONLY is not supported +TRUNCATE ONLY t1; + +statement error TRUNCATE with RESTART/CONTINUE IDENTITY is not supported +TRUNCATE TABLE t1 RESTART IDENTITY; \ No newline at end of file