Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions python/datafusion/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ def __init__(self, df: DataFrameInternal) -> None:
"""
self.df = df

def into_view(self) -> Table:
def into_view(self, temporary: bool = False) -> Table:
"""Convert ``DataFrame`` into a :class:`~datafusion.Table`.

Examples:
Expand All @@ -329,7 +329,7 @@ def into_view(self) -> Table:
"""
from datafusion.catalog import Table as _Table

return _Table(self.df.into_view())
return _Table(self.df.into_view(temporary))

def __getitem__(self, key: str | list[str]) -> DataFrame:
"""Return a new :py:class`DataFrame` with the specified column or columns.
Expand Down
10 changes: 8 additions & 2 deletions python/tests/test_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -357,10 +357,16 @@ def test_register_table_from_dataframe(ctx):
assert [b.to_pydict() for b in result] == [{"a": [1, 2]}]


def test_register_table_from_dataframe_into_view(ctx):
@pytest.mark.parametrize("temporary", [True, False])
def test_register_table_from_dataframe_into_view(ctx, temporary):
df = ctx.from_pydict({"a": [1, 2]})
table = df.into_view()
table = df.into_view(temporary=temporary)
assert isinstance(table, Table)
if temporary:
assert table.kind == "temporary"
else:
assert table.kind == "view"

ctx.register_table("view_tbl", table)
result = ctx.sql("SELECT * FROM view_tbl").collect()
assert [b.to_pydict() for b in result] == [{"a": [1, 2]}]
Expand Down
17 changes: 11 additions & 6 deletions src/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use arrow::pyarrow::FromPyArrow;
use datafusion::arrow::datatypes::Schema;
use datafusion::arrow::pyarrow::{PyArrowType, ToPyArrow};
use datafusion::arrow::util::pretty;
use datafusion::catalog::TableProvider;
use datafusion::common::UnnestOptions;
use datafusion::config::{CsvOptions, ParquetColumnOptions, ParquetOptions, TableParquetOptions};
use datafusion::dataframe::{DataFrame, DataFrameWriteOptions};
Expand All @@ -47,7 +48,7 @@ use crate::expr::sort_expr::to_sort_expressions;
use crate::physical_plan::PyExecutionPlan;
use crate::record_batch::PyRecordBatchStream;
use crate::sql::logical::PyLogicalPlan;
use crate::table::PyTable;
use crate::table::{PyTable, TempViewTable};
use crate::utils::{
get_tokio_runtime, is_ipython_env, py_obj_to_scalar_value, validate_pycapsule, wait_for_future,
};
Expand Down Expand Up @@ -418,11 +419,15 @@ impl PyDataFrame {
/// because we're working with Python bindings
/// where objects are shared
#[allow(clippy::wrong_self_convention)]
pub fn into_view(&self) -> PyDataFusionResult<PyTable> {
// Call the underlying Rust DataFrame::into_view method.
// Note that the Rust method consumes self; here we clone the inner Arc<DataFrame>
// so that we don't invalidate this PyDataFrame.
let table_provider = self.df.as_ref().clone().into_view();
pub fn into_view(&self, temporary: bool) -> PyDataFusionResult<PyTable> {
let table_provider = if temporary {
Arc::new(TempViewTable::new(Arc::clone(&self.df))) as Arc<dyn TableProvider>
} else {
// Call the underlying Rust DataFrame::into_view method.
// Note that the Rust method consumes self; here we clone the inner Arc<DataFrame>
// so that we don't invalidate this PyDataFrame.
self.df.as_ref().clone().into_view()
};
Ok(PyTable::from(table_provider))
}

Expand Down
94 changes: 90 additions & 4 deletions src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,22 @@
// specific language governing permissions and limitations
// under the License.

use crate::dataframe::PyDataFrame;
use crate::dataset::Dataset;
use crate::utils::table_provider_from_pycapsule;
use arrow::datatypes::SchemaRef;
use arrow::pyarrow::ToPyArrow;
use async_trait::async_trait;
use datafusion::catalog::Session;
use datafusion::common::Column;
use datafusion::datasource::{TableProvider, TableType};
use datafusion::logical_expr::{Expr, LogicalPlanBuilder, TableProviderFilterPushDown};
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::DataFrame;
use pyo3::prelude::*;
use std::any::Any;
use std::sync::Arc;
Comment on lines +18 to 32
Copy link

Copilot AI Oct 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] The import statements have been reorganized but are not in alphabetical order. Consider grouping imports by crate (std, external crates, internal crates) and sorting alphabetically within each group for better maintainability.

Copilot uses AI. Check for mistakes.


use crate::dataframe::PyDataFrame;
use crate::dataset::Dataset;
use crate::utils::table_provider_from_pycapsule;

/// This struct is used as a common method for all TableProviders,
/// whether they refer to an FFI provider, an internally known
/// implementation, a dataset, or a dataframe view.
Expand Down Expand Up @@ -104,3 +111,82 @@ impl From<Arc<dyn TableProvider>> for PyTable {
Self { table }
}
}

#[derive(Clone, Debug)]
pub(crate) struct TempViewTable {
df: Arc<DataFrame>,
}

/// This is nearly identical to `DataFrameTableProvider`
/// except that it is for temporary tables.
Copy link

Copilot AI Oct 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment mentions 'temporary tables' but the struct is for temporary views. Consider updating the comment to say 'temporary views' for accuracy.

Suggested change
/// except that it is for temporary tables.
/// except that it is for temporary views.

Copilot uses AI. Check for mistakes.

/// Remove when https://github.com/apache/datafusion/issues/18026
/// closes.
impl TempViewTable {
pub(crate) fn new(df: Arc<DataFrame>) -> Self {
Self { df }
}
}

#[async_trait]
impl TableProvider for TempViewTable {
fn as_any(&self) -> &dyn Any {
self
}

fn schema(&self) -> SchemaRef {
Arc::new(self.df.schema().into())
}

fn table_type(&self) -> TableType {
TableType::Temporary
}

async fn scan(
&self,
state: &dyn Session,
projection: Option<&Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,
) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
let filter = filters.iter().cloned().reduce(|acc, new| acc.and(new));
let plan = self.df.logical_plan().clone();
let mut plan = LogicalPlanBuilder::from(plan);

if let Some(filter) = filter {
plan = plan.filter(filter)?;
}

let mut plan = if let Some(projection) = projection {
// avoiding adding a redundant projection (e.g. SELECT * FROM view)
let current_projection = (0..plan.schema().fields().len()).collect::<Vec<usize>>();
if projection == &current_projection {
plan
} else {
let fields: Vec<Expr> = projection
.iter()
.map(|i| {
Expr::Column(Column::from(
self.df.logical_plan().schema().qualified_field(*i),
))
})
.collect();
plan.project(fields)?
}
} else {
plan
};

if let Some(limit) = limit {
plan = plan.limit(0, Some(limit))?;
}

state.create_physical_plan(&plan.build()?).await
}

fn supports_filters_pushdown(
&self,
filters: &[&Expr],
) -> datafusion::common::Result<Vec<TableProviderFilterPushDown>> {
Ok(vec![TableProviderFilterPushDown::Exact; filters.len()])
}
}