|
1 | 1 | use std::pin::pin; |
| 2 | +use std::time::{Duration, Instant}; |
2 | 3 |
|
3 | | -use futures::future::Either; |
| 4 | +use bytes::Bytes; |
| 5 | +use futures::{ |
| 6 | + channel::mpsc::{channel, Sender}, |
| 7 | + SinkExt, StreamExt, |
| 8 | +}; |
4 | 9 | use http::Request; |
5 | | -use spin_sdk::http_wasip3::{send, EmptyBody, IntoResponse}; |
6 | 10 | use spin_sdk::http_wasip3::http_service; |
| 11 | +use spin_sdk::http_wasip3::{send, EmptyBody, IntoResponse}; |
7 | 12 |
|
| 13 | +// In this streaming scenario, the entry point is a shim |
| 14 | +// which kicks off the main async work of the application as |
| 15 | +// a `spawn` and then immediately returns a Response. The response |
| 16 | +// content will continue streaming from the "main application" |
| 17 | +// function despite the entry point having returned. |
8 | 18 | #[http_service] |
9 | 19 | async fn handle_concurrent_outbound_http_calls(_req: spin_sdk::http_wasip3::Request) -> anyhow::Result<impl IntoResponse> { |
| 20 | + // Create a streaming Body implementation that backs onto a `mpsc` |
| 21 | + // channel. The function returns the sender side of the channel; the |
| 22 | + // receiver end becomes the body. So anything written to the sender |
| 23 | + // side will be sent out over the HTTP response. |
| 24 | + let (tx, body) = bytes_stream_body(); |
| 25 | + |
| 26 | + // Spawn a task to run the application logic and stream the results |
| 27 | + // to the client. `spawn` continues to run this future even after the |
| 28 | + // function has exited with the return of the Response object. |
| 29 | + spin_sdk::http_wasip3::wasip3::wit_bindgen::spawn( |
| 30 | + handle_concurrent_outbound_http_calls_impl(tx) |
| 31 | + ); |
| 32 | + |
| 33 | + Ok(http::Response::new(body)) |
| 34 | +} |
10 | 35 |
|
| 36 | +// This is the real body of the application! Here `tx` is the |
| 37 | +// sender through which we stream data to the client. |
| 38 | +async fn handle_concurrent_outbound_http_calls_impl(mut tx: Sender<Bytes>) { |
| 39 | + // Start two async tasks to make concurrent outbound requests. |
11 | 40 | let spin = pin!(get_content_length("https://spinframework.dev")); |
12 | 41 | let book = pin!(get_content_length("https://component-model.bytecodealliance.org/")); |
13 | 42 |
|
14 | | - let (first, len) = match futures::future::select(spin, book).await { |
15 | | - Either::Left(len) => ("Spin docs", len), |
16 | | - Either::Right(len) => ("Component model book", len), |
17 | | - }; |
| 43 | + // `select` completes when the first task completes. |
| 44 | + let first_completion = futures::future::select(spin, book).await; |
| 45 | + |
| 46 | + // Retrieve the result of whichever task completed, retaining the other |
| 47 | + // task for later use. |
| 48 | + let (first_result, second_fut) = first_completion.factor_first(); |
| 49 | + |
| 50 | + // Write the outcome of that first task to the response. |
| 51 | + let first_message = first_result.unwrap().as_message("first"); |
| 52 | + tx.send(Bytes::from(first_message)).await.unwrap(); |
| 53 | + |
| 54 | + // Await the second task... |
| 55 | + let second_result = second_fut.await; |
18 | 56 |
|
19 | | - let response = format!("{first} site was first response with content-length {:?}\n", len.0?); |
| 57 | + // ...and write its result to the response too. |
| 58 | + let second_message = second_result.unwrap().as_message("second"); |
| 59 | + tx.send(Bytes::from(second_message)).await.unwrap(); |
20 | 60 |
|
21 | | - Ok(response) |
| 61 | + // The `tx` sender drops at the end of the function, which ends the |
| 62 | + // response stream: if you need to close it explicitly in order to |
| 63 | + // continue doing work after completing the response, you can use `tx.close_channel()`. |
22 | 64 | } |
23 | 65 |
|
24 | | -async fn get_content_length(url: &str) -> anyhow::Result<Option<u64>> { |
| 66 | +struct TaskResult { |
| 67 | + url: String, |
| 68 | + time_taken: Duration, |
| 69 | + content_length: Option<usize>, |
| 70 | +} |
| 71 | + |
| 72 | +impl TaskResult { |
| 73 | + fn as_message(&self, position: &str) -> String { |
| 74 | + format!( |
| 75 | + "{} was {position} with a content-length of {:?} in {}ms\n", |
| 76 | + self.url, |
| 77 | + self.content_length, |
| 78 | + self.time_taken.as_millis() |
| 79 | + ) |
| 80 | + } |
| 81 | +} |
| 82 | + |
| 83 | +async fn get_content_length(url: &str) -> anyhow::Result<TaskResult> { |
25 | 84 | let request = Request::get(url).body(EmptyBody::new())?; |
| 85 | + let sent_at = Instant::now(); |
26 | 86 | let response = send(request).await?; |
| 87 | + let time_taken = Instant::now().duration_since(sent_at); |
27 | 88 | let cl_header = response.headers().get("content-length"); |
28 | | - let cl = cl_header.and_then(|hval| hval.to_str().ok()).and_then(|hval| hval.parse().ok()); |
29 | | - Ok(cl) |
| 89 | + let content_length = cl_header |
| 90 | + .and_then(|hval| hval.to_str().ok()) |
| 91 | + .and_then(|hval| hval.parse().ok()); |
| 92 | + |
| 93 | + Ok(TaskResult { |
| 94 | + url: url.to_string(), |
| 95 | + time_taken, |
| 96 | + content_length, |
| 97 | + }) |
| 98 | +} |
| 99 | + |
| 100 | +// Helper function to create a streaming body. |
| 101 | +fn bytes_stream_body() -> ( |
| 102 | + Sender<bytes::Bytes>, |
| 103 | + impl http_body::Body<Data = Bytes, Error = anyhow::Error>, |
| 104 | +) { |
| 105 | + // The send and receive sides of a channel |
| 106 | + let (tx, rx) = channel::<Bytes>(1024); |
| 107 | + // The receive side is a stream, so we can use combinators like `map` |
| 108 | + // to transform it into a form that the response plumbing is happy |
| 109 | + // with. The app logic that writes to the stream doesn't need to see |
| 110 | + // any of this. |
| 111 | + let stm = rx.map(|value| Ok(http_body::Frame::data(value))); |
| 112 | + // Construct a Body implementation over the stream. |
| 113 | + let body = http_body_util::StreamBody::new(stm); |
| 114 | + // Return the send side (so that app logic can write to the body) and the |
| 115 | + // body (so it can be put in a Response!). |
| 116 | + (tx, body) |
30 | 117 | } |
0 commit comments