@@ -816,20 +816,11 @@ impl AnalyzerContext<'_> {
816816 & self ,
817817 scope : & mut DataScopeBuilder ,
818818 export_op : NamedSpec < ExportOpSpec > ,
819+ export_factory : Arc < dyn ExportTargetFactory > ,
819820 setup_state : Option < & mut FlowSetupState < DesiredMode > > ,
820821 existing_target_states : & HashMap < & ResourceIdentifier , Vec < & TargetSetupState > > ,
821822 ) -> Result < impl Future < Output = Result < AnalyzedExportOp > > + Send > {
822823 let export_target = export_op. spec . target ;
823- let export_factory = match self . registry . get ( & export_target. kind ) {
824- Some ( ExecutorFactory :: ExportTarget ( export_executor) ) => export_executor,
825- _ => {
826- return Err ( anyhow:: anyhow!(
827- "Export target kind not found: {}" ,
828- export_target. kind
829- ) )
830- }
831- } ;
832-
833824 let spec = serde_json:: Value :: Object ( export_target. spec . clone ( ) ) ;
834825 let ( local_collector_ref, collector_schema) =
835826 scope. consume_collector ( & export_op. spec . collector_name ) ?;
@@ -986,8 +977,8 @@ impl AnalyzerContext<'_> {
986977 . unwrap_or ( false ) ;
987978 Ok ( async move {
988979 trace ! ( "Start building executor for export op `{}`" , export_op. name) ;
989- let ( executor , query_target ) = setup_output
990- . executor
980+ let executors = setup_output
981+ . executors
991982 . await
992983 . with_context ( || format ! ( "Analyzing export op: {}" , export_op. name) ) ?;
993984 trace ! (
@@ -999,8 +990,8 @@ impl AnalyzerContext<'_> {
999990 name,
1000991 target_id : target_id. unwrap_or_default ( ) ,
1001992 input : local_collector_ref,
1002- executor ,
1003- query_target,
993+ export_context : executors . export_context ,
994+ query_target : executors . query_target ,
1004995 primary_key_def,
1005996 primary_key_type,
1006997 value_fields : value_fields_idx,
@@ -1127,18 +1118,36 @@ pub fn analyze_flow(
11271118 & flow_inst. reactive_ops ,
11281119 RefList :: Nil ,
11291120 ) ?;
1130- let export_ops_futs = flow_inst
1131- . export_ops
1132- . iter ( )
1133- . map ( |export_op| {
1134- analyzer_ctx. analyze_export_op (
1135- root_exec_scope. data ,
1136- export_op. clone ( ) ,
1137- Some ( & mut setup_state) ,
1138- & target_states_by_name_type,
1139- )
1140- } )
1141- . collect :: < Result < Vec < _ > > > ( ) ?;
1121+
1122+ let mut target_groups = IndexMap :: < String , AnalyzedExportTargetOpGroup > :: new ( ) ;
1123+ let mut export_ops_futs = vec ! [ ] ;
1124+ for ( idx, export_op) in flow_inst. export_ops . iter ( ) . enumerate ( ) {
1125+ let target_kind = export_op. spec . target . kind . clone ( ) ;
1126+ let export_factory = match registry. get ( & target_kind) {
1127+ Some ( ExecutorFactory :: ExportTarget ( export_executor) ) => export_executor,
1128+ _ => {
1129+ return Err ( anyhow:: anyhow!(
1130+ "Export target kind not found: {}" ,
1131+ export_op. spec. target. kind
1132+ ) )
1133+ }
1134+ } ;
1135+ export_ops_futs. push ( analyzer_ctx. analyze_export_op (
1136+ root_exec_scope. data ,
1137+ export_op. clone ( ) ,
1138+ export_factory. clone ( ) ,
1139+ Some ( & mut setup_state) ,
1140+ & target_states_by_name_type,
1141+ ) ?) ;
1142+ target_groups
1143+ . entry ( target_kind)
1144+ . or_insert_with ( || AnalyzedExportTargetOpGroup {
1145+ target_factory : export_factory. clone ( ) ,
1146+ op_idx : vec ! [ ] ,
1147+ } )
1148+ . op_idx
1149+ . push ( idx) ;
1150+ }
11421151
11431152 let tracking_table_setup = setup_state. tracking_table . clone ( ) ;
11441153 let data_schema = root_data_scope. into_data_schema ( ) ?;
@@ -1160,6 +1169,7 @@ pub fn analyze_flow(
11601169 import_ops,
11611170 op_scope,
11621171 export_ops,
1172+ export_op_groups : target_groups. into_values ( ) . collect ( ) ,
11631173 } )
11641174 } ;
11651175
0 commit comments