@@ -36,9 +36,13 @@ use databend_common_expression::FunctionKind;
36
36
use databend_common_expression:: Scalar ;
37
37
use databend_common_functions:: BUILTIN_FUNCTIONS ;
38
38
use databend_common_meta_app:: principal:: UDFDefinition ;
39
+ use databend_common_meta_app:: principal:: UDTF ;
39
40
use databend_common_storages_result_cache:: ResultCacheMetaManager ;
40
41
use databend_common_storages_result_cache:: ResultScan ;
41
42
use databend_common_users:: UserApiProvider ;
43
+ use derive_visitor:: DriveMut ;
44
+ use derive_visitor:: VisitorMut ;
45
+ use itertools:: Itertools ;
42
46
43
47
use crate :: binder:: scalar:: ScalarBinder ;
44
48
use crate :: binder:: table_args:: bind_table_args;
@@ -70,6 +74,37 @@ impl Binder {
70
74
alias : & Option < TableAlias > ,
71
75
sample : & Option < SampleConfig > ,
72
76
) -> Result < ( SExpr , BindContext ) > {
77
+ #[ derive( VisitorMut ) ]
78
+ #[ visitor( Expr ( enter) ) ]
79
+ struct UDTFArgVisitor < ' a > {
80
+ udtf : & ' a UDTF ,
81
+ table_args : & ' a TableArgs ,
82
+ }
83
+
84
+ impl UDTFArgVisitor < ' _ > {
85
+ fn enter_expr ( & mut self , expr : & mut Expr ) {
86
+ if let Expr :: ColumnRef { span, column } = expr {
87
+ if column. database . is_some ( ) || column. table . is_some ( ) {
88
+ return ;
89
+ }
90
+ assert_eq ! ( self . udtf. arg_types. len( ) , self . table_args. positioned. len( ) ) ;
91
+ let Some ( ( pos, ( _, _ty) ) ) = self
92
+ . udtf
93
+ . arg_types
94
+ . iter ( )
95
+ . find_position ( |( name, _) | name == column. column . name ( ) )
96
+ else {
97
+ return ;
98
+ } ;
99
+
100
+ * expr = Expr :: Literal {
101
+ span : * span,
102
+ value : Literal :: String ( self . table_args . positioned [ pos] . to_string ( ) ) ,
103
+ }
104
+ }
105
+ }
106
+ }
107
+
73
108
let func_name = normalize_identifier ( name, & self . name_resolution_ctx ) ;
74
109
75
110
if BUILTIN_FUNCTIONS
@@ -142,30 +177,32 @@ impl Binder {
142
177
. await ?
143
178
. map ( |udf| udf. definition )
144
179
{
145
- let mut sql = udtf. sql ;
146
-
147
- for ( name, arg) in table_args. named . iter ( ) {
148
- // FIXME: Parameter substitution
149
- let Some ( ( _, _ty) ) =
150
- udtf. arg_types . iter ( ) . find ( |( arg_name, _) | arg_name == name)
151
- else {
152
- return Err ( ErrorCode :: InvalidArgument ( format ! (
153
- "Function '{func_name}' does not have a parameter named '{name}'"
154
- ) ) ) ;
155
- } ;
180
+ let mut stmt = Planner :: new ( self . ctx . clone ( ) )
181
+ . parse_sql ( & udtf. sql ) ?
182
+ . statement ;
156
183
157
- sql = sql. replace ( name, & arg. to_string ( ) ) ;
184
+ if udtf. arg_types . len ( ) != table_args. positioned . len ( ) {
185
+ return Err ( ErrorCode :: UDFSchemaMismatch ( format ! (
186
+ "UDTF '{}' argument types length {} does not match input arguments length {}" ,
187
+ func_name,
188
+ udtf. arg_types. len( ) ,
189
+ table_args. positioned. len( )
190
+ ) ) ) ;
158
191
}
159
- let mut planner = Planner :: new ( self . ctx . clone ( ) ) ;
160
- let ( _, extras) = planner. plan_sql ( & sql) . await ?;
192
+ let mut visitor = UDTFArgVisitor {
193
+ udtf : & udtf,
194
+ table_args : & table_args,
195
+ } ;
196
+ stmt. drive_mut ( & mut visitor) ;
197
+
161
198
let binder = Binder :: new (
162
199
self . ctx . clone ( ) ,
163
200
CatalogManager :: instance ( ) ,
164
201
self . name_resolution_ctx . clone ( ) ,
165
202
self . metadata . clone ( ) ,
166
203
)
167
204
. with_subquery_executor ( self . subquery_executor . clone ( ) ) ;
168
- let plan = binder. bind ( & extras . statement ) . await ?;
205
+ let plan = binder. bind ( & stmt ) . await ?;
169
206
170
207
let Plan :: Query {
171
208
s_expr,
@@ -182,7 +219,8 @@ impl Binder {
182
219
183
220
if udtf. return_types . len ( ) != bind_context. columns . len ( ) {
184
221
return Err ( ErrorCode :: UDFSchemaMismatch ( format ! (
185
- "return types length {} does not match output columns length {}" ,
222
+ "UDTF '{}' return types length {} does not match output columns length {}" ,
223
+ func_name,
186
224
udtf. return_types. len( ) ,
187
225
bind_context. columns. len( )
188
226
) ) ) ;
0 commit comments