Skip to content

Commit bfbe83f

Browse files
afck and claude authored
[testnet] Add missing app ID; submit task outcomes in the correct order. (#5425) (#5427)
Backport of #5425.

## Motivation

Some apps may expect task outcomes to be submitted in the order in which they were requested.

## Proposal

Order them correctly in the task processor. Also, add the application ID when rescheduling a query.

## Test Plan

A test was added.

## Release Plan

- Release a new SDK.

## Links

- PR to main: #5425
- [reviewer checklist](https://github.com/linera-io/linera-protocol/blob/main/CONTRIBUTING.md#reviewer-checklist)

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 6c34d51 commit bfbe83f

File tree

4 files changed

+167
-23
lines changed

4 files changed

+167
-23
lines changed

examples/task-processor/src/service.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,16 @@ impl QueryRoot {
7070
*self.state.task_count.get()
7171
}
7272

73+
/// Returns the stored results in order.
74+
async fn results(&self) -> Vec<String> {
75+
let count = self.state.results.count();
76+
self.state
77+
.results
78+
.read_front(count)
79+
.await
80+
.unwrap_or_default()
81+
}
82+
7383
/// Returns the pending tasks and callback requests for the task processor.
7484
async fn next_actions(
7585
&self,

linera-service/src/cli_wrappers/wallet.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1839,6 +1839,15 @@ impl<A> ApplicationWrapper<A> {
18391839
self.run_graphql_query(&format!("mutation {{ {mutation} }}"))
18401840
.await
18411841
}
1842+
1843+
/// Sends several GraphQL mutations in a single `mutation { ... }` request.
///
/// Each entry of `mutations` is written on its own line, prefixed with the
/// alias `u<index>` (`u0`, `u1`, ...) where `index` is its position in the slice.
/// The assembled query is passed to `run_graphql_query`, whose result is
/// returned unchanged.
pub async fn multiple_mutate(&self, mutations: &[String]) -> Result<Value> {
    // Build the aliased selections in one linear pass; re-formatting the whole
    // accumulated query for every mutation would copy it again on each iteration.
    let selections: String = mutations
        .iter()
        .enumerate()
        .map(|(index, mutation)| format!(" u{index}: {mutation}\n"))
        .collect();
    let query = format!("mutation {{\n{selections}}}\n");
    self.run_graphql_query(&query).await
}
18421851
}
18431852

18441853
impl<A> From<String> for ApplicationWrapper<A> {

linera-service/src/task_processor.rs

Lines changed: 52 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,15 @@ use std::{
1414
};
1515

1616
use async_graphql::InputType as _;
17-
use futures::{stream::StreamExt, FutureExt};
17+
use futures::{future, stream::StreamExt, FutureExt};
1818
use linera_base::{
1919
data_types::{TimeDelta, Timestamp},
2020
identifiers::{ApplicationId, ChainId},
2121
task_processor::{ProcessorActions, TaskOutcome},
2222
};
2323
use linera_core::{client::ChainClient, node::NotificationStream, worker::Reason};
2424
use serde_json::json;
25-
use tokio::{io::AsyncWriteExt, process::Command, select, sync::mpsc};
25+
use tokio::{io::AsyncWriteExt, process::Command, select, sync::mpsc, task::JoinHandle};
2626
use tokio_util::sync::CancellationToken;
2727
use tracing::{debug, error, info};
2828

@@ -56,6 +56,7 @@ pub struct TaskProcessor<Env: linera_core::Environment> {
5656
update_receiver: mpsc::UnboundedReceiver<Update>,
5757
deadlines: BinaryHeap<Deadline>,
5858
operators: OperatorMap,
59+
last_task_handles: BTreeMap<ApplicationId, JoinHandle<()>>,
5960
}
6061

6162
impl<Env: linera_core::Environment> TaskProcessor<Env> {
@@ -77,12 +78,13 @@ impl<Env: linera_core::Environment> TaskProcessor<Env> {
7778
last_requested_callbacks: BTreeMap::new(),
7879
chain_client,
7980
cancellation_token,
81+
notifications,
8082
outcome_sender,
8183
outcome_receiver,
82-
notifications,
84+
update_receiver,
8385
deadlines: BinaryHeap::new(),
8486
operators,
85-
update_receiver,
87+
last_task_handles: BTreeMap::new(),
8688
}
8789
}
8890

@@ -144,12 +146,12 @@ impl<Env: linera_core::Environment> TaskProcessor<Env> {
144146
self.application_ids = update.application_ids;
145147

146148
// Process actions for newly added applications
147-
let new_apps: Vec<_> = self
149+
let new_apps = self
148150
.application_ids
149151
.iter()
150152
.filter(|app_id| !old_app_set.contains(app_id))
151153
.cloned()
152-
.collect();
154+
.collect::<Vec<_>>();
153155
if !new_apps.is_empty() {
154156
self.process_actions(new_apps).await;
155157
}
@@ -188,7 +190,7 @@ impl<Env: linera_core::Environment> TaskProcessor<Env> {
188190
// Retry in at most 1 minute.
189191
self.deadlines.push(Reverse((
190192
now.saturating_add(TimeDelta::from_secs(60)),
191-
None,
193+
Some(application_id),
192194
)));
193195
continue;
194196
}
@@ -198,22 +200,51 @@ impl<Env: linera_core::Environment> TaskProcessor<Env> {
198200
self.deadlines
199201
.push(Reverse((timestamp, Some(application_id))));
200202
}
201-
for task in actions.execute_tasks {
203+
if !actions.execute_tasks.is_empty() {
202204
let sender = self.outcome_sender.clone();
203205
let operators = self.operators.clone();
204-
tokio::spawn(async move {
205-
if let Err(e) = Self::execute_task(
206-
application_id,
207-
task.operator,
208-
task.input,
209-
sender,
210-
operators,
211-
)
212-
.await
213-
{
214-
error!("Error executing task for {application_id}: {e}");
206+
let previous = self.last_task_handles.remove(&application_id);
207+
let handle = tokio::spawn(async move {
208+
// Spawn all tasks concurrently and join them.
209+
let handles: Vec<_> = actions
210+
.execute_tasks
211+
.into_iter()
212+
.map(|task| {
213+
let operators = operators.clone();
214+
tokio::spawn(Self::execute_task(
215+
application_id,
216+
task.operator,
217+
task.input,
218+
operators,
219+
))
220+
})
221+
.collect();
222+
let results = future::join_all(handles).await;
223+
// Wait for the previous batch to finish sending outcomes first.
224+
if let Some(previous) = previous {
225+
if let Err(error) = previous.await {
226+
error!(%application_id, %error, "Task panicked");
227+
}
228+
}
229+
// Submit outcomes in the original order.
230+
for result in results {
231+
match result {
232+
Ok(Ok(outcome)) => {
233+
if sender.send((application_id, outcome)).is_err() {
234+
error!("Outcome receiver dropped for {application_id}");
235+
break;
236+
}
237+
}
238+
Ok(Err(error)) => {
239+
error!(%application_id, %error, "Error executing task");
240+
}
241+
Err(error) => {
242+
error!(%application_id, %error, "Task panicked");
243+
}
244+
}
215245
}
216246
});
247+
self.last_task_handles.insert(application_id, handle);
217248
}
218249
}
219250
}
@@ -222,9 +253,8 @@ impl<Env: linera_core::Environment> TaskProcessor<Env> {
222253
application_id: ApplicationId,
223254
operator: String,
224255
input: String,
225-
sender: mpsc::UnboundedSender<(ApplicationId, TaskOutcome)>,
226256
operators: OperatorMap,
227-
) -> Result<(), anyhow::Error> {
257+
) -> Result<TaskOutcome, anyhow::Error> {
228258
let binary_path = operators
229259
.get(&operator)
230260
.ok_or_else(|| anyhow::anyhow!("unsupported operator: {}", operator))?;
@@ -250,8 +280,7 @@ impl<Env: linera_core::Environment> TaskProcessor<Env> {
250280
output: String::from_utf8_lossy(&output.stdout).into(),
251281
};
252282
debug!("Done executing task for {application_id}");
253-
sender.send((application_id, outcome))?;
254-
Ok(())
283+
Ok(outcome)
255284
}
256285

257286
async fn query_actions(

linera-service/tests/local_net_tests.rs

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1389,6 +1389,102 @@ async fn test_node_service_with_task_processor() -> Result<()> {
13891389
Ok(())
13901390
}
13911391

1392+
/// Test that task processor outcomes are submitted in the order they were requested,
/// even when a later task finishes before an earlier one.
///
/// Two operators are installed as shell scripts: `slow` sleeps for one second before
/// echoing its input, `fast` echoes immediately. Both tasks are requested in one block
/// (slow first), and the stored results must come back as `slow_result`, `fast_result`.
#[cfg(feature = "storage-service")]
#[test_log::test(tokio::test)]
async fn test_task_processor_outcome_ordering() -> Result<()> {
    use std::os::unix::fs::PermissionsExt;

    use linera_base::{abi::ContractAbi, identifiers::ApplicationId};

    struct TaskProcessorAbi;

    impl ContractAbi for TaskProcessorAbi {
        type Operation = ();
        type Response = ();
    }

    let _guard = INTEGRATION_TEST_GUARD.lock().await;
    tracing::info!("Starting test {}", test_name!());

    let config = LocalNetConfig::new_test(Database::Service, Network::Grpc);
    let (mut net, client) = config.instantiate().await?;
    let chain = client.load_wallet()?.default_chain().unwrap();

    // Publish and create the task-processor example application.
    let example_dir = ClientWrapper::example_path("task-processor")?;
    let app_id_str = client
        .project_publish(example_dir, vec![], None, &())
        .await?;
    let app_id: ApplicationId = app_id_str.trim().parse()?;

    // Writes an executable shell script into the temporary directory and returns its path.
    // The file is fully written and closed before its permissions are changed.
    let tmp_dir = tempfile::tempdir()?;
    let write_operator = |file_name: &str, script: &str| -> std::io::Result<std::path::PathBuf> {
        let path = tmp_dir.path().join(file_name);
        std::fs::write(&path, script)?;
        std::fs::set_permissions(&path, std::fs::Permissions::from_mode(0o755))?;
        Ok(path)
    };

    // Create a slow operator that sleeps before echoing its input.
    let slow_path = write_operator("slow-operator", "#!/bin/sh\nsleep 1\ncat\n")?;

    // Create a fast operator that echoes immediately.
    let fast_path = write_operator("fast-operator", "#!/bin/sh\ncat\n")?;

    // Start the node service with both operators.
    let port = get_node_port().await;
    let operators = vec![
        ("slow".to_string(), slow_path),
        ("fast".to_string(), fast_path),
    ];
    let mut node_service = client
        .run_node_service_with_options(port, ProcessInbox::Skip, &[app_id], &operators, false)
        .await?;

    node_service.ensure_is_running()?;

    // Subscribe to notifications for the chain.
    let mut notifications = Box::pin(node_service.notifications(chain).await?);

    let app = node_service.make_application(&chain, &app_id.with_abi::<TaskProcessorAbi>())?;

    // Submit both tasks in a single block: slow first, then fast.
    // The slow task takes longer but should have its result stored first.
    app.multiple_mutate(&[
        r#"requestTask(operator: "slow", input: "slow_result")"#.to_string(),
        r#"requestTask(operator: "fast", input: "fast_result")"#.to_string(),
    ])
    .await?;

    // Wait for the block containing the RequestTask operations.
    notifications.wait_for_block(None).await?;

    // Wait for the two blocks containing the StoreResult operations.
    notifications.wait_for_block(None).await?;
    notifications.wait_for_block(None).await?;

    let task_count: u64 = app.query_json("taskCount").await?;
    assert_eq!(task_count, 2);

    // Verify the results are in request order (slow first, fast second),
    // not completion order (which would be fast first).
    let results: Vec<String> = app.query_json("results").await?;
    assert_eq!(results, vec!["slow_result", "fast_result"]);

    net.ensure_is_running().await?;
    net.terminate().await?;

    Ok(())
}
1487+
13921488
/// Test that the node service read-only mode disables mutations and prevents query-triggered operations.
13931489
#[cfg(feature = "storage-service")]
13941490
#[test_log::test(tokio::test)]

0 commit comments

Comments
 (0)