@@ -38,7 +38,7 @@ use arrow_array::builder::BooleanBuilder;
3838use async_trait:: async_trait;
3939use datafusion_common:: error:: Result ;
4040use datafusion_common:: DataFusionError ;
41- use datafusion_expr:: { AggregateUDF , ScalarUDF , Signature , WindowUDF } ;
41+ use datafusion_expr:: { AggregateUDF , ScalarUDF , Signature , TypeSignature , WindowUDF } ;
4242use std:: collections:: { HashMap , HashSet } ;
4343use std:: fmt:: Debug ;
4444use std:: { any:: Any , sync:: Arc } ;
@@ -50,10 +50,18 @@ pub(crate) const COLUMNS: &str = "columns";
/// Name of the `df_settings` table in the information schema.
pub(crate) const DF_SETTINGS: &str = "df_settings";
/// Name of the `schemata` table in the information schema.
pub(crate) const SCHEMATA: &str = "schemata";
/// Name of the `routines` table in the information schema.
pub(crate) const ROUTINES: &str = "routines";
/// Name of the `parameters` table in the information schema.
pub(crate) const PARAMETERS: &str = "parameters";
5354
5455/// All information schema tables
55- pub const INFORMATION_SCHEMA_TABLES : & [ & str ] =
56- & [ TABLES , VIEWS , COLUMNS , DF_SETTINGS , SCHEMATA , ROUTINES ] ;
56+ pub const INFORMATION_SCHEMA_TABLES : & [ & str ] = & [
57+ TABLES ,
58+ VIEWS ,
59+ COLUMNS ,
60+ DF_SETTINGS ,
61+ SCHEMATA ,
62+ ROUTINES ,
63+ PARAMETERS ,
64+ ] ;
5765
5866/// Implements the `information_schema` virtual schema and tables
5967///
@@ -286,6 +294,102 @@ impl InformationSchemaConfig {
286294 fn is_deterministic ( signature : & Signature ) -> bool {
287295 signature. volatility == Volatility :: Immutable
288296 }
297+ fn make_parameters (
298+ & self ,
299+ udfs : & HashMap < String , Arc < ScalarUDF > > ,
300+ udafs : & HashMap < String , Arc < AggregateUDF > > ,
301+ udwfs : & HashMap < String , Arc < WindowUDF > > ,
302+ config_options : & ConfigOptions ,
303+ builder : & mut InformationSchemaParametersBuilder ,
304+ ) -> Result < ( ) > {
305+ let catalog_name = & config_options. catalog . default_catalog ;
306+ let schema_name = & config_options. catalog . default_schema ;
307+ let mut add_parameters = |func_name : & str ,
308+ args : Option < & Vec < ( String , String ) > > ,
309+ arg_types : Vec < String > ,
310+ return_type : Option < String > ,
311+ is_variadic : bool | {
312+ for ( position, type_name) in arg_types. iter ( ) . enumerate ( ) {
313+ let param_name =
314+ args. and_then ( |a| a. get ( position) . map ( |arg| arg. 0 . as_str ( ) ) ) ;
315+ builder. add_parameter (
316+ catalog_name,
317+ schema_name,
318+ func_name,
319+ position as u64 + 1 ,
320+ "IN" ,
321+ param_name,
322+ type_name,
323+ None :: < & str > ,
324+ is_variadic,
325+ ) ;
326+ }
327+ if let Some ( return_type) = return_type {
328+ builder. add_parameter (
329+ catalog_name,
330+ schema_name,
331+ func_name,
332+ 1 ,
333+ "OUT" ,
334+ None :: < & str > ,
335+ return_type. as_str ( ) ,
336+ None :: < & str > ,
337+ false ,
338+ ) ;
339+ }
340+ } ;
341+
342+ for ( func_name, udf) in udfs {
343+ let args = udf. documentation ( ) . and_then ( |d| d. arguments . clone ( ) ) ;
344+ let combinations = get_udf_args_and_return_types ( udf) ?;
345+ for ( arg_types, return_type) in combinations {
346+ add_parameters (
347+ func_name,
348+ args. as_ref ( ) ,
349+ arg_types,
350+ return_type,
351+ Self :: is_variadic ( udf. signature ( ) ) ,
352+ ) ;
353+ }
354+ }
355+
356+ for ( func_name, udaf) in udafs {
357+ let args = udaf. documentation ( ) . and_then ( |d| d. arguments . clone ( ) ) ;
358+ let combinations = get_udaf_args_and_return_types ( udaf) ?;
359+ for ( arg_types, return_type) in combinations {
360+ add_parameters (
361+ func_name,
362+ args. as_ref ( ) ,
363+ arg_types,
364+ return_type,
365+ Self :: is_variadic ( udaf. signature ( ) ) ,
366+ ) ;
367+ }
368+ }
369+
370+ for ( func_name, udwf) in udwfs {
371+ let args = udwf. documentation ( ) . and_then ( |d| d. arguments . clone ( ) ) ;
372+ let combinations = get_udwf_args_and_return_types ( udwf) ?;
373+ for ( arg_types, return_type) in combinations {
374+ add_parameters (
375+ func_name,
376+ args. as_ref ( ) ,
377+ arg_types,
378+ return_type,
379+ Self :: is_variadic ( udwf. signature ( ) ) ,
380+ ) ;
381+ }
382+ }
383+
384+ Ok ( ( ) )
385+ }
386+
387+ fn is_variadic ( signature : & Signature ) -> bool {
388+ matches ! (
389+ signature. type_signature,
390+ TypeSignature :: Variadic ( _) | TypeSignature :: VariadicAny
391+ )
392+ }
289393}
290394
291395/// get the arguments and return types of a UDF
@@ -384,6 +488,7 @@ impl SchemaProvider for InformationSchemaProvider {
384488 DF_SETTINGS => Arc :: new ( InformationSchemaDfSettings :: new ( config) ) ,
385489 SCHEMATA => Arc :: new ( InformationSchemata :: new ( config) ) ,
386490 ROUTINES => Arc :: new ( InformationSchemaRoutines :: new ( config) ) ,
491+ PARAMETERS => Arc :: new ( InformationSchemaParameters :: new ( config) ) ,
387492 _ => return Ok ( None ) ,
388493 } ;
389494
@@ -1098,3 +1203,135 @@ impl PartitionStream for InformationSchemaRoutines {
10981203 ) )
10991204 }
11001205}
1206+
1207+ #[ derive( Debug ) ]
1208+ struct InformationSchemaParameters {
1209+ schema : SchemaRef ,
1210+ config : InformationSchemaConfig ,
1211+ }
1212+
1213+ impl InformationSchemaParameters {
1214+ fn new ( config : InformationSchemaConfig ) -> Self {
1215+ let schema = Arc :: new ( Schema :: new ( vec ! [
1216+ Field :: new( "specific_catalog" , DataType :: Utf8 , false ) ,
1217+ Field :: new( "specific_schema" , DataType :: Utf8 , false ) ,
1218+ Field :: new( "specific_name" , DataType :: Utf8 , false ) ,
1219+ Field :: new( "ordinal_position" , DataType :: UInt64 , false ) ,
1220+ Field :: new( "parameter_mode" , DataType :: Utf8 , false ) ,
1221+ Field :: new( "parameter_name" , DataType :: Utf8 , true ) ,
1222+ Field :: new( "data_type" , DataType :: Utf8 , false ) ,
1223+ Field :: new( "parameter_default" , DataType :: Utf8 , true ) ,
1224+ Field :: new( "is_variadic" , DataType :: Boolean , false ) ,
1225+ ] ) ) ;
1226+
1227+ Self { schema, config }
1228+ }
1229+
1230+ fn builder ( & self ) -> InformationSchemaParametersBuilder {
1231+ InformationSchemaParametersBuilder {
1232+ schema : self . schema . clone ( ) ,
1233+ specific_catalog : StringBuilder :: new ( ) ,
1234+ specific_schema : StringBuilder :: new ( ) ,
1235+ specific_name : StringBuilder :: new ( ) ,
1236+ ordinal_position : UInt64Builder :: new ( ) ,
1237+ parameter_mode : StringBuilder :: new ( ) ,
1238+ parameter_name : StringBuilder :: new ( ) ,
1239+ data_type : StringBuilder :: new ( ) ,
1240+ parameter_default : StringBuilder :: new ( ) ,
1241+ is_variadic : BooleanBuilder :: new ( ) ,
1242+ inserted : HashSet :: new ( ) ,
1243+ }
1244+ }
1245+ }
1246+
1247+ struct InformationSchemaParametersBuilder {
1248+ schema : SchemaRef ,
1249+ specific_catalog : StringBuilder ,
1250+ specific_schema : StringBuilder ,
1251+ specific_name : StringBuilder ,
1252+ ordinal_position : UInt64Builder ,
1253+ parameter_mode : StringBuilder ,
1254+ parameter_name : StringBuilder ,
1255+ data_type : StringBuilder ,
1256+ parameter_default : StringBuilder ,
1257+ is_variadic : BooleanBuilder ,
1258+ // use HashSet to avoid duplicate rows. The key is (specific_name, ordinal_position, parameter_mode, data_type)
1259+ inserted : HashSet < ( String , u64 , String , String ) > ,
1260+ }
1261+
1262+ impl InformationSchemaParametersBuilder {
1263+ #[ allow( clippy:: too_many_arguments) ]
1264+ fn add_parameter (
1265+ & mut self ,
1266+ specific_catalog : impl AsRef < str > ,
1267+ specific_schema : impl AsRef < str > ,
1268+ specific_name : impl AsRef < str > ,
1269+ ordinal_position : u64 ,
1270+ parameter_mode : impl AsRef < str > ,
1271+ parameter_name : Option < impl AsRef < str > > ,
1272+ data_type : impl AsRef < str > ,
1273+ parameter_default : Option < impl AsRef < str > > ,
1274+ is_variadic : bool ,
1275+ ) {
1276+ let key = (
1277+ specific_name. as_ref ( ) . to_string ( ) ,
1278+ ordinal_position,
1279+ parameter_mode. as_ref ( ) . to_string ( ) ,
1280+ data_type. as_ref ( ) . to_string ( ) ,
1281+ ) ;
1282+ if self . inserted . insert ( key) {
1283+ self . specific_catalog
1284+ . append_value ( specific_catalog. as_ref ( ) ) ;
1285+ self . specific_schema . append_value ( specific_schema. as_ref ( ) ) ;
1286+ self . specific_name . append_value ( specific_name. as_ref ( ) ) ;
1287+ self . ordinal_position . append_value ( ordinal_position) ;
1288+ self . parameter_mode . append_value ( parameter_mode. as_ref ( ) ) ;
1289+ self . parameter_name . append_option ( parameter_name. as_ref ( ) ) ;
1290+ self . data_type . append_value ( data_type. as_ref ( ) ) ;
1291+ self . parameter_default . append_option ( parameter_default) ;
1292+ self . is_variadic . append_value ( is_variadic) ;
1293+ }
1294+ }
1295+
1296+ fn finish ( & mut self ) -> RecordBatch {
1297+ RecordBatch :: try_new (
1298+ self . schema . clone ( ) ,
1299+ vec ! [
1300+ Arc :: new( self . specific_catalog. finish( ) ) ,
1301+ Arc :: new( self . specific_schema. finish( ) ) ,
1302+ Arc :: new( self . specific_name. finish( ) ) ,
1303+ Arc :: new( self . ordinal_position. finish( ) ) ,
1304+ Arc :: new( self . parameter_mode. finish( ) ) ,
1305+ Arc :: new( self . parameter_name. finish( ) ) ,
1306+ Arc :: new( self . data_type. finish( ) ) ,
1307+ Arc :: new( self . parameter_default. finish( ) ) ,
1308+ Arc :: new( self . is_variadic. finish( ) ) ,
1309+ ] ,
1310+ )
1311+ . unwrap ( )
1312+ }
1313+ }
1314+
1315+ impl PartitionStream for InformationSchemaParameters {
1316+ fn schema ( & self ) -> & SchemaRef {
1317+ & self . schema
1318+ }
1319+
1320+ fn execute ( & self , ctx : Arc < TaskContext > ) -> SendableRecordBatchStream {
1321+ let config = self . config . clone ( ) ;
1322+ let mut builder = self . builder ( ) ;
1323+ Box :: pin ( RecordBatchStreamAdapter :: new (
1324+ self . schema . clone ( ) ,
1325+ futures:: stream:: once ( async move {
1326+ config. make_parameters (
1327+ ctx. scalar_functions ( ) ,
1328+ ctx. aggregate_functions ( ) ,
1329+ ctx. window_functions ( ) ,
1330+ ctx. session_config ( ) . options ( ) ,
1331+ & mut builder,
1332+ ) ?;
1333+ Ok ( builder. finish ( ) )
1334+ } ) ,
1335+ ) )
1336+ }
1337+ }
0 commit comments