diff --git a/examples/http-axum-streaming-otel/Cargo.toml b/examples/http-axum-streaming-otel/Cargo.toml
new file mode 100644
index 00000000..d917bb03
--- /dev/null
+++ b/examples/http-axum-streaming-otel/Cargo.toml
@@ -0,0 +1,20 @@
+[package]
+name = "http-axum-streaming-otel"
+version = "0.1.0"
+edition = "2021"
+
+[dependencies]
+axum = "0.8"
+bytes = "1"
+lambda_http = { path = "../../lambda-http", default-features = false, features = [
+    "apigw_http", "tracing", "opentelemetry"
+] }
+opentelemetry = "0.30"
+opentelemetry_sdk = { version = "0.30", features = ["rt-tokio"] }
+opentelemetry-stdout = { version = "0.30", features = ["trace"] }
+thiserror = "2.0"
+tokio = { version = "1", features = ["macros"] }
+tokio-stream = "0.1.2"
+tracing = "0.1"
+tracing-opentelemetry = "0.31"
+tracing-subscriber = "0.3"
diff --git a/examples/http-axum-streaming-otel/README.md b/examples/http-axum-streaming-otel/README.md
new file mode 100644
index 00000000..194fe4e4
--- /dev/null
+++ b/examples/http-axum-streaming-otel/README.md
@@ -0,0 +1,25 @@
+# AWS Lambda Function example
+
+This example shows how to build a **streaming HTTP response** with `Axum` and
+run it on AWS Lambda using a custom runtime with OpenTelemetry (OTel) support.
+
+Tracing data is exported as console log entries visible in CloudWatch. Note that
+CloudWatch assigns a `Timestamp` to each log entry based on when it receives the
+data (spans are exported in batches). To see when work actually occurred, look at
+the span's event attributes, which include the precise local timestamps of those events.
+
+## Build & Deploy
+
+1. Install
+   [cargo-lambda](https://github.com/cargo-lambda/cargo-lambda#installation)
+2. Build the function with `cargo lambda build --release`
+3. Deploy the function to AWS Lambda with:
+   - `cargo lambda deploy --enable-function-url --iam-role YOUR_ROLE` to stream words
+4. Enable Lambda streaming responses in the Lambda console: change the function
+   URL's invoke mode to `RESPONSE_STREAM`
+5. Verify the function works: `curl -N `. The results should be
+   streamed back with a 0.5-second pause between each word.
+
+## Build for ARM 64
+
+Build the function with `cargo lambda build --release --arm64`
diff --git a/examples/http-axum-streaming-otel/src/main.rs b/examples/http-axum-streaming-otel/src/main.rs
new file mode 100644
index 00000000..64f4e49e
--- /dev/null
+++ b/examples/http-axum-streaming-otel/src/main.rs
@@ -0,0 +1,106 @@
+//! # Example: Axum Streaming Responses on AWS Lambda with OTel
+//!
+//! Demonstrates serving **incremental streaming responses** from Axum handlers
+//! running in AWS Lambda using a **custom** `lambda_runtime::Runtime` with
+//! OpenTelemetry (OTel) support.
+//!
+//! - Runs with a custom `Runtime` + `StreamAdapter`, which convert Axum
+//!   responses into streaming bodies delivered as data is produced (unlike the
+//!   default `run_with_streaming_response` helper).
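+//!
+//! A minimal sketch of the wiring in `main` below (the body of the OTel flush
+//! closure passed to `OtelLayer::new` is elided here):
+//!
+//! ```ignore
+//! let svc = Router::new().route("/", get(stream_words));
+//! Runtime::new(StreamAdapter::from(svc))
+//!     .layer(OtelLayer::new(|| { /* flush the tracer provider */ }).with_trigger(OpenTelemetryFaasTrigger::Http))
+//!     .run()
+//!     .await
+//! ```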
+
+use axum::{
+    body::Body,
+    http::{
+        self,
+        header::{CACHE_CONTROL, CONTENT_TYPE},
+        StatusCode,
+    },
+    response::{IntoResponse, Response},
+    routing::get,
+    Router,
+};
+use bytes::Bytes;
+use core::{convert::Infallible, time::Duration};
+use lambda_http::{
+    lambda_runtime::{
+        layers::{OpenTelemetryFaasTrigger, OpenTelemetryLayer as OtelLayer},
+        tracing::Instrument,
+        Runtime,
+    },
+    tracing, Error, StreamAdapter,
+};
+use opentelemetry::trace::TracerProvider;
+use opentelemetry_sdk::trace;
+use thiserror::Error;
+use tokio::sync::mpsc;
+use tokio_stream::wrappers::ReceiverStream;
+use tracing_subscriber::prelude::*;
+
+#[derive(Debug, Error)]
+pub enum AppError {
+    #[error("{0}")]
+    Http(#[from] http::Error),
+}
+
+impl IntoResponse for AppError {
+    fn into_response(self) -> Response {
+        (StatusCode::INTERNAL_SERVER_ERROR, self.to_string()).into_response()
+    }
+}
+
+#[tracing::instrument(skip_all)]
+async fn stream_words() -> Result<Response, AppError> {
+    let (tx, rx) = mpsc::channel::<Result<Bytes, Infallible>>(8);
+    let body = Body::from_stream(ReceiverStream::new(rx));
+
+    tokio::spawn(
+        async move {
+            for (idx, msg) in ["Hello", "world", "from", "Lambda!"].iter().enumerate() {
+                tokio::time::sleep(Duration::from_millis(500)).await;
+                let line = format!("{msg}\n");
+                tracing::info!(chunk.idx = idx, bytes = line.len(), "emit");
+                if tx.send(Ok(Bytes::from(line))).await.is_err() {
+                    break;
+                }
+            }
+        }
+        .instrument(tracing::info_span!("producer.stream_words")),
+    );
+
+    Ok(Response::builder()
+        .status(StatusCode::OK)
+        .header(CONTENT_TYPE, "text/plain; charset=utf-8")
+        .header(CACHE_CONTROL, "no-cache")
+        .body(body)?)
+}
+
+#[tokio::main]
+async fn main() -> Result<(), Error> {
+    // Set up an OpenTelemetry tracer provider that writes spans to stdout for
+    // debugging purposes
+    let exporter = opentelemetry_stdout::SpanExporter::default();
+    let tracer_provider = trace::SdkTracerProvider::builder()
+        .with_batch_exporter(exporter)
+        .build();
+
+    // Set up the link between OpenTelemetry and the tracing crate
+    tracing_subscriber::registry()
+        .with(tracing_opentelemetry::OpenTelemetryLayer::new(
+            tracer_provider.tracer("my-streaming-app"),
+        ))
+        .init();
+
+    let svc = Router::new().route("/", get(stream_words));
+
+    // Initialize the Lambda runtime and add OpenTelemetry tracing
+    let runtime = Runtime::new(StreamAdapter::from(svc)).layer(
+        OtelLayer::new(|| {
+            if let Err(err) = tracer_provider.force_flush() {
+                eprintln!("Error flushing traces: {err:#?}");
+            }
+        })
+        .with_trigger(OpenTelemetryFaasTrigger::Http),
+    );
+
+    runtime.run().await
+}
diff --git a/examples/http-axum-streaming/Cargo.toml b/examples/http-axum-streaming/Cargo.toml
new file mode 100644
index 00000000..a951562b
--- /dev/null
+++ b/examples/http-axum-streaming/Cargo.toml
@@ -0,0 +1,14 @@
+[package]
+name = "http-axum-streaming"
+version = "0.1.0"
+edition = "2021"
+
+[dependencies]
+axum = "0.8"
+bytes = "1"
+lambda_http = { path = "../../lambda-http", default-features = false, features = [
+    "apigw_http", "tracing"
+] }
+thiserror = "2.0"
+tokio = { version = "1", features = ["macros"] }
+tokio-stream = "0.1.2"
diff --git a/examples/http-axum-streaming/README.md b/examples/http-axum-streaming/README.md
new file mode 100644
index 00000000..fe7e573d
--- /dev/null
+++ b/examples/http-axum-streaming/README.md
@@ -0,0 +1,20 @@
+# AWS Lambda Function example
+
+This example demonstrates building a **streaming** HTTP response with Axum,
+deployed on AWS Lambda using a custom runtime.
+
+## Build & Deploy
+
+1. Install
+   [cargo-lambda](https://github.com/cargo-lambda/cargo-lambda#installation)
+2. Build the function with `cargo lambda build --release`
+3. Deploy the function to AWS Lambda with:
+   - `cargo lambda deploy --enable-function-url --iam-role YOUR_ROLE` to stream words
+4. Enable Lambda streaming responses in the Lambda console: change the function
+   URL's invoke mode to `RESPONSE_STREAM`
+5. Verify the function works: `curl -N `. The results should be
+   streamed back with a 0.5-second pause between each word.
+
+## Build for ARM 64
+
+Build the function with `cargo lambda build --release --arm64`
diff --git a/examples/http-axum-streaming/src/main.rs b/examples/http-axum-streaming/src/main.rs
new file mode 100644
index 00000000..1812f879
--- /dev/null
+++ b/examples/http-axum-streaming/src/main.rs
@@ -0,0 +1,70 @@
+//! # Example: Axum Streaming Responses on AWS Lambda
+//!
+//! Demonstrates serving **incremental streaming responses** from Axum handlers
+//! running in AWS Lambda.
+//!
+//! - Runs with `run_with_streaming_response`, which uses the **default Lambda
+//!   runtime** to convert Axum responses into streaming bodies delivered as
+//!   data is produced (unlike the OTel example, which uses a custom `Runtime` +
+//!   `StreamAdapter`).
+
+use axum::{
+    body::Body,
+    http::{
+        self,
+        header::{CACHE_CONTROL, CONTENT_TYPE},
+        StatusCode,
+    },
+    response::{IntoResponse, Response},
+    routing::get,
+    Router,
+};
+use bytes::Bytes;
+use core::{convert::Infallible, time::Duration};
+use lambda_http::{run_with_streaming_response, tracing, Error};
+use thiserror::Error;
+use tokio::sync::mpsc;
+use tokio_stream::wrappers::ReceiverStream;
+
+#[derive(Debug, Error)]
+pub enum AppError {
+    #[error("{0}")]
+    Http(#[from] http::Error),
+}
+
+impl IntoResponse for AppError {
+    fn into_response(self) -> Response {
+        (StatusCode::INTERNAL_SERVER_ERROR, self.to_string()).into_response()
+    }
+}
+
+async fn stream_words() -> Result<Response, AppError> {
+    let (tx, rx) = mpsc::channel::<Result<Bytes, Infallible>>(8);
+    let body = Body::from_stream(ReceiverStream::new(rx));
+
+    tokio::spawn(async move {
+        for msg in ["Hello", "world", "from", "Lambda!"] {
+            tokio::time::sleep(Duration::from_millis(500)).await;
+            if tx.send(Ok(Bytes::from(format!("{msg}\n")))).await.is_err() {
+                break;
+            }
+        }
+    });
+
+    Ok(Response::builder()
+        .status(StatusCode::OK)
+        .header(CONTENT_TYPE, "text/plain; charset=utf-8")
+        .header(CACHE_CONTROL, "no-cache")
+        .body(body)?)
+}
+
+#[tokio::main]
+async fn main() -> Result<(), Error> {
+    tracing::init_default_subscriber();
+
+    let svc = Router::new().route("/", get(stream_words));
+
+    // Automatically convert the service into a streaming response with a
+    // default runtime.
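+    // Under the hood this wraps the router in the same request/response
+    // adapters used by `StreamAdapter` and drives it with `lambda_runtime::run`
+    // (see `lambda_http::streaming`).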
+    run_with_streaming_response(svc).await
+}
diff --git a/lambda-http/src/lib.rs b/lambda-http/src/lib.rs
index 33ccea12..36e2ffbd 100644
--- a/lambda-http/src/lib.rs
+++ b/lambda-http/src/lib.rs
@@ -102,7 +102,7 @@ use std::{
 };
 
 mod streaming;
-pub use streaming::run_with_streaming_response;
+pub use streaming::{run_with_streaming_response, StreamAdapter};
 
 /// Type alias for `http::Request`s with a fixed [`Body`](enum.Body.html) type
 pub type Request = http::Request<Body>;
diff --git a/lambda-http/src/streaming.rs b/lambda-http/src/streaming.rs
index a93408b4..6dd17230 100644
--- a/lambda-http/src/streaming.rs
+++ b/lambda-http/src/streaming.rs
@@ -1,22 +1,88 @@
-use crate::{http::header::SET_COOKIE, request::LambdaRequest, tower::ServiceBuilder, Request, RequestExt};
+use crate::{http::header::SET_COOKIE, request::LambdaRequest, Request, RequestExt};
 use bytes::Bytes;
-pub use http::{self, Response};
-use http_body::Body;
-use lambda_runtime::Diagnostic;
-pub use lambda_runtime::{self, tower::ServiceExt, Error, LambdaEvent, MetadataPrelude, Service, StreamResponse};
-use std::{
+use core::{
     fmt::Debug,
     pin::Pin,
     task::{Context, Poll},
 };
-use tokio_stream::Stream;
+use futures_util::{Stream, TryFutureExt};
+pub use http::{self, Response};
+use http_body::Body;
+use lambda_runtime::{
+    tower::{
+        util::{MapRequest, MapResponse},
+        ServiceBuilder, ServiceExt,
+    },
+    Diagnostic,
+};
+pub use lambda_runtime::{Error, LambdaEvent, MetadataPrelude, Service, StreamResponse};
+use std::{future::Future, marker::PhantomData};
+
+/// An adapter that lifts a standard [`Service<Request>`] into a
+/// [`Service<LambdaEvent<LambdaRequest>>`] which produces streaming Lambda HTTP
+/// responses.
+pub struct StreamAdapter<'a, S, B> {
+    service: S,
+    _phantom_data: PhantomData<&'a B>,
+}
+
+impl<'a, S, B, E> From<S> for StreamAdapter<'a, S, B>
+where
+    S: Service<Request, Response = Response<B>, Error = E>,
+    S::Future: Send + 'a,
+    B: Body + Unpin + Send + 'static,
+    B::Data: Into<Bytes> + Send,
+    B::Error: Into<Error> + Send + Debug,
+{
+    fn from(service: S) -> Self {
+        StreamAdapter {
+            service,
+            _phantom_data: PhantomData,
+        }
+    }
+}
+
+impl<'a, S, B, E> Service<LambdaEvent<LambdaRequest>> for StreamAdapter<'a, S, B>
+where
+    S: Service<Request, Response = Response<B>, Error = E>,
+    S::Future: Send + 'a,
+    B: Body + Unpin + Send + 'static,
+    B::Data: Into<Bytes> + Send,
+    B::Error: Into<Error> + Send + Debug,
+{
+    type Response = StreamResponse<BodyStream<B>>;
+    type Error = E;
+    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'a>>;
-/// Starts the Lambda Rust runtime and stream response back [Configure Lambda
-/// Streaming Response](https://docs.aws.amazon.com/lambda/latest/dg/configuration-response-streaming.html).
+
+    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+        self.service.poll_ready(cx)
+    }
+
+    fn call(&mut self, req: LambdaEvent<LambdaRequest>) -> Self::Future {
+        let event: Request = req.payload.into();
+        Box::pin(
+            self.service
+                .call(event.with_lambda_context(req.context))
+                .map_ok(into_stream_response),
+        )
+    }
+}
+
+/// Builds a streaming-aware Tower service from a `Service<Request>` **without**
+/// boxing its future (no heap allocation / vtable).
 ///
-/// This takes care of transforming the LambdaEvent into a [`Request`] and
-/// accepts [`http::Response`] as response.
-pub async fn run_with_streaming_response<'a, S, B, E>(handler: S) -> Result<(), Error>
+/// Transforms `LambdaEvent<LambdaRequest>` into `Request` with the Lambda
+/// context and wraps `Response<B>` into `StreamResponse<BodyStream<B>>`.
+///
+/// Used internally by [`run_with_streaming_response`]; not part of the public
+/// API.
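+///
+/// A rough sketch of the relationship (types elided):
+///
+/// ```ignore
+/// // `run_with_streaming_response(handler)` is effectively:
+/// lambda_runtime::run(into_stream_service(handler)).await
+/// ```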
+#[allow(clippy::type_complexity)]
+fn into_stream_service<'a, S, B, E>(
+    handler: S,
+) -> MapResponse<
+    MapRequest<S, fn(LambdaEvent<LambdaRequest>) -> Request>,
+    impl FnOnce(Response<B>) -> StreamResponse<BodyStream<B>> + Clone,
+>
 where
     S: Service<Request, Response = Response<B>, Error = E>,
     S::Future: Send + 'a,
@@ -25,38 +91,59 @@ where
     B::Data: Into<Bytes> + Send,
     B::Error: Into<Error> + Send + Debug,
 {
-    let svc = ServiceBuilder::new()
+    ServiceBuilder::new()
         .map_request(|req: LambdaEvent<LambdaRequest>| {
             let event: Request = req.payload.into();
             event.with_lambda_context(req.context)
         })
         .service(handler)
-        .map_response(|res| {
-            let (parts, body) = res.into_parts();
-
-            let mut prelude_headers = parts.headers;
-
-            let cookies = prelude_headers.get_all(SET_COOKIE);
-            let cookies = cookies
-                .iter()
-                .map(|c| String::from_utf8_lossy(c.as_bytes()).to_string())
-                .collect::<Vec<String>>();
+        .map_response(into_stream_response)
+}
 
-            prelude_headers.remove(SET_COOKIE);
+/// Converts an `http::Response<B>` into a streaming Lambda response.
+fn into_stream_response<B>(res: Response<B>) -> StreamResponse<BodyStream<B>>
+where
+    B: Body + Unpin + Send + 'static,
+    B::Data: Into<Bytes> + Send,
+    B::Error: Into<Error> + Send + Debug,
+{
+    let (parts, body) = res.into_parts();
 
-            let metadata_prelude = MetadataPrelude {
-                headers: prelude_headers,
-                status_code: parts.status,
-                cookies,
-            };
+    let mut headers = parts.headers;
+    let cookies = headers
+        .get_all(SET_COOKIE)
+        .iter()
+        .map(|c| String::from_utf8_lossy(c.as_bytes()).to_string())
+        .collect::<Vec<String>>();
+    headers.remove(SET_COOKIE);
 
-            StreamResponse {
-                metadata_prelude,
-                stream: BodyStream { body },
-            }
-        });
+    StreamResponse {
+        metadata_prelude: MetadataPrelude {
+            headers,
+            status_code: parts.status,
+            cookies,
+        },
+        stream: BodyStream { body },
+    }
+}
 
-    lambda_runtime::run(svc).await
+/// Runs the Lambda runtime with a handler that returns **streaming** HTTP
+/// responses.
+///
+/// See the [AWS docs for response streaming].
+///
+/// [AWS docs for response streaming]:
+/// https://docs.aws.amazon.com/lambda/latest/dg/configuration-response-streaming.html
+pub async fn run_with_streaming_response<'a, S, B, E>(handler: S) -> Result<(), Error>
+where
+    S: Service<Request, Response = Response<B>, Error = E>,
+    S::Future: Send + 'a,
+    E: Debug + Into<Diagnostic>,
+    B: Body + Unpin + Send + 'static,
+    B::Data: Into<Bytes> + Send,
+    B::Error: Into<Error> + Send + Debug,
+{
+    lambda_runtime::run(into_stream_service(handler)).await
 }
 
 pin_project_lite::pin_project! {
@@ -85,3 +172,47 @@ where
         }
     }
 }
+
+#[cfg(test)]
+mod test_stream_adapter {
+    use super::*;
+
+    use crate::Body;
+    use http::StatusCode;
+
+    // A middleware that logs requests before forwarding them to another service
+    struct LogService<S> {
+        inner: S,
+    }
+
+    impl<S> Service<LambdaEvent<LambdaRequest>> for LogService<S>
+    where
+        S: Service<LambdaEvent<LambdaRequest>>,
+    {
+        type Response = S::Response;
+        type Error = S::Error;
+        type Future = S::Future;
+
+        fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+            self.inner.poll_ready(cx)
+        }
+
+        fn call(&mut self, event: LambdaEvent<LambdaRequest>) -> Self::Future {
+            println!("Lambda event: {event:#?}");
+            self.inner.call(event)
+        }
+    }
+
+    #[test]
+    fn stream_adapter_is_boxable() {
+        // Works with a concrete service stack (no boxing)
+        let svc = ServiceBuilder::new()
+            .layer_fn(|service| LogService { inner: service })
+            .layer_fn(StreamAdapter::from)
+            .service_fn(
+                |_req: Request| async move { http::Response::builder().status(StatusCode::OK).body(Body::Empty) },
+            );
+        // Also works when the stack is boxed (type-erased)
+        let _boxed_svc = svc.boxed();
+    }
+}