Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions datafusion/catalog/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,14 @@ pub trait TableProvider: Debug + Sync + Send {
) -> Result<Arc<dyn ExecutionPlan>> {
not_impl_err!("UPDATE not supported for {} table", self.table_type())
}

/// Delete every row stored in this table.
///
/// On success the returned [`ExecutionPlan`] yields one row holding a single
/// `count` column (UInt64): the number of rows that were removed.
///
/// This default implementation performs no truncation; it reports the
/// operation as not implemented, naming this table's type in the message.
async fn truncate(&self, _state: &dyn Session) -> Result<Arc<dyn ExecutionPlan>> {
    let table_type = self.table_type();
    not_impl_err!("TRUNCATE not supported for {} table", table_type)
}
}

/// Arguments for scanning a table with [`TableProvider::scan_with_args`].
Expand Down
24 changes: 24 additions & 0 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -655,6 +655,30 @@ impl DefaultPhysicalPlanner {
);
}
}
LogicalPlan::Dml(DmlStatement {
    table_name,
    target,
    op: WriteOp::Truncate,
    ..
}) => {
    // The provider is only reachable through a `DefaultTableSource`;
    // any other table source is rejected up front.
    let Some(source) = target.as_any().downcast_ref::<DefaultTableSource>() else {
        return exec_err!(
            "Table source can't be downcasted to DefaultTableSource"
        );
    };

    // Hand the operation to the provider. The remaining fields of the DML
    // statement are not consulted in this arm.
    let truncate_plan = source.table_provider.truncate(session_state).await;

    // Tag provider failures with the table being truncated.
    truncate_plan.map_err(|err| {
        err.context(format!(
            "TRUNCATE operation on table '{table_name}'"
        ))
    })?
}
LogicalPlan::Window(Window { window_expr, .. }) => {
assert_or_internal_err!(
!window_expr.is_empty(),
Expand Down
102 changes: 100 additions & 2 deletions datafusion/core/tests/custom_sources_cases/dml_planning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

//! Tests for DELETE and UPDATE planning to verify filter and assignment extraction.
//! Tests for DELETE, UPDATE, and TRUNCATE planning: filter and assignment extraction for DELETE/UPDATE, and dispatch to the provider for TRUNCATE.

use std::any::Any;
use std::sync::{Arc, Mutex};
Expand All @@ -24,9 +24,10 @@ use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use async_trait::async_trait;
use datafusion::datasource::{TableProvider, TableType};
use datafusion::error::Result;
use datafusion::execution::context::SessionContext;
use datafusion::execution::context::{SessionConfig, SessionContext};
use datafusion::logical_expr::Expr;
use datafusion_catalog::Session;
use datafusion_common::ScalarValue;
use datafusion_physical_plan::ExecutionPlan;
use datafusion_physical_plan::empty::EmptyExec;

Expand Down Expand Up @@ -165,6 +166,66 @@ impl TableProvider for CaptureUpdateProvider {
}
}

/// Test double: a `TableProvider` that remembers whether `truncate()` was
/// ever invoked on it.
struct CaptureTruncateProvider {
    /// Schema handed back by `schema()` and used for the scan plan.
    schema: SchemaRef,
    /// Set to `true` by `truncate()`; read back through `was_truncated()`.
    truncate_called: Arc<Mutex<bool>>,
}

impl CaptureTruncateProvider {
    /// Builds a provider over `schema` whose truncate flag starts out unset.
    fn new(schema: SchemaRef) -> Self {
        let truncate_called = Arc::new(Mutex::new(false));
        Self {
            schema,
            truncate_called,
        }
    }

    /// Reports whether `truncate()` has been called on this provider.
    ///
    /// Panics if the flag's mutex has been poisoned.
    fn was_truncated(&self) -> bool {
        let flag = self.truncate_called.lock().unwrap();
        *flag
    }
}

impl std::fmt::Debug for CaptureTruncateProvider {
    /// Formats the provider as a struct showing only its `schema`; the
    /// `truncate_called` flag is left out of the output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("CaptureTruncateProvider");
        out.field("schema", &self.schema);
        out.finish()
    }
}

#[async_trait]
impl TableProvider for CaptureTruncateProvider {
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Returns a shared handle to the schema supplied at construction.
    fn schema(&self) -> SchemaRef {
        let schema = &self.schema;
        Arc::clone(schema)
    }

    fn table_type(&self) -> TableType {
        TableType::Base
    }

    /// Returns an `EmptyExec` over the table schema; the projection, filters
    /// and limit arguments are all ignored.
    async fn scan(
        &self,
        _state: &dyn Session,
        _projection: Option<&Vec<usize>>,
        _filters: &[Expr],
        _limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let plan = EmptyExec::new(Arc::clone(&self.schema));
        Ok(Arc::new(plan))
    }

    /// Records that truncate was requested, then returns an `EmptyExec`
    /// whose schema is a single non-nullable `count` (UInt64) column.
    async fn truncate(&self, _state: &dyn Session) -> Result<Arc<dyn ExecutionPlan>> {
        // Flip the flag first so the test can observe the call.
        *self.truncate_called.lock().unwrap() = true;

        let count_field = Field::new("count", DataType::UInt64, false);
        let count_schema = Arc::new(Schema::new(vec![count_field]));
        Ok(Arc::new(EmptyExec::new(count_schema)))
    }
}

fn test_schema() -> SchemaRef {
Arc::new(Schema::new(vec![
Field::new("id", DataType::Int32, false),
Expand Down Expand Up @@ -269,6 +330,28 @@ async fn test_update_assignments() -> Result<()> {
Ok(())
}

/// Running `TRUNCATE TABLE` against a registered table must reach the
/// provider's `truncate()` method.
#[tokio::test]
async fn test_truncate_calls_provider() -> Result<()> {
    // Session configured with zero optimizer passes.
    // NOTE(review): presumably to keep optimizer rewrites out of the picture — confirm.
    let max_passes = ScalarValue::UInt64(Some(0));
    let config = SessionConfig::new().set("datafusion.optimizer.max_passes", &max_passes);
    let ctx = SessionContext::new_with_config(config);

    // Keep a typed handle to the provider so the flag can be read afterwards.
    let provider = Arc::new(CaptureTruncateProvider::new(test_schema()));
    let table = Arc::clone(&provider) as Arc<dyn TableProvider>;
    ctx.register_table("t", table)?;

    let df = ctx.sql("TRUNCATE TABLE t").await?;
    df.collect().await?;

    assert!(
        provider.was_truncated(),
        "truncate() should be called on the TableProvider"
    );

    Ok(())
}

#[tokio::test]
async fn test_unsupported_table_delete() -> Result<()> {
let schema = test_schema();
Expand All @@ -295,3 +378,18 @@ async fn test_unsupported_table_update() -> Result<()> {
assert!(result.is_err() || result.unwrap().collect().await.is_err());
Ok(())
}

/// `TRUNCATE TABLE` on an `EmptyTable` must fail, either while the statement
/// is being planned or when the resulting plan is collected.
#[tokio::test]
async fn test_unsupported_table_truncate() -> Result<()> {
    let ctx = SessionContext::new();
    let empty_table = datafusion::datasource::empty::EmptyTable::new(test_schema());
    ctx.register_table("empty_t", Arc::new(empty_table))?;

    // The failure may surface at either stage; accept both.
    let rejected = match ctx.sql("TRUNCATE TABLE empty_t").await {
        Err(_) => true,
        Ok(df) => df.collect().await.is_err(),
    };
    assert!(rejected);

    Ok(())
}
3 changes: 3 additions & 0 deletions datafusion/expr/src/logical_plan/dml.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,8 @@ pub enum WriteOp {
Update,
/// `CREATE TABLE AS SELECT` operation
Ctas,
/// `TRUNCATE` operation
Truncate,
}

impl WriteOp {
Expand All @@ -247,6 +249,7 @@ impl WriteOp {
WriteOp::Delete => "Delete",
WriteOp::Update => "Update",
WriteOp::Ctas => "Ctas",
WriteOp::Truncate => "Truncate",
}
}
}
Expand Down
1 change: 1 addition & 0 deletions datafusion/proto/proto/datafusion.proto
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@ message DmlNode{
INSERT_APPEND = 3;
INSERT_OVERWRITE = 4;
INSERT_REPLACE = 5;
TRUNCATE = 6;
}
Type dml_type = 1;
LogicalPlanNode input = 2;
Expand Down
3 changes: 3 additions & 0 deletions datafusion/proto/src/generated/pbjson.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions datafusion/proto/src/generated/prost.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions datafusion/proto/src/logical_plan/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,7 @@ impl From<protobuf::dml_node::Type> for WriteOp {
}
protobuf::dml_node::Type::InsertReplace => WriteOp::Insert(InsertOp::Replace),
protobuf::dml_node::Type::Ctas => WriteOp::Ctas,
protobuf::dml_node::Type::Truncate => WriteOp::Truncate,
}
}
}
Expand Down
1 change: 1 addition & 0 deletions datafusion/proto/src/logical_plan/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -728,6 +728,7 @@ impl From<&WriteOp> for protobuf::dml_node::Type {
WriteOp::Delete => protobuf::dml_node::Type::Delete,
WriteOp::Update => protobuf::dml_node::Type::Update,
WriteOp::Ctas => protobuf::dml_node::Type::Ctas,
WriteOp::Truncate => protobuf::dml_node::Type::Truncate,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You added the proto enum and conversions, so it is worth adding a roundtrip test to lock them in.

In datafusion/proto/tests/cases/roundtrip_logical_plan.rs, the roundtrip_logical_plan_dml test covers INSERT, DELETE, UPDATE, CTAS.

Adding TRUNCATE there would catch any future regressions:

// In the queries array:
"TRUNCATE TABLE test_table",

}
}
}
Expand Down
1 change: 1 addition & 0 deletions datafusion/proto/tests/cases/roundtrip_logical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,7 @@ async fn roundtrip_logical_plan_dml() -> Result<()> {
"DELETE FROM T1",
"UPDATE T1 SET a = 1",
"CREATE TABLE T2 AS SELECT * FROM T1",
"TRUNCATE TABLE T1",
];
for query in queries {
let plan = ctx.sql(query).await?.into_optimized_plan()?;
Expand Down
47 changes: 47 additions & 0 deletions datafusion/sql/src/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1362,6 +1362,53 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
exec_err!("Function name not provided")
}
}
Statement::Truncate {
table_names,
partitions,
identity,
cascade,
on_cluster,
..
} => {
if table_names.len() != 1 {
return not_impl_err!(
"TRUNCATE with multiple tables is not supported"
);
}

let target = &table_names[0];
if target.only {
return not_impl_err!("TRUNCATE with ONLY is not supported");
}
if partitions.is_some() {
return not_impl_err!("TRUNCATE with PARTITION is not supported");
}
if identity.is_some() {
return not_impl_err!(
"TRUNCATE with RESTART/CONTINUE IDENTITY is not supported"
);
}
if cascade.is_some() {
return not_impl_err!(
"TRUNCATE with CASCADE/RESTRICT is not supported"
);
}
if on_cluster.is_some() {
return not_impl_err!("TRUNCATE with ON CLUSTER is not supported");
}
let table = self.object_name_to_table_reference(target.name.clone())?;
let source = self.context_provider.get_table_source(table.clone())?;

Ok(LogicalPlan::Dml(DmlStatement::new(
table.clone(),
source,
WriteOp::Truncate,
Arc::new(LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: DFSchemaRef::new(DFSchema::empty()),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The `DmlStatement` doc comment — `/// The relation that determines the tuples to add/remove/modify the schema must match with table_schema` — says the input schema must match `table_schema`. Is that important here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unlike DELETE or UPDATE, TRUNCATE is a table-level operation with no filters or assignments. The input relation is only a placeholder to satisfy the DML plan shape and is never inspected or executed for row data. Because no rows or columns are read, the usual “input schema must match table schema” invariant does not apply here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I verified this and you're right that TRUNCATE doesn't use the input.

Physical planner shows:

  • DELETE/UPDATE: extract filters/assignments from input
  • TRUNCATE: ignores input entirely, just calls provider.truncate()

So functionally the empty schema is fine. The doc comment is aspirational. Could either keep as-is and update the DmlStatement docs to note TRUNCATE is special, or use the table schema for consistency (extra code, no functional benefit). Former seems fine.

})),
)))
}
Statement::CreateIndex(CreateIndex {
name,
table_name,
Expand Down
Loading