From 94b6f55cc66ec25e4248c693cfa3943eff5d3bb9 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Sun, 12 Oct 2025 09:07:17 -0400 Subject: [PATCH] Add temporary view option for into_view --- python/datafusion/dataframe.py | 4 +- python/tests/test_context.py | 10 +++- src/dataframe.rs | 17 +++--- src/table.rs | 94 ++++++++++++++++++++++++++++++++-- 4 files changed, 111 insertions(+), 14 deletions(-) diff --git a/python/datafusion/dataframe.py b/python/datafusion/dataframe.py index 5a21d773b..61b033d39 100644 --- a/python/datafusion/dataframe.py +++ b/python/datafusion/dataframe.py @@ -315,7 +315,7 @@ def __init__(self, df: DataFrameInternal) -> None: """ self.df = df - def into_view(self) -> Table: + def into_view(self, temporary: bool = False) -> Table: """Convert ``DataFrame`` into a :class:`~datafusion.Table`. Examples: @@ -329,7 +329,7 @@ def into_view(self) -> Table: """ from datafusion.catalog import Table as _Table - return _Table(self.df.into_view()) + return _Table(self.df.into_view(temporary)) def __getitem__(self, key: str | list[str]) -> DataFrame: """Return a new :py:class`DataFrame` with the specified column or columns. 
diff --git a/python/tests/test_context.py b/python/tests/test_context.py index 94d1e6a39..bd65305ed 100644 --- a/python/tests/test_context.py +++ b/python/tests/test_context.py @@ -357,10 +357,16 @@ def test_register_table_from_dataframe(ctx): assert [b.to_pydict() for b in result] == [{"a": [1, 2]}] -def test_register_table_from_dataframe_into_view(ctx): +@pytest.mark.parametrize("temporary", [True, False]) +def test_register_table_from_dataframe_into_view(ctx, temporary): df = ctx.from_pydict({"a": [1, 2]}) - table = df.into_view() + table = df.into_view(temporary=temporary) assert isinstance(table, Table) + if temporary: + assert table.kind == "temporary" + else: + assert table.kind == "view" + ctx.register_table("view_tbl", table) result = ctx.sql("SELECT * FROM view_tbl").collect() assert [b.to_pydict() for b in result] == [{"a": [1, 2]}] diff --git a/src/dataframe.rs b/src/dataframe.rs index bfdc35e13..85b97eeca 100644 --- a/src/dataframe.rs +++ b/src/dataframe.rs @@ -28,6 +28,7 @@ use arrow::pyarrow::FromPyArrow; use datafusion::arrow::datatypes::Schema; use datafusion::arrow::pyarrow::{PyArrowType, ToPyArrow}; use datafusion::arrow::util::pretty; +use datafusion::catalog::TableProvider; use datafusion::common::UnnestOptions; use datafusion::config::{CsvOptions, ParquetColumnOptions, ParquetOptions, TableParquetOptions}; use datafusion::dataframe::{DataFrame, DataFrameWriteOptions}; @@ -47,7 +48,7 @@ use crate::expr::sort_expr::to_sort_expressions; use crate::physical_plan::PyExecutionPlan; use crate::record_batch::PyRecordBatchStream; use crate::sql::logical::PyLogicalPlan; -use crate::table::PyTable; +use crate::table::{PyTable, TempViewTable}; use crate::utils::{ get_tokio_runtime, is_ipython_env, py_obj_to_scalar_value, validate_pycapsule, wait_for_future, }; @@ -418,11 +419,15 @@ impl PyDataFrame { /// because we're working with Python bindings /// where objects are shared #[allow(clippy::wrong_self_convention)] - pub fn into_view(&self) -> 
PyDataFusionResult<PyTable> { - // Call the underlying Rust DataFrame::into_view method. - // Note that the Rust method consumes self; here we clone the inner Arc - // so that we don't invalidate this PyDataFrame. - let table_provider = self.df.as_ref().clone().into_view(); + pub fn into_view(&self, temporary: bool) -> PyDataFusionResult<PyTable> { + let table_provider = if temporary { + Arc::new(TempViewTable::new(Arc::clone(&self.df))) as Arc<dyn TableProvider> + } else { + // Call the underlying Rust DataFrame::into_view method. + // Note that the Rust method consumes self; here we clone the inner Arc + // so that we don't invalidate this PyDataFrame. + self.df.as_ref().clone().into_view() + }; Ok(PyTable::from(table_provider)) } diff --git a/src/table.rs b/src/table.rs index b830f7764..fdca4d3e6 100644 --- a/src/table.rs +++ b/src/table.rs @@ -15,15 +15,22 @@ // specific language governing permissions and limitations // under the License. +use crate::dataframe::PyDataFrame; +use crate::dataset::Dataset; +use crate::utils::table_provider_from_pycapsule; +use arrow::datatypes::SchemaRef; use arrow::pyarrow::ToPyArrow; +use async_trait::async_trait; +use datafusion::catalog::Session; +use datafusion::common::Column; use datafusion::datasource::{TableProvider, TableType}; +use datafusion::logical_expr::{Expr, LogicalPlanBuilder, TableProviderFilterPushDown}; +use datafusion::physical_plan::ExecutionPlan; +use datafusion::prelude::DataFrame; use pyo3::prelude::*; +use std::any::Any; use std::sync::Arc; -use crate::dataframe::PyDataFrame; -use crate::dataset::Dataset; -use crate::utils::table_provider_from_pycapsule; - /// This struct is used as a common method for all TableProviders, /// whether they refer to an FFI provider, an internally known /// implementation, a dataset, or a dataframe view. 
@@ -104,3 +111,82 @@ impl From<Arc<dyn TableProvider>> for PyTable { Self { table } } } + +#[derive(Clone, Debug)] +pub(crate) struct TempViewTable { + df: Arc<DataFrame>, +} + +/// This is nearly identical to `DataFrameTableProvider` +/// except that it is for temporary tables. +/// Remove when https://github.com/apache/datafusion/issues/18026 +/// closes. +impl TempViewTable { + pub(crate) fn new(df: Arc<DataFrame>) -> Self { + Self { df } + } +} + +#[async_trait] +impl TableProvider for TempViewTable { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + Arc::new(self.df.schema().into()) + } + + fn table_type(&self) -> TableType { + TableType::Temporary + } + + async fn scan( + &self, + state: &dyn Session, + projection: Option<&Vec<usize>>, + filters: &[Expr], + limit: Option<usize>, + ) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> { + let filter = filters.iter().cloned().reduce(|acc, new| acc.and(new)); + let plan = self.df.logical_plan().clone(); + let mut plan = LogicalPlanBuilder::from(plan); + + if let Some(filter) = filter { + plan = plan.filter(filter)?; + } + + let mut plan = if let Some(projection) = projection { + // avoiding adding a redundant projection (e.g. SELECT * FROM view) + let current_projection = (0..plan.schema().fields().len()).collect::<Vec<usize>>(); + if projection == &current_projection { + plan + } else { + let fields: Vec<Expr> = projection + .iter() + .map(|i| { + Expr::Column(Column::from( + self.df.logical_plan().schema().qualified_field(*i), + )) + }) + .collect(); + plan.project(fields)? + } + } else { + plan + }; + + if let Some(limit) = limit { + plan = plan.limit(0, Some(limit))?; + } + + state.create_physical_plan(&plan.build()?).await + } + + fn supports_filters_pushdown( + &self, + filters: &[&Expr], + ) -> datafusion::common::Result<Vec<TableProviderFilterPushDown>> { + Ok(vec![TableProviderFilterPushDown::Exact; filters.len()]) + } +}