Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 45 additions & 19 deletions linera-service/src/task_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use linera_base::{
};
use linera_core::{client::ChainClient, node::NotificationStream, worker::Reason};
use serde_json::json;
use tokio::{io::AsyncWriteExt, process::Command, select, sync::mpsc, task::JoinHandle};
use tokio::{io::AsyncWriteExt, process::Command, select, sync::mpsc};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info};

Expand All @@ -43,6 +43,17 @@ pub fn parse_operator(s: &str) -> Result<(String, PathBuf), String> {

type Deadline = Reverse<(Timestamp, Option<ApplicationId>)>;

/// Messages sent from background task execution to the main loop.
enum TaskMessage {
/// A task outcome ready to be submitted.
Outcome {
application_id: ApplicationId,
outcome: TaskOutcome,
},
/// All tasks in a batch have completed and their outcomes (if any) have been sent.
BatchComplete { application_id: ApplicationId },
}

/// A task processor that watches applications and executes off-chain operators.
pub struct TaskProcessor<Env: linera_core::Environment> {
chain_id: ChainId,
Expand All @@ -51,12 +62,12 @@ pub struct TaskProcessor<Env: linera_core::Environment> {
chain_client: ChainClient<Env>,
cancellation_token: CancellationToken,
notifications: NotificationStream,
outcome_sender: mpsc::UnboundedSender<(ApplicationId, TaskOutcome)>,
outcome_receiver: mpsc::UnboundedReceiver<(ApplicationId, TaskOutcome)>,
outcome_sender: mpsc::UnboundedSender<TaskMessage>,
outcome_receiver: mpsc::UnboundedReceiver<TaskMessage>,
update_receiver: mpsc::UnboundedReceiver<Update>,
deadlines: BinaryHeap<Deadline>,
operators: OperatorMap,
last_task_handles: BTreeMap<ApplicationId, JoinHandle<()>>,
in_flight_apps: BTreeSet<ApplicationId>,
}

impl<Env: linera_core::Environment> TaskProcessor<Env> {
Expand Down Expand Up @@ -84,7 +95,7 @@ impl<Env: linera_core::Environment> TaskProcessor<Env> {
update_receiver,
deadlines: BinaryHeap::new(),
operators,
last_task_handles: BTreeMap::new(),
in_flight_apps: BTreeSet::new(),
}
}

Expand All @@ -105,9 +116,17 @@ impl<Env: linera_core::Environment> TaskProcessor<Env> {
let application_ids = self.process_events();
self.process_actions(application_ids).await;
}
Some((application_id, outcome)) = self.outcome_receiver.recv() => {
if let Err(e) = self.submit_task_outcome(application_id, &outcome).await {
error!("Error while processing task outcome {outcome:?}: {e}");
Some(msg) = self.outcome_receiver.recv() => {
match msg {
TaskMessage::Outcome { application_id, outcome } => {
if let Err(e) = self.submit_task_outcome(application_id, &outcome).await {
error!("Error while processing task outcome {outcome:?}: {e}");
}
}
TaskMessage::BatchComplete { application_id } => {
self.in_flight_apps.remove(&application_id);
self.process_actions(vec![application_id]).await;
}
}
}
Some(update) = self.update_receiver.recv() => {
Expand Down Expand Up @@ -138,9 +157,10 @@ impl<Env: linera_core::Environment> TaskProcessor<Env> {
let new_app_set: BTreeSet<_> = update.application_ids.iter().cloned().collect();
let old_app_set: BTreeSet<_> = self.application_ids.iter().cloned().collect();

// Retain only last_requested_callbacks for applications that are still active
self.cursors
.retain(|app_id, _| new_app_set.contains(app_id));
self.in_flight_apps
.retain(|app_id| new_app_set.contains(app_id));

// Update the application_ids
self.application_ids = update.application_ids;
Expand Down Expand Up @@ -176,6 +196,10 @@ impl<Env: linera_core::Environment> TaskProcessor<Env> {

async fn process_actions(&mut self, application_ids: Vec<ApplicationId>) {
for application_id in application_ids {
if self.in_flight_apps.contains(&application_id) {
debug!("Skipping {application_id}: tasks already in flight");
continue;
}
debug!("Processing actions for {application_id}");
let now = Timestamp::now();
let app_cursor = self.cursors.get(&application_id).cloned();
Expand All @@ -199,10 +223,10 @@ impl<Env: linera_core::Environment> TaskProcessor<Env> {
self.cursors.insert(application_id, cursor);
}
if !actions.execute_tasks.is_empty() {
self.in_flight_apps.insert(application_id);
let sender = self.outcome_sender.clone();
let operators = self.operators.clone();
let previous = self.last_task_handles.remove(&application_id);
let handle = tokio::spawn(async move {
tokio::spawn(async move {
// Spawn all tasks concurrently and join them.
let handles: Vec<_> = actions
.execute_tasks
Expand All @@ -218,17 +242,17 @@ impl<Env: linera_core::Environment> TaskProcessor<Env> {
})
.collect();
let results = future::join_all(handles).await;
// Wait for the previous batch to finish sending outcomes first.
if let Some(previous) = previous {
if let Err(error) = previous.await {
error!(%application_id, %error, "Task panicked");
}
}
// Submit outcomes in the original order.
for result in results {
match result {
Ok(Ok(outcome)) => {
if sender.send((application_id, outcome)).is_err() {
if sender
.send(TaskMessage::Outcome {
application_id,
outcome,
})
.is_err()
{
error!("Outcome receiver dropped for {application_id}");
break;
}
Expand All @@ -241,8 +265,10 @@ impl<Env: linera_core::Environment> TaskProcessor<Env> {
}
}
}
// Signal that this batch is done so the main loop can process
// the next batch for this application.
let _ = sender.send(TaskMessage::BatchComplete { application_id });
});
self.last_task_handles.insert(application_id, handle);
}
}
}
Expand Down
Loading