@@ -8,6 +8,7 @@ mod utils;
88use std:: collections:: { HashMap , HashSet } ;
99use std:: future:: IntoFuture ;
1010use std:: str:: FromStr ;
11+ use std:: sync:: Arc ;
1112use std:: time;
1213use std:: time:: { SystemTime , UNIX_EPOCH } ;
1314
@@ -17,12 +18,18 @@ use delta_kernel::expressions::Scalar;
1718use delta_kernel:: schema:: StructField ;
1819use deltalake:: arrow:: compute:: concat_batches;
1920use deltalake:: arrow:: ffi_stream:: { ArrowArrayStreamReader , FFI_ArrowArrayStream } ;
21+ use deltalake:: arrow:: pyarrow:: ToPyArrow ;
2022use deltalake:: arrow:: record_batch:: { RecordBatch , RecordBatchIterator } ;
2123use deltalake:: arrow:: { self , datatypes:: Schema as ArrowSchema } ;
2224use deltalake:: checkpoints:: { cleanup_metadata, create_checkpoint} ;
25+ use deltalake:: datafusion:: datasource:: provider_as_source;
26+ use deltalake:: datafusion:: logical_expr:: { LogicalPlanBuilder , UNNAMED_TABLE } ;
2327use deltalake:: datafusion:: physical_plan:: ExecutionPlan ;
24- use deltalake:: datafusion:: prelude:: SessionContext ;
25- use deltalake:: delta_datafusion:: DeltaDataChecker ;
28+ use deltalake:: datafusion:: prelude:: { DataFrame , SessionContext } ;
29+ use deltalake:: delta_datafusion:: {
30+ DataFusionMixins , DeltaDataChecker , DeltaScanConfigBuilder , DeltaSessionConfig ,
31+ DeltaTableProvider ,
32+ } ;
2633use deltalake:: errors:: DeltaTableError ;
2734use deltalake:: kernel:: {
2835 scalars:: ScalarExt , Action , Add , Invariant , LogicalFile , Remove , StructType ,
@@ -1232,6 +1239,65 @@ impl RawDeltaTable {
12321239 self . _table . state = table. state ;
12331240 Ok ( serde_json:: to_string ( & metrics) . unwrap ( ) )
12341241 }
1242+
1243+ #[ pyo3( signature = ( predicate = None , columns = None ) ) ]
1244+ pub fn datafusion_read (
1245+ & self ,
1246+ py : Python ,
1247+ predicate : Option < String > ,
1248+ columns : Option < Vec < String > > ,
1249+ ) -> PyResult < PyObject > {
1250+ let batches = py. allow_threads ( || -> PyResult < _ > {
1251+ let snapshot = self . _table . snapshot ( ) . map_err ( PythonError :: from) ?;
1252+ let log_store = self . _table . log_store ( ) ;
1253+
1254+ let scan_config = DeltaScanConfigBuilder :: default ( )
1255+ . with_parquet_pushdown ( false )
1256+ . build ( snapshot)
1257+ . map_err ( PythonError :: from) ?;
1258+
1259+ let provider = Arc :: new (
1260+ DeltaTableProvider :: try_new ( snapshot. clone ( ) , log_store, scan_config)
1261+ . map_err ( PythonError :: from) ?,
1262+ ) ;
1263+ let source = provider_as_source ( provider) ;
1264+
1265+ let config = DeltaSessionConfig :: default ( ) . into ( ) ;
1266+ let session = SessionContext :: new_with_config ( config) ;
1267+ let state = session. state ( ) ;
1268+
1269+ let maybe_filter = predicate
1270+ . map ( |predicate| snapshot. parse_predicate_expression ( predicate, & state) )
1271+ . transpose ( )
1272+ . map_err ( PythonError :: from) ?;
1273+
1274+ let filters = match & maybe_filter {
1275+ Some ( filter) => vec ! [ filter. clone( ) ] ,
1276+ None => vec ! [ ] ,
1277+ } ;
1278+
1279+ let plan = LogicalPlanBuilder :: scan_with_filters ( UNNAMED_TABLE , source, None , filters)
1280+ . unwrap ( )
1281+ . build ( )
1282+ . unwrap ( ) ;
1283+
1284+ let mut df = DataFrame :: new ( state, plan) ;
1285+
1286+ if let Some ( filter) = maybe_filter {
1287+ df = df. filter ( filter) . unwrap ( ) ;
1288+ }
1289+
1290+ if let Some ( columns) = columns {
1291+ df = df
1292+ . select_columns ( & columns. iter ( ) . map ( String :: as_str) . collect :: < Vec < _ > > ( ) )
1293+ . unwrap ( ) ;
1294+ }
1295+
1296+ Ok ( rt ( ) . block_on ( async { df. collect ( ) . await } ) . unwrap ( ) )
1297+ } ) ?;
1298+
1299+ batches. to_pyarrow ( py)
1300+ }
12351301}
12361302
12371303fn set_post_commithook_properties (
0 commit comments