From 3103e19dda9dec913c129035715285c83c2618d4 Mon Sep 17 00:00:00 2001 From: itowlson Date: Wed, 12 Nov 2025 11:15:12 +1300 Subject: [PATCH] Concurrent outbound calls: fancier, and also schmancier Signed-off-by: itowlson --- .../Cargo.lock | 19 +-- .../Cargo.toml | 3 + .../src/lib.rs | 109 ++++++++++++++++-- 3 files changed, 112 insertions(+), 19 deletions(-) diff --git a/examples/wasip3-concurrent-outbound-http-calls/Cargo.lock b/examples/wasip3-concurrent-outbound-http-calls/Cargo.lock index ab2e228..3c0753b 100644 --- a/examples/wasip3-concurrent-outbound-http-calls/Cargo.lock +++ b/examples/wasip3-concurrent-outbound-http-calls/Cargo.lock @@ -1,6 +1,6 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. -version = 3 +version = 4 [[package]] name = "android_system_properties" @@ -113,8 +113,11 @@ name = "concurrent-outbound-http-calls" version = "0.1.0" dependencies = [ "anyhow", + "bytes", "futures", "http", + "http-body", + "http-body-util", "spin-sdk", ] @@ -729,7 +732,7 @@ dependencies = [ [[package]] name = "spin-executor" -version = "5.0.0" +version = "5.1.1" dependencies = [ "futures", "once_cell", @@ -738,7 +741,7 @@ dependencies = [ [[package]] name = "spin-macro" -version = "5.0.0" +version = "5.1.1" dependencies = [ "anyhow", "bytes", @@ -749,7 +752,7 @@ dependencies = [ [[package]] name = "spin-sdk" -version = "5.0.0" +version = "5.1.1" dependencies = [ "anyhow", "async-trait", @@ -776,7 +779,7 @@ dependencies = [ [[package]] name = "spin-wasip3-http" -version = "5.0.0" +version = "5.1.1" dependencies = [ "anyhow", "bytes", @@ -788,7 +791,7 @@ dependencies = [ [[package]] name = "spin-wasip3-http-macro" -version = "5.0.0" +version = "5.1.1" dependencies = [ "proc-macro2", "quote", @@ -950,9 +953,9 @@ dependencies = [ [[package]] name = "wasip3" -version = "0.2.1+wasi-0.3.0-rc-2025-09-16" +version = "0.2.2+wasi-0.3.0-rc-2025-09-16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cbb2796323e2357ae2d4ba2b781a0392b533f40a5b9f534eef49b23e54186d64" +checksum = "f6c63a3b17ddf12b276e57eb36f9546a239563f6c1f00dac5dfa92530fd7866a" dependencies = [ "bytes", "http", diff --git a/examples/wasip3-concurrent-outbound-http-calls/Cargo.toml b/examples/wasip3-concurrent-outbound-http-calls/Cargo.toml index 28bc248..20c7db9 100644 --- a/examples/wasip3-concurrent-outbound-http-calls/Cargo.toml +++ b/examples/wasip3-concurrent-outbound-http-calls/Cargo.toml @@ -11,8 +11,11 @@ crate-type = ["cdylib"] [dependencies] anyhow = "1" +bytes = "1.10.1" futures = "0.3.31" http = "1.3" +http-body = "1" +http-body-util = "0.1.3" spin-sdk = { path = "../..", features = ["wasip3-unstable"] } [workspace] diff --git a/examples/wasip3-concurrent-outbound-http-calls/src/lib.rs b/examples/wasip3-concurrent-outbound-http-calls/src/lib.rs index 46be3aa..16f0f97 100644 --- a/examples/wasip3-concurrent-outbound-http-calls/src/lib.rs +++ b/examples/wasip3-concurrent-outbound-http-calls/src/lib.rs @@ -1,30 +1,117 @@ use std::pin::pin; +use std::time::{Duration, Instant}; -use futures::future::Either; +use bytes::Bytes; +use futures::{ + channel::mpsc::{channel, Sender}, + SinkExt, StreamExt, +}; use http::Request; -use spin_sdk::http_wasip3::{send, EmptyBody, IntoResponse}; use spin_sdk::http_wasip3::http_service; +use spin_sdk::http_wasip3::{send, EmptyBody, IntoResponse}; +// In this streaming scenario, the entry point is a shim +// which kicks off the main async work of the application as +// a `spawn` and then immediately returns a Response. The response +// content will continue streaming from the "main application" +// function despite the entry point having returned. #[http_service] async fn handle_concurrent_outbound_http_calls(_req: spin_sdk::http_wasip3::Request) -> anyhow::Result { + // Create a streaming Body implementation that backs onto a `mpsc` + // channel. The function returns the sender side of the channel; the + // receiver end becomes the body. So anything written to the sender + // side will be sent out over the HTTP response. + let (tx, body) = bytes_stream_body(); + + // Spawn a task to run the application logic and stream the results + // to the client. `spawn` continues to run this future even after the + // function has exited with the return of the Response object. + spin_sdk::http_wasip3::wasip3::wit_bindgen::spawn( + handle_concurrent_outbound_http_calls_impl(tx) + ); + + Ok(http::Response::new(body)) +} +// This is the real body of the application! Here `tx` is the +// sender through which we stream data to the client. +async fn handle_concurrent_outbound_http_calls_impl(mut tx: Sender) { + // Start two async tasks to make concurrent outbound requests. let spin = pin!(get_content_length("https://spinframework.dev")); let book = pin!(get_content_length("https://component-model.bytecodealliance.org/")); - let (first, len) = match futures::future::select(spin, book).await { - Either::Left(len) => ("Spin docs", len), - Either::Right(len) => ("Component model book", len), - }; + // `select` completes when the first task completes. + let first_completion = futures::future::select(spin, book).await; + + // Retrieve the result of whichever task completed, retaining the other + // task for later use. + let (first_result, second_fut) = first_completion.factor_first(); + + // Write the outcome of that first task to the response. + let first_message = first_result.unwrap().as_message("first"); + tx.send(Bytes::from(first_message)).await.unwrap(); + + // Await the second task... + let second_result = second_fut.await; - let response = format!("{first} site was first response with content-length {:?}\n", len.0?); + // ...and write its result to the response too. + let second_message = second_result.unwrap().as_message("second"); + tx.send(Bytes::from(second_message)).await.unwrap(); - Ok(response) + // The `tx` sender drops at the end of the function, which ends the + // response stream: if you need to close it explicitly in order to + // continue doing work after completing the response, you can use `tx.close_channel()`. } -async fn get_content_length(url: &str) -> anyhow::Result> { +struct TaskResult { + url: String, + time_taken: Duration, + content_length: Option, +} + +impl TaskResult { + fn as_message(&self, position: &str) -> String { + format!( + "{} was {position} with a content-length of {:?} in {}ms\n", + self.url, + self.content_length, + self.time_taken.as_millis() + ) + } +} + +async fn get_content_length(url: &str) -> anyhow::Result { let request = Request::get(url).body(EmptyBody::new())?; + let sent_at = Instant::now(); let response = send(request).await?; + let time_taken = Instant::now().duration_since(sent_at); let cl_header = response.headers().get("content-length"); - let cl = cl_header.and_then(|hval| hval.to_str().ok()).and_then(|hval| hval.parse().ok()); - Ok(cl) + let content_length = cl_header + .and_then(|hval| hval.to_str().ok()) + .and_then(|hval| hval.parse().ok()); + + Ok(TaskResult { + url: url.to_string(), + time_taken, + content_length, + }) +} + +// Helper function to create a streaming body. +fn bytes_stream_body() -> ( + Sender, + impl http_body::Body, +) { + // The send and receive sides of a channel + let (tx, rx) = channel::(1024); + // The receive side is a stream, so we can use combinators like `map` + // to transform it into a form that the response plumbing is happy + // with. The app logic that writes to the stream doesn't need to see + // any of this. + let stm = rx.map(|value| Ok(http_body::Frame::data(value))); + // Construct a Body implementation over the stream. + let body = http_body_util::StreamBody::new(stm); + // Return the send side (so that app logic can write to the body) and the + // body (so it can be put in a Response!). + (tx, body) }