@@ -89,10 +89,11 @@ pub struct OpArgsResolver<'a> {
8989 num_positional_args : usize ,
9090 next_positional_idx : usize ,
9191 remaining_kwargs : HashMap < & ' a str , usize > ,
92+ required_args_idx : & ' a mut Vec < usize > ,
9293}
9394
9495impl < ' a > OpArgsResolver < ' a > {
95- pub fn new ( args : & ' a [ OpArgSchema ] ) -> Result < Self > {
96+ pub fn new ( args : & ' a [ OpArgSchema ] , required_args_idx : & ' a mut Vec < usize > ) -> Result < Self > {
9697 let mut num_positional_args = 0 ;
9798 let mut kwargs = HashMap :: new ( ) ;
9899 for ( idx, arg) in args. iter ( ) . enumerate ( ) {
@@ -110,6 +111,7 @@ impl<'a> OpArgsResolver<'a> {
110111 num_positional_args,
111112 next_positional_idx : 0 ,
112113 remaining_kwargs : kwargs,
114+ required_args_idx,
113115 } )
114116 }
115117
@@ -135,9 +137,11 @@ impl<'a> OpArgsResolver<'a> {
135137 }
136138
137139 pub fn next_arg ( & mut self , name : & str ) -> Result < ResolvedOpArg > {
138- Ok ( self
140+ let arg = self
139141 . next_optional_arg ( name) ?
140- . ok_or_else ( || api_error ! ( "Required argument `{name}` is missing" , ) ) ?)
142+ . ok_or_else ( || api_error ! ( "Required argument `{name}` is missing" , ) ) ?;
143+ self . required_args_idx . push ( arg. idx ) ;
144+ Ok ( arg)
141145 }
142146
143147 pub fn done ( self ) -> Result < ( ) > {
@@ -233,7 +237,7 @@ pub trait SimpleFunctionFactoryBase: SimpleFunctionFactory + Send + Sync + 'stat
233237 spec : Self :: Spec ,
234238 resolved_input_schema : Self :: ResolvedArgs ,
235239 context : Arc < FlowInstanceContext > ,
236- ) -> Result < Box < dyn SimpleFunctionExecutor > > ;
240+ ) -> Result < impl SimpleFunctionExecutor > ;
237241
238242 fn register ( self , registry : & mut ExecutorFactoryRegistry ) -> Result < ( ) >
239243 where
@@ -246,6 +250,31 @@ pub trait SimpleFunctionFactoryBase: SimpleFunctionFactory + Send + Sync + 'stat
246250 }
247251}
248252
253+ struct FunctionExecutorWrapper < E : SimpleFunctionExecutor > {
254+ executor : E ,
255+ required_args_idx : Vec < usize > ,
256+ }
257+
258+ #[ async_trait]
259+ impl < E : SimpleFunctionExecutor > SimpleFunctionExecutor for FunctionExecutorWrapper < E > {
260+ async fn evaluate ( & self , args : Vec < value:: Value > ) -> Result < value:: Value > {
261+ for idx in & self . required_args_idx {
262+ if args[ * idx] . is_null ( ) {
263+ return Ok ( value:: Value :: Null ) ;
264+ }
265+ }
266+ self . executor . evaluate ( args) . await
267+ }
268+
269+ fn enable_cache ( & self ) -> bool {
270+ self . executor . enable_cache ( )
271+ }
272+
273+ fn behavior_version ( & self ) -> Option < u32 > {
274+ self . executor . behavior_version ( )
275+ }
276+ }
277+
249278#[ async_trait]
250279impl < T : SimpleFunctionFactoryBase > SimpleFunctionFactory for T {
251280 async fn build (
@@ -258,13 +287,31 @@ impl<T: SimpleFunctionFactoryBase> SimpleFunctionFactory for T {
258287 BoxFuture < ' static , Result < Box < dyn SimpleFunctionExecutor > > > ,
259288 ) > {
260289 let spec: T :: Spec = serde_json:: from_value ( spec) ?;
261- let mut args_resolver = OpArgsResolver :: new ( & input_schema) ?;
262- let ( resolved_input_schema, output_schema) = self
290+ let mut required_args_idx = vec ! [ ] ;
291+ let mut args_resolver = OpArgsResolver :: new ( & input_schema, & mut required_args_idx) ?;
292+ let ( resolved_input_schema, mut output_schema) = self
263293 . resolve_schema ( & spec, & mut args_resolver, & context)
264294 . await ?;
295+
296+ // If any required argument is nullable, the output schema should be nullable.
297+ if args_resolver
298+ . required_args_idx
299+ . iter ( )
300+ . any ( |idx| input_schema[ * idx] . value_type . nullable )
301+ {
302+ output_schema. nullable = true ;
303+ }
304+
265305 args_resolver. done ( ) ?;
266- let executor = self . build_executor ( spec, resolved_input_schema, context) ;
267- Ok ( ( output_schema, executor) )
306+ let executor = async move {
307+ Ok ( Box :: new ( FunctionExecutorWrapper {
308+ executor : self
309+ . build_executor ( spec, resolved_input_schema, context)
310+ . await ?,
311+ required_args_idx,
312+ } ) as Box < dyn SimpleFunctionExecutor > )
313+ } ;
314+ Ok ( ( output_schema, Box :: pin ( executor) ) )
268315 }
269316}
270317
0 commit comments