@@ -72,83 +72,32 @@ use datafusion_ffi::table_provider::{FFI_TableProvider, ForeignTableProvider};
 use pyo3::types::{PyCapsule, PyDict, PyList, PyTuple, PyType};
 use tokio::task::JoinHandle;
 
-/// Display configuration for DataFrames
-#[pyclass(name = "DisplayConfig", module = "datafusion", subclass)]
-#[derive(Clone, Debug)]
-pub struct DisplayConfig {
-    #[pyo3(get, set)]
-    pub max_width: usize,
-    #[pyo3(get, set)]
-    pub max_rows: Option<usize>,
-    #[pyo3(get, set)]
-    pub show_nulls: bool,
-}
-
-#[pymethods]
-impl DisplayConfig {
-    #[new]
-    pub fn new(
-        max_width: Option<usize>,
-        max_rows: Option<usize>,
-        show_nulls: Option<bool>,
-    ) -> Self {
-        Self {
-            max_width: max_width.unwrap_or(80),
-            max_rows,
-            show_nulls: show_nulls.unwrap_or(false),
-        }
-    }
-}
-
 /// Configuration options for a SessionContext
 #[pyclass(name = "SessionConfig", module = "datafusion", subclass)]
 #[derive(Clone, Default)]
 pub struct PySessionConfig {
     pub config: SessionConfig,
-    pub display_config: DisplayConfig,
 }
 
 impl From<SessionConfig> for PySessionConfig {
     fn from(config: SessionConfig) -> Self {
-        Self {
-            config,
-            display_config: DisplayConfig::new(Some(80), None, Some(false)),
-        }
+        Self { config }
     }
 }
 
 #[pymethods]
 impl PySessionConfig {
-    #[pyo3(signature = (config_options=None, display_config=None))]
+    #[pyo3(signature = (config_options=None))]
     #[new]
-    fn new(
-        config_options: Option<HashMap<String, String>>,
-        display_config: Option<DisplayConfig>,
-    ) -> Self {
+    fn new(config_options: Option<HashMap<String, String>>) -> Self {
         let mut config = SessionConfig::new();
         if let Some(hash_map) = config_options {
             for (k, v) in &hash_map {
                 config = config.set(k, &ScalarValue::Utf8(Some(v.clone())));
             }
         }
 
-        Self {
-            config,
-            display_config: display_config
-                .unwrap_or_else(|| DisplayConfig::new(Some(80), None, Some(false))),
-        }
-    }
-
-    // Get the display configuration
-    pub fn get_display_config(&self) -> DisplayConfig {
-        self.display_config.clone()
-    }
-
-    // Set the display configuration
-    pub fn with_display_config(&self, display_config: DisplayConfig) -> Self {
-        let mut new_config = self.clone();
-        new_config.display_config = display_config;
-        new_config
+        Self { config }
     }
 
     fn with_create_default_catalog_and_schema(&self, enabled: bool) -> Self {
@@ -726,6 +675,226 @@ impl PySessionContext {
             )));
         }
 
+        let mut options = CsvReadOptions::new()
+            .has_header(has_header)
+            .delimiter(delimiter[0])
+            .schema_infer_max_records(schema_infer_max_records)
+            .file_extension(file_extension)
+            .file_compression_type(parse_file_compression_type(file_compression_type)?);
+        options.schema = schema.as_ref().map(|x| &x.0);
+
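+        // Accept either a single path or a list of paths; a list goes through the multi-path helper below.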
+        if path.is_instance_of::<PyList>() {
+            let paths = path.extract::<Vec<String>>()?;
+            let result = self.register_csv_from_multiple_paths(name, paths, options);
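+            // Drive the async registration to completion on the runtime before returning to Python.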
+            wait_for_future(py, result)?;
+        } else {
+            let path = path.extract::<String>()?;
+            let result = self.ctx.register_csv(name, &path, options);
+            wait_for_future(py, result)?;
+        }
+
+        Ok(())
+    }
+
+    #[allow(clippy::too_many_arguments)]
+    #[pyo3(signature = (name,
+        path,
+        schema=None,
+        schema_infer_max_records=1000,
+        file_extension=".json",
+        table_partition_cols=vec![],
+        file_compression_type=None))]
+    pub fn register_json(
+        &mut self,
+        name: &str,
+        path: PathBuf,
+        schema: Option<PyArrowType<Schema>>,
+        schema_infer_max_records: usize,
+        file_extension: &str,
+        table_partition_cols: Vec<(String, String)>,
+        file_compression_type: Option<String>,
+        py: Python,
+    ) -> PyDataFusionResult<()> {
+        let path = path
+            .to_str()
+            .ok_or_else(|| PyValueError::new_err("Unable to convert path to a string"))?;
+
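+        // Compression and partition columns go through the builder API; the remaining options are plain field writes.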
+        let mut options = NdJsonReadOptions::default()
+            .file_compression_type(parse_file_compression_type(file_compression_type)?)
+            .table_partition_cols(convert_table_partition_cols(table_partition_cols)?);
+        options.schema_infer_max_records = schema_infer_max_records;
+        options.file_extension = file_extension;
+        options.schema = schema.as_ref().map(|x| &x.0);
+
+        let result = self.ctx.register_json(name, path, options);
+        wait_for_future(py, result)?;
+
+        Ok(())
+    }
+
+    #[allow(clippy::too_many_arguments)]
+    #[pyo3(signature = (name,
+        path,
+        schema=None,
+        file_extension=".avro",
+        table_partition_cols=vec![]))]
+    pub fn register_avro(
+        &mut self,
+        name: &str,
+        path: PathBuf,
+        schema: Option<PyArrowType<Schema>>,
+        file_extension: &str,
+        table_partition_cols: Vec<(String, String)>,
+        py: Python,
+    ) -> PyDataFusionResult<()> {
+        let path = path
+            .to_str()
+            .ok_or_else(|| PyValueError::new_err("Unable to convert path to a string"))?;
+
+        let mut options = AvroReadOptions::default()
+            .table_partition_cols(convert_table_partition_cols(table_partition_cols)?);
+        options.file_extension = file_extension;
+        options.schema = schema.as_ref().map(|x| &x.0);
+
+        let result = self.ctx.register_avro(name, path, options);
+        wait_for_future(py, result)?;
+
+        Ok(())
+    }
+
+    // Registers a PyArrow.Dataset
+    pub fn register_dataset(
+        &self,
+        name: &str,
+        dataset: &Bound<'_, PyAny>,
+        py: Python,
+    ) -> PyDataFusionResult<()> {
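+        // Wrap the PyArrow dataset in a TableProvider so DataFusion can scan it like any other table.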
+        let table: Arc<dyn TableProvider> = Arc::new(Dataset::new(dataset, py)?);
+
+        self.ctx.register_table(name, table)?;
+
+        Ok(())
+    }
+
+    pub fn register_udf(&mut self, udf: PyScalarUDF) -> PyResult<()> {
+        self.ctx.register_udf(udf.function);
+        Ok(())
+    }
+
+    pub fn register_udaf(&mut self, udaf: PyAggregateUDF) -> PyResult<()> {
+        self.ctx.register_udaf(udaf.function);
+        Ok(())
+    }
+
+    pub fn register_udwf(&mut self, udwf: PyWindowUDF) -> PyResult<()> {
+        self.ctx.register_udwf(udwf.function);
+        Ok(())
+    }
+
+    #[pyo3(signature = (name="datafusion"))]
+    pub fn catalog(&self, name: &str) -> PyResult<PyCatalog> {
+        match self.ctx.catalog(name) {
+            Some(catalog) => Ok(PyCatalog::new(catalog)),
+            None => Err(PyKeyError::new_err(format!(
+                "Catalog with name {} doesn't exist.",
+                &name,
+            ))),
+        }
+    }
+
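+    /// Walk every catalog and schema in the context and collect all table names.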
+    pub fn tables(&self) -> HashSet<String> {
+        self.ctx
+            .catalog_names()
+            .into_iter()
+            .filter_map(|name| self.ctx.catalog(&name))
+            .flat_map(move |catalog| {
+                catalog
+                    .schema_names()
+                    .into_iter()
+                    .filter_map(move |name| catalog.schema(&name))
+            })
+            .flat_map(|schema| schema.table_names())
+            .collect()
+    }
+
+    pub fn table(&self, name: &str, py: Python) -> PyResult<PyDataFrame> {
+        let x = wait_for_future(py, self.ctx.table(name))
+            .map_err(|e| PyKeyError::new_err(e.to_string()))?;
+        Ok(PyDataFrame::new(x))
+    }
+
+    pub fn table_exist(&self, name: &str) -> PyDataFusionResult<bool> {
+        Ok(self.ctx.table_exist(name)?)
+    }
+
+    pub fn empty_table(&self) -> PyDataFusionResult<PyDataFrame> {
+        Ok(PyDataFrame::new(self.ctx.read_empty()?))
+    }
+
+    pub fn session_id(&self) -> String {
+        self.ctx.session_id()
+    }
+
+    #[allow(clippy::too_many_arguments)]
+    #[pyo3(signature = (path, schema=None, schema_infer_max_records=1000, file_extension=".json", table_partition_cols=vec![], file_compression_type=None))]
+    pub fn read_json(
+        &mut self,
+        path: PathBuf,
+        schema: Option<PyArrowType<Schema>>,
+        schema_infer_max_records: usize,
+        file_extension: &str,
+        table_partition_cols: Vec<(String, String)>,
+        file_compression_type: Option<String>,
+        py: Python,
+    ) -> PyDataFusionResult<PyDataFrame> {
+        let path = path
+            .to_str()
+            .ok_or_else(|| PyValueError::new_err("Unable to convert path to a string"))?;
+        let mut options = NdJsonReadOptions::default()
+            .table_partition_cols(convert_table_partition_cols(table_partition_cols)?)
+            .file_compression_type(parse_file_compression_type(file_compression_type)?);
+        options.schema_infer_max_records = schema_infer_max_records;
+        options.file_extension = file_extension;
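+        // `options.schema` borrows from `schema`, so each branch must issue the read while that borrow is alive.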
+        let df = if let Some(schema) = schema {
+            options.schema = Some(&schema.0);
+            let result = self.ctx.read_json(path, options);
+            wait_for_future(py, result)?
+        } else {
+            let result = self.ctx.read_json(path, options);
+            wait_for_future(py, result)?
+        };
+        Ok(PyDataFrame::new(df))
+    }
+
+    #[allow(clippy::too_many_arguments)]
+    #[pyo3(signature = (
+        path,
+        schema=None,
+        has_header=true,
+        delimiter=",",
+        schema_infer_max_records=1000,
+        file_extension=".csv",
+        table_partition_cols=vec![],
+        file_compression_type=None))]
+    pub fn read_csv(
+        &self,
+        path: &Bound<'_, PyAny>,
+        schema: Option<PyArrowType<Schema>>,
+        has_header: bool,
+        delimiter: &str,
+        schema_infer_max_records: usize,
+        file_extension: &str,
+        table_partition_cols: Vec<(String, String)>,
+        file_compression_type: Option<String>,
+        py: Python,
+    ) -> PyDataFusionResult<PyDataFrame> {
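+        // CsvReadOptions takes the delimiter as a single byte, so validate the Python string up front.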
+        let delimiter = delimiter.as_bytes();
+        if delimiter.len() != 1 {
+            return Err(crate::errors::PyDataFusionError::PythonError(py_value_err(
+                "Delimiter must be a single character",
+            )));
+        };
+
         let mut options = CsvReadOptions::new()
             .has_header(has_header)
             .delimiter(delimiter[0])