@@ -2,16 +2,20 @@ use std::sync::Arc;
22
33use async_trait:: async_trait;
44use datafusion:: arrow:: array:: {
5- ArrayRef , BooleanArray , Float64Array , Int16Array , Int32Array , RecordBatch , StringArray ,
5+ as_boolean_array, ArrayRef , BooleanArray , Float64Array , Int16Array , Int32Array , RecordBatch ,
6+ StringArray , StringBuilder ,
67} ;
78use datafusion:: arrow:: datatypes:: { DataType , Field , Schema , SchemaRef } ;
89use datafusion:: catalog:: streaming:: StreamingTable ;
910use datafusion:: catalog:: { CatalogProviderList , MemTable , SchemaProvider } ;
11+ use datafusion:: common:: utils:: SingleRowListArrayBuilder ;
1012use datafusion:: datasource:: TableProvider ;
11- use datafusion:: error:: Result ;
13+ use datafusion:: error:: { DataFusionError , Result } ;
1214use datafusion:: execution:: { SendableRecordBatchStream , TaskContext } ;
15+ use datafusion:: logical_expr:: { ColumnarValue , ScalarUDF , Volatility } ;
1316use datafusion:: physical_plan:: stream:: RecordBatchStreamAdapter ;
1417use datafusion:: physical_plan:: streaming:: PartitionStream ;
18+ use datafusion:: prelude:: { create_udf, SessionContext } ;
1519
1620const PG_CATALOG_TABLE_PG_TYPE : & str = "pg_type" ;
1721const PG_CATALOG_TABLE_PG_CLASS : & str = "pg_class" ;
@@ -595,3 +599,77 @@ impl PartitionStream for PgDatabaseTable {
595599 ) )
596600 }
597601}
602+
603+ pub fn create_current_schemas_udf ( ) -> ScalarUDF {
604+ // Define the function implementation
605+ let func = move |args : & [ ColumnarValue ] | {
606+ let args = ColumnarValue :: values_to_arrays ( args) ?;
607+ let input = as_boolean_array ( & args[ 0 ] ) ;
608+
609+ // Create a UTF8 array with a single value
610+ let mut values = vec ! [ "public" ] ;
611+ // include implicit schemas
612+ if input. value ( 0 ) {
613+ values. push ( "information_schema" ) ;
614+ values. push ( "pg_catalog" ) ;
615+ }
616+
617+ let list_array = SingleRowListArrayBuilder :: new ( Arc :: new ( StringArray :: from ( values) ) ) ;
618+
619+ let array: ArrayRef = Arc :: new ( list_array. build_list_array ( ) ) ;
620+
621+ Ok ( ColumnarValue :: Array ( array) )
622+ } ;
623+
624+ // Wrap the implementation in a scalar function
625+ create_udf (
626+ "current_schemas" ,
627+ vec ! [ DataType :: Boolean ] ,
628+ DataType :: List ( Arc :: new ( Field :: new ( "schema" , DataType :: Utf8 , false ) ) ) ,
629+ Volatility :: Immutable ,
630+ Arc :: new ( func) ,
631+ )
632+ }
633+
634+ pub fn create_current_schema_udf ( ) -> ScalarUDF {
635+ // Define the function implementation
636+ let func = move |_args : & [ ColumnarValue ] | {
637+ // Create a UTF8 array with a single value
638+ let mut builder = StringBuilder :: new ( ) ;
639+ builder. append_value ( "public" ) ;
640+ let array: ArrayRef = Arc :: new ( builder. finish ( ) ) ;
641+
642+ Ok ( ColumnarValue :: Array ( array) )
643+ } ;
644+
645+ // Wrap the implementation in a scalar function
646+ create_udf (
647+ "current_schema" ,
648+ vec ! [ ] ,
649+ DataType :: Utf8 ,
650+ Volatility :: Immutable ,
651+ Arc :: new ( func) ,
652+ )
653+ }
654+
655+ /// Install pg_catalog and postgres UDFs to current `SessionContext`
656+ pub fn setup_pg_catalog (
657+ session_context : & SessionContext ,
658+ catalog_name : & str ,
659+ ) -> Result < ( ) , Box < DataFusionError > > {
660+ let pg_catalog = PgCatalogSchemaProvider :: new ( session_context. state ( ) . catalog_list ( ) . clone ( ) ) ;
661+ session_context
662+ . catalog ( catalog_name)
663+ . ok_or_else ( || {
664+ DataFusionError :: Configuration ( format ! (
665+ "Catalog not found when registering pg_catalog: {}" ,
666+ catalog_name
667+ ) )
668+ } ) ?
669+ . register_schema ( "pg_catalog" , Arc :: new ( pg_catalog) ) ?;
670+
671+ session_context. register_udf ( create_current_schema_udf ( ) ) ;
672+ session_context. register_udf ( create_current_schemas_udf ( ) ) ;
673+
674+ Ok ( ( ) )
675+ }
0 commit comments