@@ -60,16 +60,16 @@ impl SharedAckFn {
6060}
6161
6262async fn update_source (
63- flow_ctx : Arc < FlowContext > ,
63+ flow : Arc < builder :: AnalyzedFlow > ,
6464 plan : Arc < plan:: ExecutionPlan > ,
65+ execution_ctx : Arc < tokio:: sync:: OwnedRwLockReadGuard < crate :: lib_context:: ExecutionContext > > ,
6566 source_update_stats : Arc < stats:: UpdateStats > ,
6667 source_idx : usize ,
6768 pool : PgPool ,
6869 options : FlowLiveUpdaterOptions ,
6970) -> Result < ( ) > {
70- let execution_ctx = flow_ctx. execution_ctx . read ( ) . await ;
7171 let source_context = execution_ctx
72- . get_source_indexing_context ( & flow_ctx . flow , source_idx, & pool)
72+ . get_source_indexing_context ( & flow, source_idx, & pool)
7373 . await ?;
7474
7575 let import_op = & plan. import_ops [ source_idx] ;
@@ -97,15 +97,9 @@ async fn update_source(
9797 delta
9898 } ;
9999 if options. print_stats {
100- println ! (
101- "{}.{}: {}" ,
102- flow_ctx. flow. flow_instance. name, import_op. name, delta
103- ) ;
100+ println ! ( "{}.{}: {}" , flow. flow_instance. name, import_op. name, delta) ;
104101 } else {
105- trace ! (
106- "{}.{}: {}" ,
107- flow_ctx. flow. flow_instance. name, import_op. name, delta
108- ) ;
102+ trace ! ( "{}.{}: {}" , flow. flow_instance. name, import_op. name, delta) ;
109103 }
110104 } ;
111105
@@ -219,14 +213,16 @@ impl FlowLiveUpdater {
219213 options : FlowLiveUpdaterOptions ,
220214 ) -> Result < Self > {
221215 let plan = flow_ctx. flow . get_execution_plan ( ) . await ?;
216+ let execution_ctx = Arc :: new ( flow_ctx. use_owned_execution_ctx ( ) . await ?) ;
222217
223218 let mut tasks = JoinSet :: new ( ) ;
224219 let sources_update_stats = ( 0 ..plan. import_ops . len ( ) )
225220 . map ( |source_idx| {
226221 let source_update_stats = Arc :: new ( stats:: UpdateStats :: default ( ) ) ;
227222 tasks. spawn ( update_source (
228- flow_ctx. clone ( ) ,
223+ flow_ctx. flow . clone ( ) ,
229224 plan. clone ( ) ,
225+ execution_ctx. clone ( ) ,
230226 source_update_stats. clone ( ) ,
231227 source_idx,
232228 pool. clone ( ) ,
0 commit comments