Expose streaming API #1013

Merged: 21 commits, Aug 21, 2025. Changes from 18 commits.
20 changes: 20 additions & 0 deletions examples/http-axum-streaming-otel/Cargo.toml
@@ -0,0 +1,20 @@
[package]
name = "http-axum-streaming-otel"
version = "0.1.0"
edition = "2021"

[dependencies]
axum = "0.8"
bytes = "1"
lambda_http = { path = "../../lambda-http", default-features = false, features = [
    "apigw_http", "tracing", "opentelemetry"
] }
opentelemetry = "0.30"
opentelemetry_sdk = { version = "0.30", features = ["rt-tokio"] }
opentelemetry-stdout = { version = "0.30", features = ["trace"] }
thiserror = "2.0"
tokio = { version = "1", features = ["macros"] }
tokio-stream = "0.1.2"
tracing = "0.1"
tracing-opentelemetry = "0.31"
tracing-subscriber = "0.3"
28 changes: 28 additions & 0 deletions examples/http-axum-streaming-otel/README.md
@@ -0,0 +1,28 @@
# AWS Lambda streaming response example (Axum + OpenTelemetry)

This example shows how to build a **streaming HTTP response** with Axum and
run it on AWS Lambda using a custom runtime with OpenTelemetry (OTel) support.

Tracing data is exported as console log entries visible in CloudWatch. Note that
CloudWatch assigns a `Timestamp` to each log entry based on when it receives the
data (spans are batch-exported). To see when work actually occurred, look at the
span's event attributes, which include the precise local timestamps of those
events.

## Build & Deploy

1. Install
[cargo-lambda](https://github.com/cargo-lambda/cargo-lambda#installation)
2. Build the function with `cargo lambda build --release`
3. Deploy the function to AWS Lambda with:
- `cargo lambda deploy --enable-function-url --iam-role YOUR_ROLE --env-var
USE_NUMBERS=0` to stream words
- `cargo lambda deploy --enable-function-url --iam-role YOUR_ROLE --env-var
USE_NUMBERS=1` to stream numbers.
4. Enable Lambda streaming response on Lambda console: change the function url's
invoke mode to `RESPONSE_STREAM`
5. Verify the function works: `curl -N <function-url>`. The results should be
streamed back with 0.5 second pause between each word.
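With `USE_NUMBERS=0` (the words variant), the streamed output should look
roughly like this, each line arriving about half a second after the previous
one:

```
$ curl -N <function-url>
Hello
world
from
Lambda!
```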

## Build for ARM 64

Build the function with `cargo lambda build --release --arm64`
145 changes: 145 additions & 0 deletions examples/http-axum-streaming-otel/src/main.rs
@@ -0,0 +1,145 @@
//! # Example: Axum Streaming Responses on AWS Lambda with OTel
//!
//! Demonstrates serving **incremental streaming responses** from Axum handlers
//! running in AWS Lambda using a **custom** `lambda_runtime::Runtime` with
//! OpenTelemetry (OTel) support.
//!
//! - Streams numbers if `USE_NUMBERS=1`, otherwise streams words.
//! - Uses `BoxService` to erase the router's concrete type so different routers
//!   can be selected at runtime.
//! - Runs with a custom `Runtime` + `StreamAdapter`, which convert Axum
//!   responses into streaming bodies delivered as data is produced (unlike the
//!   default `run_with_streaming_response` helper).

use axum::{
    body::Body,
    extract::Request,
    http::{
        self,
        header::{CACHE_CONTROL, CONTENT_TYPE},
        StatusCode,
    },
    response::{IntoResponse, Response},
    routing::get,
    Router,
};
use bytes::Bytes;
use core::{convert::Infallible, time::Duration};
use lambda_http::{
    lambda_runtime::{
        layers::{OpenTelemetryFaasTrigger, OpenTelemetryLayer as OtelLayer},
        tracing::Instrument,
        Runtime,
    },
    tower::util::BoxService,
    tracing, Error, StreamAdapter,
};
use opentelemetry::trace::TracerProvider;
use opentelemetry_sdk::trace;
use thiserror::Error;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tracing_subscriber::prelude::*;

#[derive(Debug, Error)]
pub enum AppError {
    #[error("{0}")]
    Http(#[from] http::Error),
}

impl IntoResponse for AppError {
    fn into_response(self) -> Response {
        (StatusCode::INTERNAL_SERVER_ERROR, self.to_string()).into_response()
    }
}

#[tracing::instrument(skip_all)]
async fn stream_numbers() -> Result<Response, AppError> {
    let (tx, rx) = mpsc::channel::<Result<Bytes, Infallible>>(8);
    let body = Body::from_stream(ReceiverStream::new(rx));

    tokio::spawn(
        async move {
            for (idx, i) in (1..=4).enumerate() {
                tokio::time::sleep(Duration::from_millis(500)).await;
                let line = format!("number: {i}\n");
                tracing::info!(chunk.idx = idx, bytes = line.len(), "emit");
                if tx.send(Ok(Bytes::from(line))).await.is_err() {
                    break;
                }
            }
        }
        .instrument(tracing::info_span!("producer.stream_numbers")),
    );

    Ok(Response::builder()
        .status(StatusCode::OK)
        .header(CONTENT_TYPE, "text/plain; charset=utf-8")
        .header(CACHE_CONTROL, "no-cache")
        .body(body)?)
}

#[tracing::instrument(skip_all)]
async fn stream_words() -> Result<Response, AppError> {
    let (tx, rx) = mpsc::channel::<Result<Bytes, Infallible>>(8);
    let body = Body::from_stream(ReceiverStream::new(rx));

    tokio::spawn(
        async move {
            for (idx, msg) in ["Hello", "world", "from", "Lambda!"].iter().enumerate() {
                tokio::time::sleep(Duration::from_millis(500)).await;
                let line = format!("{msg}\n");
                tracing::info!(chunk.idx = idx, bytes = line.len(), "emit");
                if tx.send(Ok(Bytes::from(line))).await.is_err() {
                    break;
                }
            }
        }
        .instrument(tracing::info_span!("producer.stream_words")),
    );

    Ok(Response::builder()
        .status(StatusCode::OK)
        .header(CONTENT_TYPE, "text/plain; charset=utf-8")
        .header(CACHE_CONTROL, "no-cache")
        .body(body)?)
}

fn create_svc() -> BoxService<Request<lambda_http::Body>, Response<Body>, Infallible> {
    if std::env::var("USE_NUMBERS").as_deref() == Ok("1") {
        BoxService::new(Router::new().route("/", get(stream_numbers)))
    } else {
        BoxService::new(Router::new().route("/", get(stream_words)))
    }
}

#[tokio::main]
async fn main() -> Result<(), Error> {
    // Set up an OpenTelemetry tracer provider that writes spans to stdout
    // for debugging purposes.
    let exporter = opentelemetry_stdout::SpanExporter::default();
    let tracer_provider = trace::SdkTracerProvider::builder()
        .with_batch_exporter(exporter)
        .build();

    // Set up the link between OpenTelemetry and the `tracing` crate.
    tracing_subscriber::registry()
        .with(tracing_opentelemetry::OpenTelemetryLayer::new(
            tracer_provider.tracer("my-streaming-app"),
        ))
        .init();

    let svc = create_svc();

    // Initialize the Lambda runtime and add OpenTelemetry tracing.
    let runtime = Runtime::new(StreamAdapter::from(svc)).layer(
        OtelLayer::new(|| {
            if let Err(err) = tracer_provider.force_flush() {
                eprintln!("Error flushing traces: {err:#?}");
            }
        })
        .with_trigger(OpenTelemetryFaasTrigger::Http),
    );

    runtime.run().await
}
14 changes: 14 additions & 0 deletions examples/http-axum-streaming/Cargo.toml
@@ -0,0 +1,14 @@
[package]
name = "http-axum-streaming"
version = "0.1.0"
edition = "2021"

[dependencies]
axum = "0.8"
bytes = "1"
lambda_http = { path = "../../lambda-http", default-features = false, features = [
    "apigw_http", "tracing"
] }
thiserror = "2.0"
tokio = { version = "1", features = ["macros"] }
tokio-stream = "0.1.2"
23 changes: 23 additions & 0 deletions examples/http-axum-streaming/README.md
@@ -0,0 +1,23 @@
# AWS Lambda streaming response example (Axum)

This example demonstrates building a **streaming** HTTP response with Axum,
deployed on AWS Lambda using a custom runtime.

## Build & Deploy

1. Install
[cargo-lambda](https://github.com/cargo-lambda/cargo-lambda#installation)
2. Build the function with `cargo lambda build --release`
3. Deploy the function to AWS Lambda with:
- `cargo lambda deploy --enable-function-url --iam-role YOUR_ROLE --env-var
USE_NUMBERS=0` to stream words
- `cargo lambda deploy --enable-function-url --iam-role YOUR_ROLE --env-var
USE_NUMBERS=1` to stream numbers.
4. Enable Lambda streaming response on Lambda console: change the function url's
invoke mode to `RESPONSE_STREAM`
5. Verify the function works: `curl -N <function-url>`. The results should be
streamed back with 0.5 second pause between each word.
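With `USE_NUMBERS=1` (the numbers variant), the stream should look roughly
like this, one line every half second:

```
$ curl -N <function-url>
number: 1
number: 2
number: 3
number: 4
```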

## Build for ARM 64

Build the function with `cargo lambda build --release --arm64`
100 changes: 100 additions & 0 deletions examples/http-axum-streaming/src/main.rs
@@ -0,0 +1,100 @@
//! # Example: Axum Streaming Responses on AWS Lambda
//!
//! Demonstrates serving **incremental streaming responses** from Axum handlers
//! running in AWS Lambda.
//!
//! - Streams numbers if `USE_NUMBERS=1`, otherwise streams words.
//! - Uses `BoxService` to erase the router's concrete type so different routers
//!   can be selected at runtime.
//! - Runs with `run_with_streaming_response`, which uses the **default Lambda
//!   runtime** to convert Axum responses into streaming bodies delivered as
//!   data is produced (unlike the OTel example, which uses a custom `Runtime` +
//!   `StreamAdapter`).

use axum::{
    body::Body,
    extract::Request,
    http::{
        self,
        header::{CACHE_CONTROL, CONTENT_TYPE},
        StatusCode,
    },
    response::{IntoResponse, Response},
    routing::get,
    Router,
};
use bytes::Bytes;
use core::{convert::Infallible, time::Duration};
use lambda_http::{run_with_streaming_response, tower::util::BoxService, tracing, Error};
use thiserror::Error;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;

#[derive(Debug, Error)]
pub enum AppError {
    #[error("{0}")]
    Http(#[from] http::Error),
}

impl IntoResponse for AppError {
    fn into_response(self) -> Response {
        (StatusCode::INTERNAL_SERVER_ERROR, self.to_string()).into_response()
    }
}

async fn stream_numbers() -> Result<Response, AppError> {
    let (tx, rx) = mpsc::channel::<Result<Bytes, Infallible>>(8);
    let body = Body::from_stream(ReceiverStream::new(rx));

    tokio::spawn(async move {
        for i in 1..=4 {
            tokio::time::sleep(Duration::from_millis(500)).await;
            // Stop producing if the client disconnected and the receiver
            // side of the channel is gone (matches stream_words below).
            if tx.send(Ok(Bytes::from(format!("number: {i}\n")))).await.is_err() {
                break;
            }
        }
    });

    Ok(Response::builder()
        .status(StatusCode::OK)
        .header(CONTENT_TYPE, "text/plain; charset=utf-8")
        .header(CACHE_CONTROL, "no-cache")
        .body(body)?)
}

async fn stream_words() -> Result<Response, AppError> {
    let (tx, rx) = mpsc::channel::<Result<Bytes, Infallible>>(8);
    let body = Body::from_stream(ReceiverStream::new(rx));

    tokio::spawn(async move {
        for msg in ["Hello", "world", "from", "Lambda!"] {
            tokio::time::sleep(Duration::from_millis(500)).await;
            if tx.send(Ok(Bytes::from(format!("{msg}\n")))).await.is_err() {
                break;
            }
        }
    });

    Ok(Response::builder()
        .status(StatusCode::OK)
        .header(CONTENT_TYPE, "text/plain; charset=utf-8")
        .header(CACHE_CONTROL, "no-cache")
        .body(body)?)
}

fn create_svc() -> BoxService<Request<lambda_http::Body>, Response<Body>, Infallible> {
    if std::env::var("USE_NUMBERS").as_deref() == Ok("1") {
        BoxService::new(Router::new().route("/", get(stream_numbers)))
    } else {
        BoxService::new(Router::new().route("/", get(stream_words)))
    }
}

#[tokio::main]
async fn main() -> Result<(), Error> {
    tracing::init_default_subscriber();

    let svc = create_svc();

    // Automatically convert the service into a streaming response with the
    // default runtime.
    run_with_streaming_response(svc).await
}
2 changes: 1 addition & 1 deletion lambda-http/src/lib.rs
@@ -102,7 +102,7 @@ use std::{
};

mod streaming;
pub use streaming::run_with_streaming_response;
pub use streaming::{run_with_streaming_response, StreamAdapter};

/// Type alias for `http::Request`s with a fixed [`Body`](enum.Body.html) type
pub type Request = http::Request<Body>;
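For reference, a condensed, hypothetical sketch of what this newly public
export enables, mirroring the `http-axum-streaming-otel` example above but
without the OTel layer (the router and handler here are illustrative
placeholders, not part of the PR):

```rust
use axum::{body::Body, extract::Request, response::Response, routing::get, Router};
use core::convert::Infallible;
use lambda_http::{lambda_runtime::Runtime, tower::util::BoxService, Error, StreamAdapter};

#[tokio::main]
async fn main() -> Result<(), Error> {
    // BoxService erases the router's concrete type, as in the examples above.
    let svc: BoxService<Request<lambda_http::Body>, Response<Body>, Infallible> =
        BoxService::new(Router::new().route("/", get(|| async { "hello\n" })));

    // StreamAdapter wraps the service so response body chunks are forwarded
    // to the client as they are produced; constructing the Runtime by hand
    // (instead of calling run_with_streaming_response) leaves room to attach
    // extra layers, such as the OpenTelemetry layer in the otel example.
    Runtime::new(StreamAdapter::from(svc)).run().await
}
```

As with the examples, the function URL's invoke mode must be set to
`RESPONSE_STREAM` for the client to receive chunks incrementally.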