@@ -22,7 +22,7 @@ use linera_base::{
2222} ;
2323use linera_core:: { client:: ChainClient , node:: NotificationStream , worker:: Reason } ;
2424use serde_json:: json;
25- use tokio:: { io:: AsyncWriteExt , process:: Command , select, sync:: mpsc, task :: JoinHandle } ;
25+ use tokio:: { io:: AsyncWriteExt , process:: Command , select, sync:: mpsc} ;
2626use tokio_util:: sync:: CancellationToken ;
2727use tracing:: { debug, error, info} ;
2828
@@ -43,6 +43,17 @@ pub fn parse_operator(s: &str) -> Result<(String, PathBuf), String> {
4343
4444type Deadline = Reverse < ( Timestamp , Option < ApplicationId > ) > ;
4545
46+ /// Messages sent from background task execution to the main loop.
47+ enum TaskMessage {
48+ /// A task outcome ready to be submitted.
49+ Outcome {
50+ application_id : ApplicationId ,
51+ outcome : TaskOutcome ,
52+ } ,
53+ /// All tasks in a batch have completed and their outcomes (if any) have been sent.
54+ BatchComplete { application_id : ApplicationId } ,
55+ }
56+
4657/// A task processor that watches applications and executes off-chain operators.
4758pub struct TaskProcessor < Env : linera_core:: Environment > {
4859 chain_id : ChainId ,
@@ -51,12 +62,12 @@ pub struct TaskProcessor<Env: linera_core::Environment> {
5162 chain_client : ChainClient < Env > ,
5263 cancellation_token : CancellationToken ,
5364 notifications : NotificationStream ,
54- outcome_sender : mpsc:: UnboundedSender < ( ApplicationId , TaskOutcome ) > ,
55- outcome_receiver : mpsc:: UnboundedReceiver < ( ApplicationId , TaskOutcome ) > ,
65+ outcome_sender : mpsc:: UnboundedSender < TaskMessage > ,
66+ outcome_receiver : mpsc:: UnboundedReceiver < TaskMessage > ,
5667 update_receiver : mpsc:: UnboundedReceiver < Update > ,
5768 deadlines : BinaryHeap < Deadline > ,
5869 operators : OperatorMap ,
59- last_task_handles : BTreeMap < ApplicationId , JoinHandle < ( ) > > ,
70+ in_flight_apps : BTreeSet < ApplicationId > ,
6071}
6172
6273impl < Env : linera_core:: Environment > TaskProcessor < Env > {
@@ -84,7 +95,7 @@ impl<Env: linera_core::Environment> TaskProcessor<Env> {
8495 update_receiver,
8596 deadlines : BinaryHeap :: new ( ) ,
8697 operators,
87- last_task_handles : BTreeMap :: new ( ) ,
98+ in_flight_apps : BTreeSet :: new ( ) ,
8899 }
89100 }
90101
@@ -105,9 +116,17 @@ impl<Env: linera_core::Environment> TaskProcessor<Env> {
105116 let application_ids = self . process_events( ) ;
106117 self . process_actions( application_ids) . await ;
107118 }
108- Some ( ( application_id, outcome) ) = self . outcome_receiver. recv( ) => {
109- if let Err ( e) = self . submit_task_outcome( application_id, & outcome) . await {
110- error!( "Error while processing task outcome {outcome:?}: {e}" ) ;
119+ Some ( msg) = self . outcome_receiver. recv( ) => {
120+ match msg {
121+ TaskMessage :: Outcome { application_id, outcome } => {
122+ if let Err ( e) = self . submit_task_outcome( application_id, & outcome) . await {
123+ error!( "Error while processing task outcome {outcome:?}: {e}" ) ;
124+ }
125+ }
126+ TaskMessage :: BatchComplete { application_id } => {
127+ self . in_flight_apps. remove( & application_id) ;
128+ self . process_actions( vec![ application_id] ) . await ;
129+ }
111130 }
112131 }
113132 Some ( update) = self . update_receiver. recv( ) => {
@@ -138,9 +157,10 @@ impl<Env: linera_core::Environment> TaskProcessor<Env> {
138157 let new_app_set: BTreeSet < _ > = update. application_ids . iter ( ) . cloned ( ) . collect ( ) ;
139158 let old_app_set: BTreeSet < _ > = self . application_ids . iter ( ) . cloned ( ) . collect ( ) ;
140159
141- // Retain only last_requested_callbacks for applications that are still active
142160 self . cursors
143161 . retain ( |app_id, _| new_app_set. contains ( app_id) ) ;
162+ self . in_flight_apps
163+ . retain ( |app_id| new_app_set. contains ( app_id) ) ;
144164
145165 // Update the application_ids
146166 self . application_ids = update. application_ids ;
@@ -176,6 +196,10 @@ impl<Env: linera_core::Environment> TaskProcessor<Env> {
176196
177197 async fn process_actions ( & mut self , application_ids : Vec < ApplicationId > ) {
178198 for application_id in application_ids {
199+ if self . in_flight_apps . contains ( & application_id) {
200+ debug ! ( "Skipping {application_id}: tasks already in flight" ) ;
201+ continue ;
202+ }
179203 debug ! ( "Processing actions for {application_id}" ) ;
180204 let now = Timestamp :: now ( ) ;
181205 let app_cursor = self . cursors . get ( & application_id) . cloned ( ) ;
@@ -199,10 +223,10 @@ impl<Env: linera_core::Environment> TaskProcessor<Env> {
199223 self . cursors . insert ( application_id, cursor) ;
200224 }
201225 if !actions. execute_tasks . is_empty ( ) {
226+ self . in_flight_apps . insert ( application_id) ;
202227 let sender = self . outcome_sender . clone ( ) ;
203228 let operators = self . operators . clone ( ) ;
204- let previous = self . last_task_handles . remove ( & application_id) ;
205- let handle = tokio:: spawn ( async move {
229+ tokio:: spawn ( async move {
206230 // Spawn all tasks concurrently and join them.
207231 let handles: Vec < _ > = actions
208232 . execute_tasks
@@ -218,17 +242,17 @@ impl<Env: linera_core::Environment> TaskProcessor<Env> {
218242 } )
219243 . collect ( ) ;
220244 let results = future:: join_all ( handles) . await ;
221- // Wait for the previous batch to finish sending outcomes first.
222- if let Some ( previous) = previous {
223- if let Err ( error) = previous. await {
224- error ! ( %application_id, %error, "Task panicked" ) ;
225- }
226- }
227245 // Submit outcomes in the original order.
228246 for result in results {
229247 match result {
230248 Ok ( Ok ( outcome) ) => {
231- if sender. send ( ( application_id, outcome) ) . is_err ( ) {
249+ if sender
250+ . send ( TaskMessage :: Outcome {
251+ application_id,
252+ outcome,
253+ } )
254+ . is_err ( )
255+ {
232256 error ! ( "Outcome receiver dropped for {application_id}" ) ;
233257 break ;
234258 }
@@ -241,8 +265,10 @@ impl<Env: linera_core::Environment> TaskProcessor<Env> {
241265 }
242266 }
243267 }
268+ // Signal that this batch is done so the main loop can process
269+ // the next batch for this application.
270+ let _ = sender. send ( TaskMessage :: BatchComplete { application_id } ) ;
244271 } ) ;
245- self . last_task_handles . insert ( application_id, handle) ;
246272 }
247273 }
248274 }
0 commit comments