Merged

Changes from 15 commits (21 commits total)

Commits
9d589a2
add test_view
kosiew Feb 6, 2025
648c185
feat: add into_view method to register DataFrame as a view
kosiew Feb 6, 2025
e55ac9f
add pytableprovider
kosiew Feb 6, 2025
ca42449
feat: add as_table method to PyTableProvider and update into_view to …
kosiew Feb 6, 2025
d0c3163
refactor: simplify as_table method and update documentation for into_…
kosiew Feb 6, 2025
8578713
test: improve test_register_filtered_dataframe by removing redundant …
kosiew Feb 6, 2025
9cdd0dc
test: enhance test_register_filtered_dataframe with additional assert…
kosiew Feb 6, 2025
c207b6c
ruff formatted
kosiew Feb 6, 2025
20dbfe8
cleanup: remove unused imports from test_view.py
kosiew Feb 6, 2025
4b4c641
docs: add example for registering a DataFrame as a view in README.md
kosiew Feb 7, 2025
12c4fe3
docs: update docstring for into_view method to clarify usage as ViewT…
kosiew Feb 7, 2025
15ead1f
chore: add license header to test_view.py
kosiew Feb 7, 2025
48eb8db
ruff correction
kosiew Feb 7, 2025
f73eebb
refactor: rename into_view method to _into_view
kosiew Feb 7, 2025
6bba2e2
ruff lint
kosiew Feb 12, 2025
7b0cbf1
refactor: simplify into_view method and update Rust binding convention
kosiew Feb 18, 2025
f594b46
docs: add views section to user guide with example on registering views
kosiew Feb 18, 2025
90a6a8b
feat: add register_view method to SessionContext for DataFrame regist…
kosiew Mar 7, 2025
c31395f
Merge branch 'main' into view
kosiew Mar 7, 2025
f0837de
docs: update README and user guide to reflect register_view method fo…
kosiew Mar 7, 2025
9d8cdb5
docs: remove some documentation from PyDataFrame
kosiew Mar 7, 2025
43 changes: 43 additions & 0 deletions README.md
@@ -81,6 +81,49 @@ This produces the following chart:

![Chart](examples/chart.png)

## Registering a DataFrame as a View

You can use the `into_view` method to convert a DataFrame into a view and register it with the context.

```python
from datafusion import SessionContext, col, literal

Member:

I think this is very good, but would be more helpful if moved into the appropriate docs section so it goes into the online documentation rather than the readme.

Contributor Author:

I created a view.rst for this.

# Create a DataFusion context
ctx = SessionContext()

# Create sample data
data = {"a": [1, 2, 3, 4, 5], "b": [10, 20, 30, 40, 50]}

# Create a DataFrame from the dictionary
df = ctx.from_pydict(data, "my_table")

# Filter the DataFrame (for example, keep rows where a > 2)
df_filtered = df.filter(col("a") > literal(2))

# Convert the filtered DataFrame into a view
view = df_filtered.into_view()

# Register the view with the context
ctx.register_table("view1", view)

# Now run a SQL query against the registered view
df_view = ctx.sql("SELECT * FROM view1")

# Collect the results
results = df_view.collect()

# Convert results to a list of dictionaries for display
result_dicts = [batch.to_pydict() for batch in results]

print(result_dicts)
```

This will output:

```python
[{'a': [3, 4, 5], 'b': [30, 40, 50]}]
```
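Because `view1` is registered like any other table, it can be reused in later queries. A small hedged extension of the example above (the SQL and expected result are illustrative, not part of this PR's diff):

```python
# Continuing from the example above: the registered view behaves like a table
# in subsequent SQL statements.
df_agg = ctx.sql("SELECT SUM(b) AS total_b FROM view1")
print([batch.to_pydict() for batch in df_agg.collect()])
# Expected, given the data above: [{'total_b': [120]}]
```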

## Configuration

It is possible to configure runtime (memory and disk settings) and configuration settings when creating a context.
8 changes: 8 additions & 0 deletions python/datafusion/dataframe.py
@@ -121,6 +121,14 @@ def __init__(self, df: DataFrameInternal) -> None:
"""
self.df = df

def into_view(self) -> pa.Table:
"""Convert DataFrame as a ViewTable which can be used in register_table."""
return self._into_view()

def _into_view(self) -> pa.Table:
"""Convert DataFrame as a ViewTable which can be used in register_table."""
return self.df._into_view()

def __getitem__(self, key: str | List[str]) -> DataFrame:
"""Return a new :py:class`DataFrame` with the specified column or columns.

52 changes: 52 additions & 0 deletions python/tests/test_view.py
@@ -0,0 +1,52 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.


from datafusion import SessionContext, col, literal


def test_register_filtered_dataframe():
    ctx = SessionContext()

    data = {"a": [1, 2, 3, 4, 5], "b": [10, 20, 30, 40, 50]}

    df = ctx.from_pydict(data, "my_table")

    df_filtered = df.filter(col("a") > literal(2))
    view = df_filtered.into_view()

    assert view.kind == "view"

    ctx.register_table("view1", view)
Contributor Author:

This is modelled after how into_view is used in datafusion:

async fn with_column_renamed_ambiguous() -> Result<()> {
    let df = test_table().await?.select_columns(&["c1", "c2", "c3"])?;
    let ctx = SessionContext::new();

    let table = df.into_view();
    ctx.register_table("t1", table.clone())?;
    ctx.register_table("t2", table)?;


    df_view = ctx.sql("SELECT * FROM view1")

    filtered_results = df_view.collect()

    result_dicts = [batch.to_pydict() for batch in filtered_results]

    expected_results = [{"a": [3, 4, 5], "b": [30, 40, 50]}]

    assert result_dicts == expected_results

    df_results = df.collect()

    df_result_dicts = [batch.to_pydict() for batch in df_results]

    expected_df_results = [{"a": [1, 2, 3, 4, 5], "b": [10, 20, 30, 40, 50]}]

    assert df_result_dicts == expected_df_results
32 changes: 32 additions & 0 deletions src/dataframe.rs
@@ -30,6 +30,7 @@ use datafusion::arrow::util::pretty;
use datafusion::common::UnnestOptions;
use datafusion::config::{CsvOptions, TableParquetOptions};
use datafusion::dataframe::{DataFrame, DataFrameWriteOptions};
use datafusion::datasource::TableProvider;
use datafusion::execution::SendableRecordBatchStream;
use datafusion::parquet::basic::{BrotliLevel, Compression, GzipLevel, ZstdLevel};
use datafusion::prelude::*;
@@ -39,6 +40,7 @@ use pyo3::pybacked::PyBackedStr;
use pyo3::types::{PyCapsule, PyTuple, PyTupleMethods};
use tokio::task::JoinHandle;

use crate::catalog::PyTable;
use crate::errors::{py_datafusion_err, PyDataFusionError};
use crate::expr::sort_expr::to_sort_expressions;
use crate::physical_plan::PyExecutionPlan;
@@ -50,6 +52,22 @@ use crate::{
expr::{sort_expr::PySortExpr, PyExpr},
};

#[pyclass(name = "TableProvider", module = "datafusion")]
pub struct PyTableProvider {
    provider: Arc<dyn TableProvider>,
}

impl PyTableProvider {
    pub fn new(provider: Arc<dyn TableProvider>) -> Self {
        Self { provider }
    }

    pub fn as_table(&self) -> PyTable {
        let table_provider: Arc<dyn TableProvider> = self.provider.clone();
        PyTable::new(table_provider)
    }
}

Comment on lines +58 to +73
Member:

In general I think this is a good idea, but I'm worried about causing confusion between a table provider created from a view and a table provider that is passed from an external source using pycapsule. I can imagine a user would think that a table provider object from one place can be used with another. That is, if I create a table provider with into_view I should be able to register it with the session context. Now, I don't think that operation is strictly necessary, but I do expect it would cause some confusion.

What I think we want to do is to have a single common PyTableProvider that can be created either via a pycapsule or into_view.
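For readers following the discussion, the two creation paths being contrasted look roughly like this from Python. This is a hedged sketch, not code from this PR: `into_view` and `register_table` appear in the diff above, while `register_table_provider` and the `__datafusion_table_provider__` PyCapsule protocol are assumed from the existing FFI support, and `my_rust_extension.ExternalTableProvider` is a purely hypothetical stand-in for a capsule-exporting object.

```python
from datafusion import SessionContext, col, literal

ctx = SessionContext()
df = ctx.from_pydict({"a": [1, 2, 3]}, "t")

# Path 1: a table provider created inside datafusion-python via into_view,
# registered like any other table.
view_provider = df.filter(col("a") > literal(1)).into_view()
ctx.register_table("view1", view_provider)

# Path 2: a table provider handed in from an external Rust extension through
# the PyCapsule protocol. The import below is hypothetical and only
# illustrates the shape of that path.
# from my_rust_extension import ExternalTableProvider
# ctx.register_table_provider("external_table", ExternalTableProvider())
```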

Contributor Author:

a single common PyTableProvider that can be created either via a pycapsule or into_view.

Do you mean a constructor that takes a pycapsule argument, then extract provider to use in
PyTableProvider::new(provider)?

Can I check how I can obtain the provider from pub struct PyCapsule(PyAny)?

Contributor Author:

@timsaucer

Any chance you can give me some code pointers or reference PRs that would help with implementation?

Member:

What if we just skip the whole creating a view as a table provider and instead go straight to registering a view on the session context?

We could do something like register_view(df: DataFrame) which would under the hood do exactly what you've got except not expose it back as a PyTableProvider and eliminate any possible confusion. Then we'd also save the user a step.

@matko would that solve your needs or do you need that view table provider exposed for other use?

Member:

Otherwise I think we have to plan for how we can have a common concept around two ways of creating table providers in python code. Also we would want to think about how we would handle the return type of a udtf, which we haven't even addressed.

Contributor Author:

skip the whole creating a view as a table provider and instead go straight to registering a view on the session context?

Sounds good.
Implemented.
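The commit list above shows that this direction later landed as a `register_view` method on `SessionContext`, although that change is not part of the 15-commit diff rendered here. A minimal usage sketch, assuming a `register_view(name, df)` signature that mirrors `register_table`:

```python
from datafusion import SessionContext, col, literal

ctx = SessionContext()
df = ctx.from_pydict({"a": [1, 2, 3, 4, 5], "b": [10, 20, 30, 40, 50]}, "my_table")
df_filtered = df.filter(col("a") > literal(2))

# Register the filtered DataFrame directly as a view; no intermediate
# table-provider object is exposed to Python.
ctx.register_view("view1", df_filtered)

batches = ctx.sql("SELECT * FROM view1").collect()
print([batch.to_pydict() for batch in batches])
# Expected, under the assumptions above: [{'a': [3, 4, 5], 'b': [30, 40, 50]}]
```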

/// A PyDataFrame is a representation of a logical plan and an API to compose statements.
/// Use it to build a plan and `.collect()` to execute the plan and collect the result.
/// The actual execution of a plan runs natively on Rust and Arrow on a multi-threaded environment.
@@ -156,6 +174,20 @@ impl PyDataFrame {
        PyArrowType(self.df.schema().into())
    }

    /// Convert this DataFrame into a Table that can be used in register_table
    fn _into_view(&self) -> PyDataFusionResult<PyTable> {
Member:

I recommend disabling that specific clippy warning.

Contributor Author:

Done

        // Call the underlying Rust DataFrame::into_view method.
        // Note that the Rust method consumes self; here we clone the inner Arc<DataFrame>
        // so that we don't invalidate this PyDataFrame.
        // The method is named _into_view because clippy warns that `into_*` methods
        // usually take `self` by value, but we cannot own self here: Python objects
        // are shared, so `self` cannot be moved out of the Python interpreter.
        let table_provider = self.df.as_ref().clone().into_view();
        let table_provider = PyTableProvider::new(table_provider);

        Ok(table_provider.as_table())
    }

    #[pyo3(signature = (*args))]
    fn select_columns(&self, args: Vec<PyBackedStr>) -> PyDataFusionResult<Self> {
        let args = args.iter().map(|s| s.as_ref()).collect::<Vec<&str>>();