Skip to content

Commit 92a80b4

Browse files
authored
Expose streaming API (#1013)
* Expose streaming API
1 parent 42e2c1f commit 92a80b4

File tree

8 files changed

+422
-36
lines changed

8 files changed

+422
-36
lines changed
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
[package]
2+
name = "http-axum-streaming-otel"
3+
version = "0.1.0"
4+
edition = "2021"
5+
6+
[dependencies]
7+
axum = "0.8"
8+
bytes = "1"
9+
lambda_http = { path = "../../lambda-http", default-features = false, features = [
10+
"apigw_http", "tracing", "opentelemetry"
11+
] }
12+
opentelemetry = "0.30"
13+
opentelemetry_sdk = { version = "0.30", features = ["rt-tokio"] }
14+
opentelemetry-stdout = { version = "0.30", features = ["trace"] }
15+
thiserror = "2.0"
16+
tokio = { version = "1", features = ["macros", "rt-multi-thread", "sync", "time"] }
17+
tokio-stream = "0.1.2"
18+
tracing = "0.1"
19+
tracing-opentelemetry = "0.31"
20+
tracing-subscriber = "0.3"
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
# AWS Lambda Function example
2+
3+
This example shows how to build a **streaming HTTP response** with `Axum` and
4+
run it on AWS Lambda using a custom runtime with OpenTelemetry (OTel) support.
5+
6+
Tracing data is exported as console log entries visible in CloudWatch. Note that
7+
CloudWatch assigns a `Timestamp` to each log entry based on when it receives the
8+
data (batch exported). To see when work actually occurred, look at the span's
9+
event attributes, which include the precise local timestamps of those events.
10+
11+
## Build & Deploy
12+
13+
1. Install
14+
[cargo-lambda](https://github.com/cargo-lambda/cargo-lambda#installation)
15+
2. Build the function with `cargo lambda build --release`
16+
3. Deploy the function to AWS Lambda with:
17+
- `cargo lambda deploy --enable-function-url --iam-role YOUR_ROLE` to stream words
18+
4. Enable Lambda streaming responses in the Lambda console: change the function URL's
19+
invoke mode to `RESPONSE_STREAM`
20+
5. Verify the function works: `curl -N <function-url>`. The results should be
21+
streamed back with a 0.5-second pause between each word.
22+
23+
## Build for ARM 64
24+
25+
Build the function with `cargo lambda build --release --arm64`
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
//! # Example: Axum Streaming Responses on AWS Lambda with OTel
2+
//!
3+
//! Demonstrates serving **incremental streaming responses** from Axum handlers
4+
//! running in AWS Lambda using a **custom** `lambda_runtime::Runtime` with
5+
//! OpenTelemetry (OTel) support.
6+
//!
7+
//! - Runs with a custom `Runtime` + `StreamAdapter`, which convert Axum
8+
//! responses into streaming bodies delivered as data is produced (unlike the
9+
//! default `run_with_streaming_response` helper).
10+
11+
use axum::{
12+
body::Body,
13+
http::{
14+
self,
15+
header::{CACHE_CONTROL, CONTENT_TYPE},
16+
StatusCode,
17+
},
18+
response::{IntoResponse, Response},
19+
routing::get,
20+
Router,
21+
};
22+
use bytes::Bytes;
23+
use core::{convert::Infallible, time::Duration};
24+
use lambda_http::{
25+
lambda_runtime::{
26+
layers::{OpenTelemetryFaasTrigger, OpenTelemetryLayer as OtelLayer},
27+
tracing::Instrument,
28+
Runtime,
29+
},
30+
tracing, Error, StreamAdapter,
31+
};
32+
use opentelemetry::trace::TracerProvider;
33+
use opentelemetry_sdk::trace;
34+
use thiserror::Error;
35+
use tokio::sync::mpsc;
36+
use tokio_stream::wrappers::ReceiverStream;
37+
use tracing_subscriber::prelude::*;
38+
39+
#[derive(Debug, Error)]
40+
pub enum AppError {
41+
#[error("{0}")]
42+
Http(#[from] http::Error),
43+
}
44+
45+
impl IntoResponse for AppError {
46+
fn into_response(self) -> Response {
47+
(StatusCode::INTERNAL_SERVER_ERROR, self.to_string()).into_response()
48+
}
49+
}
50+
51+
#[tracing::instrument(skip_all)]
52+
async fn stream_words() -> Result<Response, AppError> {
53+
let (tx, rx) = mpsc::channel::<Result<Bytes, Infallible>>(8);
54+
let body = Body::from_stream(ReceiverStream::new(rx));
55+
56+
tokio::spawn(
57+
async move {
58+
for (idx, msg) in ["Hello", "world", "from", "Lambda!"].iter().enumerate() {
59+
tokio::time::sleep(Duration::from_millis(500)).await;
60+
let line = format!("{msg}\n");
61+
tracing::info!(chunk.idx = idx, bytes = line.len(), "emit");
62+
if tx.send(Ok(Bytes::from(line))).await.is_err() {
63+
break;
64+
}
65+
}
66+
}
67+
.instrument(tracing::info_span!("producer.stream_words")),
68+
);
69+
70+
Ok(Response::builder()
71+
.status(StatusCode::OK)
72+
.header(CONTENT_TYPE, "text/plain; charset=utf-8")
73+
.header(CACHE_CONTROL, "no-cache")
74+
.body(body)?)
75+
}
76+
77+
#[tokio::main]
78+
async fn main() -> Result<(), Error> {
79+
// Set up OpenTelemetry tracer provider that writes spans to stdout for
80+
// debugging purposes
81+
let exporter = opentelemetry_stdout::SpanExporter::default();
82+
let tracer_provider = trace::SdkTracerProvider::builder()
83+
.with_batch_exporter(exporter)
84+
.build();
85+
86+
// Set up link between OpenTelemetry and tracing crate
87+
tracing_subscriber::registry()
88+
.with(tracing_opentelemetry::OpenTelemetryLayer::new(
89+
tracer_provider.tracer("my-streaming-app"),
90+
))
91+
.init();
92+
93+
let svc = Router::new().route("/", get(stream_words));
94+
95+
// Initialize the Lambda runtime and add OpenTelemetry tracing
96+
let runtime = Runtime::new(StreamAdapter::from(svc)).layer(
97+
OtelLayer::new(|| {
98+
if let Err(err) = tracer_provider.force_flush() {
99+
eprintln!("Error flushing traces: {err:#?}");
100+
}
101+
})
102+
.with_trigger(OpenTelemetryFaasTrigger::Http),
103+
);
104+
105+
runtime.run().await
106+
}
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
[package]
2+
name = "http-axum-streaming"
3+
version = "0.1.0"
4+
edition = "2021"
5+
6+
[dependencies]
7+
axum = "0.8"
8+
bytes = "1"
9+
lambda_http = { path = "../../lambda-http", default-features = false, features = [
10+
"apigw_http", "tracing"
11+
] }
12+
thiserror = "2.0"
13+
tokio = { version = "1", features = ["macros", "rt-multi-thread", "sync", "time"] }
14+
tokio-stream = "0.1.2"
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
# AWS Lambda Function example
2+
3+
This example demonstrates building a **streaming** HTTP response with Axum,
4+
deployed on AWS Lambda using a custom runtime.
5+
6+
## Build & Deploy
7+
8+
1. Install
9+
[cargo-lambda](https://github.com/cargo-lambda/cargo-lambda#installation)
10+
2. Build the function with `cargo lambda build --release`
11+
3. Deploy the function to AWS Lambda with:
12+
- `cargo lambda deploy --enable-function-url --iam-role YOUR_ROLE` to stream words
13+
4. Enable Lambda streaming responses in the Lambda console: change the function URL's
14+
invoke mode to `RESPONSE_STREAM`
15+
5. Verify the function works: `curl -N <function-url>`. The results should be
16+
streamed back with a 0.5-second pause between each word.
17+
18+
## Build for ARM 64
19+
20+
Build the function with `cargo lambda build --release --arm64`
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
//! # Example: Axum Streaming Responses on AWS Lambda
2+
//!
3+
//! Demonstrates serving **incremental streaming responses** from Axum handlers
4+
//! running in AWS Lambda.
5+
//!
6+
//! - Runs with `run_with_streaming_response`, which uses the **default Lambda
7+
//! runtime** to convert Axum responses into streaming bodies delivered as
8+
//! data is produced (unlike the OTel example, which used a custom `Runtime` +
9+
//! `StreamAdapter`).
10+
11+
use axum::{
12+
body::Body,
13+
http::{
14+
self,
15+
header::{CACHE_CONTROL, CONTENT_TYPE},
16+
StatusCode,
17+
},
18+
response::{IntoResponse, Response},
19+
routing::get,
20+
Router,
21+
};
22+
use bytes::Bytes;
23+
use core::{convert::Infallible, time::Duration};
24+
use lambda_http::{run_with_streaming_response, tracing, Error};
25+
use thiserror::Error;
26+
use tokio::sync::mpsc;
27+
use tokio_stream::wrappers::ReceiverStream;
28+
29+
#[derive(Debug, Error)]
30+
pub enum AppError {
31+
#[error("{0}")]
32+
Http(#[from] http::Error),
33+
}
34+
35+
impl IntoResponse for AppError {
36+
fn into_response(self) -> Response {
37+
(StatusCode::INTERNAL_SERVER_ERROR, self.to_string()).into_response()
38+
}
39+
}
40+
41+
async fn stream_words() -> Result<Response, AppError> {
42+
let (tx, rx) = mpsc::channel::<Result<Bytes, Infallible>>(8);
43+
let body = Body::from_stream(ReceiverStream::new(rx));
44+
45+
tokio::spawn(async move {
46+
for msg in ["Hello", "world", "from", "Lambda!"] {
47+
tokio::time::sleep(Duration::from_millis(500)).await;
48+
if tx.send(Ok(Bytes::from(format!("{msg}\n")))).await.is_err() {
49+
break;
50+
}
51+
}
52+
});
53+
54+
Ok(Response::builder()
55+
.status(StatusCode::OK)
56+
.header(CONTENT_TYPE, "text/plain; charset=utf-8")
57+
.header(CACHE_CONTROL, "no-cache")
58+
.body(body)?)
59+
}
60+
61+
#[tokio::main]
62+
async fn main() -> Result<(), Error> {
63+
tracing::init_default_subscriber();
64+
65+
let svc = Router::new().route("/", get(stream_words));
66+
67+
// Automatically convert the service into a streaming response with a
68+
// default runtime.
69+
run_with_streaming_response(svc).await
70+
}

lambda-http/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ use std::{
102102
};
103103

104104
mod streaming;
105-
pub use streaming::run_with_streaming_response;
105+
pub use streaming::{run_with_streaming_response, StreamAdapter};
106106

107107
/// Type alias for `http::Request`s with a fixed [`Body`](enum.Body.html) type
108108
pub type Request = http::Request<Body>;

0 commit comments

Comments
 (0)