@@ -22,7 +22,7 @@ use linera_base::{
2222} ;
2323use linera_core:: { client:: ChainClient , node:: NotificationStream , worker:: Reason } ;
2424use serde_json:: json;
25- use tokio:: { io:: AsyncWriteExt , process:: Command , select, sync:: mpsc, task :: JoinHandle } ;
25+ use tokio:: { io:: AsyncWriteExt , process:: Command , select, sync:: mpsc} ;
2626use tokio_util:: sync:: CancellationToken ;
2727use tracing:: { debug, error, info} ;
2828
@@ -43,6 +43,17 @@ pub fn parse_operator(s: &str) -> Result<(String, PathBuf), String> {
4343
4444type Deadline = Reverse < ( Timestamp , Option < ApplicationId > ) > ;
4545
46+ /// Messages sent from background task execution to the main loop.
47+ enum TaskMessage {
48+ /// A task outcome ready to be submitted.
49+ Outcome {
50+ application_id : ApplicationId ,
51+ outcome : TaskOutcome ,
52+ } ,
53+ /// All tasks in a batch have completed and their outcomes (if any) have been sent.
54+ BatchComplete { application_id : ApplicationId } ,
55+ }
56+
4657/// A task processor that watches applications and executes off-chain operators.
4758pub struct TaskProcessor < Env : linera_core:: Environment > {
4859 chain_id : ChainId ,
@@ -51,12 +62,12 @@ pub struct TaskProcessor<Env: linera_core::Environment> {
5162 chain_client : ChainClient < Env > ,
5263 cancellation_token : CancellationToken ,
5364 notifications : NotificationStream ,
54- outcome_sender : mpsc:: UnboundedSender < ( ApplicationId , TaskOutcome ) > ,
55- outcome_receiver : mpsc:: UnboundedReceiver < ( ApplicationId , TaskOutcome ) > ,
65+ outcome_sender : mpsc:: UnboundedSender < TaskMessage > ,
66+ outcome_receiver : mpsc:: UnboundedReceiver < TaskMessage > ,
5667 update_receiver : mpsc:: UnboundedReceiver < Update > ,
5768 deadlines : BinaryHeap < Deadline > ,
5869 operators : OperatorMap ,
59- last_task_handles : BTreeMap < ApplicationId , JoinHandle < ( ) > > ,
70+ in_flight_apps : BTreeSet < ApplicationId > ,
6071}
6172
6273impl < Env : linera_core:: Environment > TaskProcessor < Env > {
@@ -84,7 +95,7 @@ impl<Env: linera_core::Environment> TaskProcessor<Env> {
8495 update_receiver,
8596 deadlines : BinaryHeap :: new ( ) ,
8697 operators,
87- last_task_handles : BTreeMap :: new ( ) ,
98+ in_flight_apps : BTreeSet :: new ( ) ,
8899 }
89100 }
90101
@@ -105,9 +116,17 @@ impl<Env: linera_core::Environment> TaskProcessor<Env> {
105116 let application_ids = self . process_events( ) ;
106117 self . process_actions( application_ids) . await ;
107118 }
108- Some ( ( application_id, outcome) ) = self . outcome_receiver. recv( ) => {
109- if let Err ( e) = self . submit_task_outcome( application_id, & outcome) . await {
110- error!( "Error while processing task outcome {outcome:?}: {e}" ) ;
119+ Some ( msg) = self . outcome_receiver. recv( ) => {
120+ match msg {
121+ TaskMessage :: Outcome { application_id, outcome } => {
122+ if let Err ( e) = self . submit_task_outcome( application_id, & outcome) . await {
123+ error!( "Error while processing task outcome {outcome:?}: {e}" ) ;
124+ }
125+ }
126+ TaskMessage :: BatchComplete { application_id } => {
127+ self . in_flight_apps. remove( & application_id) ;
128+ self . process_actions( vec![ application_id] ) . await ;
129+ }
111130 }
112131 }
113132 Some ( update) = self . update_receiver. recv( ) => {
@@ -138,9 +157,11 @@ impl<Env: linera_core::Environment> TaskProcessor<Env> {
138157 let new_app_set: BTreeSet < _ > = update. application_ids . iter ( ) . cloned ( ) . collect ( ) ;
139158 let old_app_set: BTreeSet < _ > = self . application_ids . iter ( ) . cloned ( ) . collect ( ) ;
140159
141- // Retain only last_requested_callbacks for applications that are still active
160+ // Retain only last_requested_callbacks and in_flight_apps for applications that are still active
142161 self . last_requested_callbacks
143162 . retain ( |app_id, _| new_app_set. contains ( app_id) ) ;
163+ self . in_flight_apps
164+ . retain ( |app_id| new_app_set. contains ( app_id) ) ;
144165
145166 // Update the application_ids
146167 self . application_ids = update. application_ids ;
@@ -176,6 +197,10 @@ impl<Env: linera_core::Environment> TaskProcessor<Env> {
176197
177198 async fn process_actions ( & mut self , application_ids : Vec < ApplicationId > ) {
178199 for application_id in application_ids {
200+ if self . in_flight_apps . contains ( & application_id) {
201+ debug ! ( "Skipping {application_id}: tasks already in flight" ) ;
202+ continue ;
203+ }
179204 debug ! ( "Processing actions for {application_id}" ) ;
180205 let now = Timestamp :: now ( ) ;
181206 let last_requested_callback =
@@ -201,10 +226,10 @@ impl<Env: linera_core::Environment> TaskProcessor<Env> {
201226 . push ( Reverse ( ( timestamp, Some ( application_id) ) ) ) ;
202227 }
203228 if !actions. execute_tasks . is_empty ( ) {
229+ self . in_flight_apps . insert ( application_id) ;
204230 let sender = self . outcome_sender . clone ( ) ;
205231 let operators = self . operators . clone ( ) ;
206- let previous = self . last_task_handles . remove ( & application_id) ;
207- let handle = tokio:: spawn ( async move {
232+ tokio:: spawn ( async move {
208233 // Spawn all tasks concurrently and join them.
209234 let handles: Vec < _ > = actions
210235 . execute_tasks
@@ -220,17 +245,17 @@ impl<Env: linera_core::Environment> TaskProcessor<Env> {
220245 } )
221246 . collect ( ) ;
222247 let results = future:: join_all ( handles) . await ;
223- // Wait for the previous batch to finish sending outcomes first.
224- if let Some ( previous) = previous {
225- if let Err ( error) = previous. await {
226- error ! ( %application_id, %error, "Task panicked" ) ;
227- }
228- }
229248 // Submit outcomes in the original order.
230249 for result in results {
231250 match result {
232251 Ok ( Ok ( outcome) ) => {
233- if sender. send ( ( application_id, outcome) ) . is_err ( ) {
252+ if sender
253+ . send ( TaskMessage :: Outcome {
254+ application_id,
255+ outcome,
256+ } )
257+ . is_err ( )
258+ {
234259 error ! ( "Outcome receiver dropped for {application_id}" ) ;
235260 break ;
236261 }
@@ -243,8 +268,10 @@ impl<Env: linera_core::Environment> TaskProcessor<Env> {
243268 }
244269 }
245270 }
271+ // Signal that this batch is done so the main loop can process
272+ // the next batch for this application.
273+ let _ = sender. send ( TaskMessage :: BatchComplete { application_id } ) ;
246274 } ) ;
247- self . last_task_handles . insert ( application_id, handle) ;
248275 }
249276 }
250277 }
0 commit comments