Skip to content

Commit e56f3fe

Browse files
authored
Fix duplicate task generation (#5431)
## Motivation The task processor can call `process_actions` multiple times while a previous batch's tasks are still in-flight. When there is a backlog (`schedule.start` far in the past), the deadline timer fires immediately because the next event is also in the past. The `last_requested_callbacks` guard passes, and `next_actions` reads stale on-chain state — `schedule.start` has not advanced yet — so it generates a second batch with overlapping timestamps. When both batches are eventually submitted as blocks, the second batch's stale timestamps no longer match the now-advanced `schedule.start`, causing: ``` contract.rs:115: assert_eq!(timed_data.timestamp, next_timestamp, "unexpected timestamp in posted data") ``` ## Proposal Introduce an `in_flight_apps: BTreeSet<ApplicationId>` guard in the task processor. When a batch of tasks is spawned for an application, the app is marked as in-flight and subsequent `process_actions` calls skip it. A new `TaskMessage` enum replaces the raw `(ApplicationId, TaskOutcome)` channel payload, adding a `BatchComplete` variant that the spawned task sends after all outcomes. On `BatchComplete`, the guard is cleared and `process_actions` is re-triggered for that app so it reads freshly updated on-chain state. ## Test Plan - CI - I had a stuck worker with a big backlog. After rebuilding pm-app with this fix and restarting the worker with the new package version, I see no more panics, and the worker is making progress through the backlog: ![Screenshot 2026-02-12 at 13.44.18.png](https://app.graphite.com/user-attachments/assets/be611bf2-3b7a-4bbd-bda2-45b1e5049108.png)
1 parent daaab9f commit e56f3fe

File tree

1 file changed

+46
-19
lines changed

1 file changed

+46
-19
lines changed

linera-service/src/task_processor.rs

Lines changed: 46 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use linera_base::{
2222
};
2323
use linera_core::{client::ChainClient, node::NotificationStream, worker::Reason};
2424
use serde_json::json;
25-
use tokio::{io::AsyncWriteExt, process::Command, select, sync::mpsc, task::JoinHandle};
25+
use tokio::{io::AsyncWriteExt, process::Command, select, sync::mpsc};
2626
use tokio_util::sync::CancellationToken;
2727
use tracing::{debug, error, info};
2828

@@ -43,6 +43,17 @@ pub fn parse_operator(s: &str) -> Result<(String, PathBuf), String> {
4343

4444
type Deadline = Reverse<(Timestamp, Option<ApplicationId>)>;
4545

46+
/// Messages sent over the outcome channel from spawned background task
/// batches to the main event loop. Replaces the previous raw
/// `(ApplicationId, TaskOutcome)` channel payload so that batch
/// completion can be signaled explicitly.
47+
enum TaskMessage {
48+
    /// A task outcome ready to be submitted on-chain by the main loop.
    Outcome {
49+
        // Application that produced this outcome.
        application_id: ApplicationId,
50+
        // The outcome payload to submit.
        outcome: TaskOutcome,
51+
    },
52+
    /// All tasks in a batch have completed and their outcomes (if any) have been sent.
    /// On receipt, the main loop clears the application's in-flight guard and
    /// re-runs `process_actions` for it so fresh on-chain state is read.
    BatchComplete { application_id: ApplicationId },
54+
}
55+
4657
/// A task processor that watches applications and executes off-chain operators.
4758
pub struct TaskProcessor<Env: linera_core::Environment> {
4859
chain_id: ChainId,
@@ -51,12 +62,12 @@ pub struct TaskProcessor<Env: linera_core::Environment> {
5162
chain_client: ChainClient<Env>,
5263
cancellation_token: CancellationToken,
5364
notifications: NotificationStream,
54-
outcome_sender: mpsc::UnboundedSender<(ApplicationId, TaskOutcome)>,
55-
outcome_receiver: mpsc::UnboundedReceiver<(ApplicationId, TaskOutcome)>,
65+
outcome_sender: mpsc::UnboundedSender<TaskMessage>,
66+
outcome_receiver: mpsc::UnboundedReceiver<TaskMessage>,
5667
update_receiver: mpsc::UnboundedReceiver<Update>,
5768
deadlines: BinaryHeap<Deadline>,
5869
operators: OperatorMap,
59-
last_task_handles: BTreeMap<ApplicationId, JoinHandle<()>>,
70+
in_flight_apps: BTreeSet<ApplicationId>,
6071
}
6172

6273
impl<Env: linera_core::Environment> TaskProcessor<Env> {
@@ -84,7 +95,7 @@ impl<Env: linera_core::Environment> TaskProcessor<Env> {
8495
update_receiver,
8596
deadlines: BinaryHeap::new(),
8697
operators,
87-
last_task_handles: BTreeMap::new(),
98+
in_flight_apps: BTreeSet::new(),
8899
}
89100
}
90101

@@ -105,9 +116,17 @@ impl<Env: linera_core::Environment> TaskProcessor<Env> {
105116
let application_ids = self.process_events();
106117
self.process_actions(application_ids).await;
107118
}
108-
Some((application_id, outcome)) = self.outcome_receiver.recv() => {
109-
if let Err(e) = self.submit_task_outcome(application_id, &outcome).await {
110-
error!("Error while processing task outcome {outcome:?}: {e}");
119+
Some(msg) = self.outcome_receiver.recv() => {
120+
match msg {
121+
TaskMessage::Outcome { application_id, outcome } => {
122+
if let Err(e) = self.submit_task_outcome(application_id, &outcome).await {
123+
error!("Error while processing task outcome {outcome:?}: {e}");
124+
}
125+
}
126+
TaskMessage::BatchComplete { application_id } => {
127+
self.in_flight_apps.remove(&application_id);
128+
self.process_actions(vec![application_id]).await;
129+
}
111130
}
112131
}
113132
Some(update) = self.update_receiver.recv() => {
@@ -138,9 +157,11 @@ impl<Env: linera_core::Environment> TaskProcessor<Env> {
138157
let new_app_set: BTreeSet<_> = update.application_ids.iter().cloned().collect();
139158
let old_app_set: BTreeSet<_> = self.application_ids.iter().cloned().collect();
140159

141-
// Retain only last_requested_callbacks for applications that are still active
160+
// Retain only last_requested_callbacks and in_flight_apps for applications that are still active
142161
self.last_requested_callbacks
143162
.retain(|app_id, _| new_app_set.contains(app_id));
163+
self.in_flight_apps
164+
.retain(|app_id| new_app_set.contains(app_id));
144165

145166
// Update the application_ids
146167
self.application_ids = update.application_ids;
@@ -176,6 +197,10 @@ impl<Env: linera_core::Environment> TaskProcessor<Env> {
176197

177198
async fn process_actions(&mut self, application_ids: Vec<ApplicationId>) {
178199
for application_id in application_ids {
200+
if self.in_flight_apps.contains(&application_id) {
201+
debug!("Skipping {application_id}: tasks already in flight");
202+
continue;
203+
}
179204
debug!("Processing actions for {application_id}");
180205
let now = Timestamp::now();
181206
let last_requested_callback =
@@ -201,10 +226,10 @@ impl<Env: linera_core::Environment> TaskProcessor<Env> {
201226
.push(Reverse((timestamp, Some(application_id))));
202227
}
203228
if !actions.execute_tasks.is_empty() {
229+
self.in_flight_apps.insert(application_id);
204230
let sender = self.outcome_sender.clone();
205231
let operators = self.operators.clone();
206-
let previous = self.last_task_handles.remove(&application_id);
207-
let handle = tokio::spawn(async move {
232+
tokio::spawn(async move {
208233
// Spawn all tasks concurrently and join them.
209234
let handles: Vec<_> = actions
210235
.execute_tasks
@@ -220,17 +245,17 @@ impl<Env: linera_core::Environment> TaskProcessor<Env> {
220245
})
221246
.collect();
222247
let results = future::join_all(handles).await;
223-
// Wait for the previous batch to finish sending outcomes first.
224-
if let Some(previous) = previous {
225-
if let Err(error) = previous.await {
226-
error!(%application_id, %error, "Task panicked");
227-
}
228-
}
229248
// Submit outcomes in the original order.
230249
for result in results {
231250
match result {
232251
Ok(Ok(outcome)) => {
233-
if sender.send((application_id, outcome)).is_err() {
252+
if sender
253+
.send(TaskMessage::Outcome {
254+
application_id,
255+
outcome,
256+
})
257+
.is_err()
258+
{
234259
error!("Outcome receiver dropped for {application_id}");
235260
break;
236261
}
@@ -243,8 +268,10 @@ impl<Env: linera_core::Environment> TaskProcessor<Env> {
243268
}
244269
}
245270
}
271+
// Signal that this batch is done so the main loop can process
272+
// the next batch for this application.
273+
let _ = sender.send(TaskMessage::BatchComplete { application_id });
246274
});
247-
self.last_task_handles.insert(application_id, handle);
248275
}
249276
}
250277
}

0 commit comments

Comments (0)