@@ -9,21 +9,31 @@ use pyo3::prelude::*;
99
1010use crate :: { error:: PythonError , utils:: rt, RawDeltaTable } ;
1111
12+ /// PyQueryBuilder supports the _experimental_ `QueryBuilder` Python interface which allows users
13+ /// to take advantage of the [Apache DataFusion](https://datafusion.apache.org) engine already
14+ /// present in the Python package.
1215#[ pyclass( module = "deltalake._internal" ) ]
16+ #[ derive( Default ) ]
1317pub ( crate ) struct PyQueryBuilder {
14- _ctx : SessionContext ,
18+ /// DataFusion [SessionContext] to hold mappings of registered tables
19+ ctx : SessionContext ,
1520}
1621
1722#[ pymethods]
1823impl PyQueryBuilder {
1924 #[ new]
2025 pub fn new ( ) -> Self {
2126 let config = DeltaSessionConfig :: default ( ) . into ( ) ;
22- let _ctx = SessionContext :: new_with_config ( config) ;
27+ let ctx = SessionContext :: new_with_config ( config) ;
2328
24- PyQueryBuilder { _ctx }
29+ PyQueryBuilder { ctx }
2530 }
2631
32+ /// Register the given [RawDeltaTable] into the [SessionContext] using the provided
33+ /// `table_name`
34+ ///
35+ /// Once called, the provided `delta_table` will be referenceable in SQL queries so long as
36+ /// another table of the same name is not registered over it.
2737 pub fn register ( & self , table_name : & str , delta_table : & RawDeltaTable ) -> PyResult < ( ) > {
2838 let snapshot = delta_table. _table . snapshot ( ) . map_err ( PythonError :: from) ?;
2939 let log_store = delta_table. _table . log_store ( ) ;
@@ -37,17 +47,22 @@ impl PyQueryBuilder {
3747 . map_err ( PythonError :: from) ?,
3848 ) ;
3949
40- self . _ctx
50+ self . ctx
4151 . register_table ( table_name, provider)
4252 . map_err ( PythonError :: from) ?;
4353
4454 Ok ( ( ) )
4555 }
4656
57+ /// Execute the given SQL command within the [SessionContext] of this instance
58+ ///
59+ /// **NOTE:** Since this function returns a materialized Python list of `RecordBatch`
60+ /// instances, it may result in unexpected memory consumption for queries which return large data
61+ /// sets.
4762 pub fn execute ( & self , py : Python , sql : & str ) -> PyResult < PyObject > {
4863 let batches = py. allow_threads ( || {
4964 rt ( ) . block_on ( async {
50- let df = self . _ctx . sql ( sql) . await ?;
65+ let df = self . ctx . sql ( sql) . await ?;
5166 df. collect ( ) . await
5267 } )
5368 . map_err ( PythonError :: from)
0 commit comments