@@ -930,6 +930,7 @@ impl AnalyzerContext<'_> {
930930 scope : & mut DataScopeBuilder ,
931931 flow_inst : & FlowInstanceSpec ,
932932 export_op_group : & AnalyzedExportTargetOpGroup ,
933+ declarations : Vec < serde_json:: Value > ,
933934 flow_setup_state : & mut FlowSetupState < DesiredMode > ,
934935 existing_target_states : & HashMap < & ResourceIdentifier , Vec < & TargetSetupState > > ,
935936 ) -> Result < Vec < impl Future < Output = Result < AnalyzedExportOp > > + Send > > {
@@ -1007,7 +1008,7 @@ impl AnalyzerContext<'_> {
10071008 }
10081009 let ( data_collections_output, _) = export_op_group. target_factory . clone ( ) . build (
10091010 collection_specs,
1010- vec ! [ ] ,
1011+ declarations ,
10111012 self . flow_ctx . clone ( ) ,
10121013 ) ?;
10131014 if data_collections_output. len ( ) != data_fields_infos. len ( ) {
@@ -1162,37 +1163,49 @@ pub fn analyze_flow(
11621163 RefList :: Nil ,
11631164 ) ?;
11641165
1165- let mut target_groups = IndexMap :: < String , AnalyzedExportTargetOpGroup > :: new ( ) ;
1166+ #[ derive( Default ) ]
1167+ struct TargetOpGroup {
1168+ export_op_ids : Vec < usize > ,
1169+ declarations : Vec < serde_json:: Value > ,
1170+ }
1171+ let mut target_op_group = IndexMap :: < String , TargetOpGroup > :: new ( ) ;
11661172 for ( idx, export_op) in flow_inst. export_ops . iter ( ) . enumerate ( ) {
1167- let target_kind = export_op. spec . target . kind . clone ( ) ;
1173+ target_op_group
1174+ . entry ( export_op. spec . target . kind . clone ( ) )
1175+ . or_default ( )
1176+ . export_op_ids
1177+ . push ( idx) ;
1178+ }
1179+ for declaration in flow_inst. declarations . iter ( ) {
1180+ target_op_group
1181+ . entry ( declaration. kind . clone ( ) )
1182+ . or_default ( )
1183+ . declarations
1184+ . push ( serde_json:: Value :: Object ( declaration. spec . clone ( ) ) ) ;
1185+ }
1186+
1187+ let mut export_ops_futs = vec ! [ ] ;
1188+ let mut analyzed_target_op_groups = vec ! [ ] ;
1189+ for ( target_kind, op_ids) in target_op_group. into_iter ( ) {
11681190 let export_factory = match registry. get ( & target_kind) {
11691191 Some ( ExecutorFactory :: ExportTarget ( export_executor) ) => export_executor,
11701192 _ => {
1171- return Err ( anyhow:: anyhow!(
1172- "Export target kind not found: {}" ,
1173- export_op. spec. target. kind
1174- ) )
1193+ bail ! ( "Export target kind not found: {target_kind}" ) ;
11751194 }
11761195 } ;
1177- target_groups
1178- . entry ( target_kind)
1179- . or_insert_with ( || AnalyzedExportTargetOpGroup {
1180- target_factory : export_factory. clone ( ) ,
1181- op_idx : vec ! [ ] ,
1182- } )
1183- . op_idx
1184- . push ( idx) ;
1185- }
1186-
1187- let mut export_ops_futs = vec ! [ ] ;
1188- for group in target_groups. values ( ) {
1196+ let analyzed_target_op_group = AnalyzedExportTargetOpGroup {
1197+ target_factory : export_factory. clone ( ) ,
1198+ op_idx : op_ids. export_op_ids ,
1199+ } ;
11891200 export_ops_futs. extend ( analyzer_ctx. analyze_export_op_group (
11901201 root_exec_scope. data ,
11911202 flow_inst,
1192- group,
1203+ & analyzed_target_op_group,
1204+ op_ids. declarations ,
11931205 & mut setup_state,
11941206 & target_states_by_name_type,
11951207 ) ?) ;
1208+ analyzed_target_op_groups. push ( analyzed_target_op_group) ;
11961209 }
11971210
11981211 let tracking_table_setup = setup_state. tracking_table . clone ( ) ;
@@ -1215,7 +1228,7 @@ pub fn analyze_flow(
12151228 import_ops,
12161229 op_scope,
12171230 export_ops,
1218- export_op_groups : target_groups . into_values ( ) . collect ( ) ,
1231+ export_op_groups : analyzed_target_op_groups ,
12191232 } )
12201233 } ;
12211234
0 commit comments