|
1 | 1 | use std::pin::pin; |
2 | 2 |
|
3 | | -use futures::future::Either; |
| 3 | +use futures::SinkExt; |
4 | 4 | use http::Request; |
5 | | -use spin_sdk::http_wasip3::{send, EmptyBody, IntoResponse}; |
| 5 | +use serde::{Deserialize, Serialize}; |
6 | 6 | use spin_sdk::http_wasip3::http_service; |
| 7 | +use spin_sdk::http_wasip3::{send, EmptyBody, IntoResponse}; |
| 8 | + |
| 9 | +use bytes::Bytes; |
| 10 | +use futures::{ |
| 11 | + channel::mpsc::{channel, Sender}, |
| 12 | + StreamExt, |
| 13 | +}; |
7 | 14 |
|
8 | 15 | #[http_service] |
9 | | -async fn handle_concurrent_outbound_http_calls(_req: spin_sdk::http_wasip3::Request) -> anyhow::Result<impl IntoResponse> { |
| 16 | +async fn handle_competing_outbound_http_calls( |
| 17 | + _req: spin_sdk::http_wasip3::Request, |
| 18 | +) -> impl IntoResponse { |
| 19 | + println!("Handling reuqest"); |
| 20 | + |
| 21 | + // A lot of code taken from: https://github.com/spinframework/spin-rust-sdk/blob/main/examples/wasip3-streaming/src/lib.rs |
| 22 | + |
| 23 | + // Create a streaming Body implementation that backs onto a `mpsc` |
| 24 | + // channel. The function returns the sender side of the channel; the |
| 25 | + // receiver end becomes the body. So anything written to the sender |
| 26 | + // side will be sent out over the HTTP response. |
| 27 | + let (mut tx, body) = bytes_stream_body(); |
| 28 | + |
| 29 | + // Use wit_bindgen::spawn to allow the async block to keep running |
| 30 | + // after the handler returns. |
| 31 | + spin_sdk::http_wasip3::wasip3::wit_bindgen::spawn(async move { |
| 32 | + // The two outbound calls |
| 33 | + let spin = pin!(get_content_length("https://spinframework.dev")); |
| 34 | + let book = pin!(get_content_length( |
| 35 | + "https://component-model.bytecodealliance.org" |
| 36 | + )); |
| 37 | + |
| 38 | + // Getting the first response back |
| 39 | + let first_result = futures::future::select(spin, book).await.factor_first(); |
| 40 | + |
| 41 | + // We need to keep the future around to also handle the second response, |
| 42 | + // hence instantiating a new variable for the ContentLength struct |
| 43 | + let first_response = first_result.0.unwrap(); |
10 | 44 |
|
11 | | - let spin = pin!(get_content_length("https://spinframework.dev")); |
12 | | - let book = pin!(get_content_length("https://component-model.bytecodealliance.org/")); |
| 45 | + // Let's print some stats to the server console |
| 46 | + println!( |
| 47 | + "HEAD request done: {:?}, took {:?} ms, content-length: {:?}", |
| 48 | + first_response.url, first_response.ms_taken, first_response.content_length |
| 49 | + ); |
13 | 50 |
|
14 | | - let (first, len) = match futures::future::select(spin, book).await { |
15 | | - Either::Left(len) => ("Spin docs", len), |
16 | | - Either::Right(len) => ("Component model book", len), |
17 | | - }; |
| 51 | + // Serializing the struct as JSON represented as bytes |
| 52 | + let bytes: Bytes = serde_json::to_vec_pretty(&first_response) |
| 53 | + .expect("Failed to serialize!") |
| 54 | + .try_into() |
| 55 | + .unwrap(); |
18 | 56 |
|
19 | | - let response = format!("{first} site was first response with content-length {:?}\n", len.0?); |
| 57 | + // Sends the bytes over the channel and closes te channel, as we're done with the client response |
| 58 | + tx.send(bytes).await.unwrap(); |
| 59 | + tx.close_channel(); |
20 | 60 |
|
21 | | - Ok(response) |
| 61 | + // Handles the secons request (future) as it returns a reponse |
| 62 | + let second_response = first_result.1.await.expect("Failed to get second response"); |
| 63 | + |
| 64 | + // And printing stats to the server console, as the client connection is already closed |
| 65 | + println!( |
| 66 | + "HEAD request done: {:?}, took {:?} ms, content-length: {:?}", |
| 67 | + second_response.url, second_response.ms_taken, second_response.content_length |
| 68 | + ); |
| 69 | + |
| 70 | + println!("Done"); |
| 71 | + }); |
| 72 | + |
| 73 | + // Returning the body, once the channel closes |
| 74 | + http::Response::new(body) |
22 | 75 | } |
23 | 76 |
|
24 | | -async fn get_content_length(url: &str) -> anyhow::Result<Option<u64>> { |
25 | | - let request = Request::get(url).body(EmptyBody::new())?; |
| 77 | +// Getting the content length via an HTTP HEAD request |
| 78 | +async fn get_content_length(url: &str) -> anyhow::Result<ResponseStats> { |
| 79 | + let request = Request::head(url).body(EmptyBody::new())?; |
| 80 | + let start = std::time::SystemTime::now(); |
| 81 | + println!("HEAD request sent: {url}"); |
26 | 82 | let response = send(request).await?; |
27 | 83 | let cl_header = response.headers().get("content-length"); |
28 | | - let cl = cl_header.and_then(|hval| hval.to_str().ok()).and_then(|hval| hval.parse().ok()); |
29 | | - Ok(cl) |
| 84 | + let cl = cl_header |
| 85 | + .and_then(|hval| hval.to_str().ok()) |
| 86 | + .and_then(|hval| hval.parse().ok()); |
| 87 | + let end = std::time::SystemTime::now() |
| 88 | + .duration_since(start) |
| 89 | + .expect("Failed to get time"); |
| 90 | + Ok(ResponseStats { |
| 91 | + content_length: cl.unwrap(), |
| 92 | + ms_taken: end.as_millis(), |
| 93 | + url: url.to_string(), |
| 94 | + }) |
| 95 | +} |
| 96 | + |
| 97 | +#[derive(Debug, Serialize, Deserialize, Clone)] |
| 98 | +struct ResponseStats { |
| 99 | + content_length: u64, |
| 100 | + ms_taken: u128, |
| 101 | + url: String, |
| 102 | +} |
| 103 | + |
| 104 | +// Helper function to create a streaming body. |
| 105 | +fn bytes_stream_body() -> ( |
| 106 | + Sender<bytes::Bytes>, |
| 107 | + impl http_body::Body<Data = Bytes, Error = anyhow::Error>, |
| 108 | +) { |
| 109 | + // The send and receive sides of a channel |
| 110 | + let (tx, rx) = channel::<Bytes>(1024); |
| 111 | + // The receive side is a stream, so we can use combinators like `map` |
| 112 | + // to transform it into a form that the response plumbing is happy |
| 113 | + // with. The app logic that writes to the stream doesn't need to see |
| 114 | + // any of this. |
| 115 | + let stm = rx.map(|value| Ok(http_body::Frame::data(value))); |
| 116 | + // Construct a Body implementation over the stream. |
| 117 | + let body = http_body_util::StreamBody::new(stm); |
| 118 | + // Return the send side (so that app logic can write to the body) and the |
| 119 | + // body (so it can be put in a Response!). |
| 120 | + (tx, body) |
30 | 121 | } |
0 commit comments