@@ -122,30 +122,31 @@ pub async fn evaluate_data(
122122 let fl = & lib_context. with_flow_context ( & flow_name, |ctx| ctx. flow . clone ( ) ) ?;
123123 let schema = & fl. data_schema ;
124124
125- let field_idx = schema
126- . fields
125+ let source_op_idx = fl
126+ . flow_instance
127+ . source_ops
127128 . iter ( )
128- . position ( |f| f . name == query. field )
129+ . position ( |source_op| source_op . name == query. field )
129130 . ok_or_else ( || {
130131 ApiError :: new (
131- & format ! ( "field not found: {}" , query. field) ,
132+ & format ! ( "source field not found: {}" , query. field) ,
132133 StatusCode :: BAD_REQUEST ,
133134 )
134135 } ) ?;
135-
136- let field_schema = & schema. fields [ field_idx] ;
136+ let execution_plan = fl. get_execution_plan ( ) . await ?;
137+ let field_schema =
138+ & schema. fields [ execution_plan. source_ops [ source_op_idx] . output . field_idx as usize ] ;
137139 let collection_schema = match & field_schema. value_type . typ {
138140 schema:: ValueType :: Collection ( collection) => collection,
139141 _ => api_bail ! ( "field is not a table: {}" , query. field) ,
140142 } ;
141- let execution_plan = fl. get_execution_plan ( ) . await ?;
142143 let key_field = collection_schema
143144 . key_field ( )
144145 . ok_or_else ( || api_error ! ( "field {} does not have a key" , query. field) ) ?;
145146 let key = value:: KeyValue :: from_strs ( query. key , & key_field. value_type . typ ) ?;
146147
147148 let data_builder =
148- evaluator:: evaluate_source_entry ( & execution_plan, field_idx as u32 , & schema, & key, None )
149+ evaluator:: evaluate_source_entry ( & execution_plan, source_op_idx , & schema, & key, None )
149150 . await ?
150151 . ok_or_else ( || {
151152 api_error ! ( "value not found for source at the specified key: {key:?}" )
0 commit comments