1- use crate :: builder:: plan:: AnalyzedValueMapping ;
1+ use crate :: builder:: plan:: {
2+ AnalyzedFieldReference , AnalyzedLocalFieldReference , AnalyzedValueMapping ,
3+ } ;
24use crate :: ops:: sdk:: {
35 AuthRegistry , BasicValueType , EnrichedValueType , FlowInstanceContext , OpArgSchema ,
46 OpArgsResolver , SimpleFunctionExecutor , SimpleFunctionFactoryBase , Value , make_output_type,
@@ -7,47 +9,52 @@ use anyhow::Result;
79use serde:: de:: DeserializeOwned ;
810use std:: sync:: Arc ;
911
10- fn new_literal_op_arg_schema (
11- name : Option < & str > ,
12- value : Value ,
13- value_type : EnrichedValueType ,
14- ) -> OpArgSchema {
15- OpArgSchema {
16- name : name. map_or ( crate :: base:: spec:: OpArgName ( None ) , |n| {
17- crate :: base:: spec:: OpArgName ( Some ( n. to_string ( ) ) )
18- } ) ,
19- value_type,
20- analyzed_value : AnalyzedValueMapping :: Constant { value } ,
21- }
22- }
23-
24- // This function provides a helper to create OpArgSchema for literal values.
25- pub fn build_arg_schema ( name : & str , value : Value , value_type : BasicValueType ) -> OpArgSchema {
26- new_literal_op_arg_schema ( Some ( name) , value, make_output_type ( value_type) )
12+ // This function builds an argument schema for a flow function.
13+ pub fn build_arg_schema (
14+ name : & str ,
15+ value_type : BasicValueType ,
16+ ) -> ( Option < & str > , EnrichedValueType ) {
17+ ( Some ( name) , make_output_type ( value_type) )
2718}
2819
2920// This function tests a flow function by providing a spec, input argument schemas, and values.
3021pub async fn test_flow_function < S , R , F > (
3122 factory : Arc < F > ,
3223 spec : S ,
33- input_arg_schemas : Vec < OpArgSchema > ,
24+ input_arg_schemas : Vec < ( Option < & str > , EnrichedValueType ) > ,
3425 input_arg_values : Vec < Value > ,
3526) -> Result < Value >
3627where
3728 S : DeserializeOwned + Send + Sync + ' static ,
3829 R : Send + Sync + ' static ,
3930 F : SimpleFunctionFactoryBase < Spec = S , ResolvedArgs = R > + ?Sized ,
4031{
32+ // 1. Construct OpArgSchema
33+ let op_arg_schemas: Vec < OpArgSchema > = input_arg_schemas
34+ . into_iter ( )
35+ . enumerate ( )
36+ . map ( |( idx, ( name, value_type) ) | OpArgSchema {
37+ name : name. map_or ( crate :: base:: spec:: OpArgName ( None ) , |n| {
38+ crate :: base:: spec:: OpArgName ( Some ( n. to_string ( ) ) )
39+ } ) ,
40+ value_type,
41+ analyzed_value : AnalyzedValueMapping :: Field ( AnalyzedFieldReference {
42+ local : AnalyzedLocalFieldReference {
43+ fields_idx : vec ! [ idx as u32 ] ,
44+ } ,
45+ scope_up_level : 0 ,
46+ } ) ,
47+ } )
48+ . collect ( ) ;
49+
50+ // 2. Resolve Schema & Args
51+ let mut args_resolver = OpArgsResolver :: new ( & op_arg_schemas) ?;
4152 let context = Arc :: new ( FlowInstanceContext {
4253 flow_instance_name : "test_flow_function" . to_string ( ) ,
4354 auth_registry : Arc :: new ( AuthRegistry :: default ( ) ) ,
4455 py_exec_ctx : None ,
4556 } ) ;
4657
47- // 2. Resolve Schema & Args
48- // The caller of test_flow_function will be responsible for creating these schemas.
49- let mut args_resolver = OpArgsResolver :: new ( & input_arg_schemas) ?;
50-
5158 let ( resolved_args_from_schema, _output_schema) : ( R , EnrichedValueType ) = factory
5259 . resolve_schema ( & spec, & mut args_resolver, & context)
5360 . await ?;
0 commit comments