Skip to content

Commit ff39706

Browse files
committed
Update examples
1 parent 5c23616 commit ff39706

File tree

7 files changed

+258
-69
lines changed

7 files changed

+258
-69
lines changed
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
[package]
2+
name = "http-axum-streaming-otel"
3+
version = "0.1.0"
4+
edition = "2021"
5+
6+
[dependencies]
7+
axum = "0.8"
8+
bytes = "1"
9+
lambda_http = { path = "../../lambda-http", default-features = false, features = [
10+
"apigw_http", "tracing", "opentelemetry"
11+
] }
12+
opentelemetry = "0.30"
13+
opentelemetry_sdk = { version = "0.30", features = ["rt-tokio"] }
14+
opentelemetry-stdout = { version = "0.30", features = ["trace"] }
15+
thiserror = "2.0"
16+
tokio = { version = "1", features = ["macros"] }
17+
tokio-stream = "0.1.2"
18+
tracing = "0.1"
19+
tracing-opentelemetry = "0.31"
20+
tracing-subscriber = "0.3"
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
# AWS Lambda Function example
2+
3+
This example shows how to build a **streaming HTTP response** with `Axum` and
4+
run it on AWS Lambda using a custom runtime with OpenTelemetry (OTel) support.
5+
6+
Tracing data is exported as console log entries visible in CloudWatch. Note that
7+
CloudWatch assigns a `Timestamp` to each log entry based on when it receives the
8+
data (batch exported). To see when work actually occurred, look at the span's
9+
event attributes, which include the precise local timestamps of those events.
10+
11+
## Build & Deploy
12+
13+
1. Install
14+
[cargo-lambda](https://github.com/cargo-lambda/cargo-lambda#installation)
15+
2. Build the function with `cargo lambda build --release`
16+
3. Deploy the function to AWS Lambda with:
17+
- `cargo lambda deploy --enable-function-url --iam-role YOUR_ROLE --env-var
18+
USE_NUMBERS=0` to stream words
19+
- `cargo lambda deploy --enable-function-url --iam-role YOUR_ROLE --env-var
20+
USE_NUMBERS=1` to stream numbers.
21+
4. Enable Lambda streaming response on Lambda console: change the function url's
22+
invoke mode to `RESPONSE_STREAM`
23+
5. Verify the function works: `curl -N <function-url>`. The results should be
24+
streamed back with 0.5 second pause between each word.
25+
26+
## Build for ARM 64
27+
28+
Build the function with `cargo lambda build --release --arm64`
Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
//! # Example: Axum Streaming Responses on AWS Lambda with OTel
2+
//!
3+
//! Demonstrates serving **incremental streaming responses** from Axum handlers
4+
//! running in AWS Lambda using a **custom** `lambda_runtime::Runtime` with
5+
//! OpenTelemetry (OTel) support.
6+
//!
7+
//! - Streams numbers if `USE_NUMBERS` is set, otherwise streams words.
8+
//! - Uses `BoxService` to erase the router's concrete type so different routers
9+
//! can be selected at runtime.
10+
//! - Runs with a custom `Runtime` + `StreamAdapter`, which convert Axum
11+
//! responses into streaming bodies delivered as data is produced (unlike the
12+
//! default `run_with_streaming_response` helper).
13+
14+
use axum::{
15+
body::Body,
16+
extract::Request,
17+
http::{
18+
self,
19+
header::{CACHE_CONTROL, CONTENT_TYPE},
20+
StatusCode,
21+
},
22+
response::{IntoResponse, Response},
23+
routing::get,
24+
Router,
25+
};
26+
use bytes::Bytes;
27+
use core::{convert::Infallible, time::Duration};
28+
use lambda_http::{
29+
lambda_runtime::{
30+
layers::{OpenTelemetryFaasTrigger, OpenTelemetryLayer as OtelLayer},
31+
tracing::Instrument,
32+
Runtime,
33+
},
34+
tower::util::BoxService,
35+
tracing, Error, StreamAdapter,
36+
};
37+
use opentelemetry::trace::TracerProvider;
38+
use opentelemetry_sdk::trace;
39+
use thiserror::Error;
40+
use tokio::sync::mpsc;
41+
use tokio_stream::wrappers::ReceiverStream;
42+
use tracing_subscriber::prelude::*;
43+
44+
#[derive(Debug, Error)]
45+
pub enum AppError {
46+
#[error("{0}")]
47+
Http(#[from] http::Error),
48+
}
49+
50+
impl IntoResponse for AppError {
51+
fn into_response(self) -> Response {
52+
(StatusCode::INTERNAL_SERVER_ERROR, self.to_string()).into_response()
53+
}
54+
}
55+
56+
#[tracing::instrument(skip_all)]
57+
async fn stream_numbers() -> Result<Response, AppError> {
58+
let (tx, rx) = mpsc::channel::<Result<Bytes, Infallible>>(8);
59+
let body = Body::from_stream(ReceiverStream::new(rx));
60+
61+
tokio::spawn(
62+
async move {
63+
for (idx, i) in (1..=4).enumerate() {
64+
tokio::time::sleep(Duration::from_millis(500)).await;
65+
let line = format!("number: {i}\n");
66+
tracing::info!(chunk.idx = idx, bytes = line.len(), "emit");
67+
if tx.send(Ok(Bytes::from(line))).await.is_err() {
68+
break;
69+
}
70+
}
71+
}
72+
.instrument(tracing::info_span!("producer.stream_numbers")),
73+
);
74+
75+
Ok(Response::builder()
76+
.status(StatusCode::OK)
77+
.header(CONTENT_TYPE, "text/plain; charset=utf-8")
78+
.header(CACHE_CONTROL, "no-cache")
79+
.body(body)?)
80+
}
81+
82+
#[tracing::instrument(skip_all)]
83+
async fn stream_words() -> Result<Response, AppError> {
84+
let (tx, rx) = mpsc::channel::<Result<Bytes, Infallible>>(8);
85+
let body = Body::from_stream(ReceiverStream::new(rx));
86+
87+
tokio::spawn(
88+
async move {
89+
for (idx, msg) in ["Hello", "world", "from", "Lambda!"].iter().enumerate() {
90+
tokio::time::sleep(Duration::from_millis(500)).await;
91+
let line = format!("{msg}\n");
92+
tracing::info!(chunk.idx = idx, bytes = line.len(), "emit");
93+
if tx.send(Ok(Bytes::from(line))).await.is_err() {
94+
break;
95+
}
96+
}
97+
}
98+
.instrument(tracing::info_span!("producer.stream_words")),
99+
);
100+
101+
Ok(Response::builder()
102+
.status(StatusCode::OK)
103+
.header(CONTENT_TYPE, "text/plain; charset=utf-8")
104+
.header(CACHE_CONTROL, "no-cache")
105+
.body(body)?)
106+
}
107+
108+
// Creates a dynamic router based on the environment variable. Demonstrating how
109+
// you can type-erase a service
110+
fn create_svc() -> BoxService<Request<lambda_http::Body>, Response<Body>, Infallible> {
111+
if std::env::var("USE_NUMBERS").as_deref() == Ok("1") {
112+
BoxService::new(Router::new().route("/", get(stream_numbers)))
113+
} else {
114+
BoxService::new(Router::new().route("/", get(stream_words)))
115+
}
116+
}
117+
118+
#[tokio::main]
119+
async fn main() -> Result<(), Error> {
120+
// Set up OpenTelemetry tracer provider that writes spans to stdout for
121+
// debugging purposes
122+
let exporter = opentelemetry_stdout::SpanExporter::default();
123+
let tracer_provider = trace::SdkTracerProvider::builder()
124+
.with_batch_exporter(exporter)
125+
.build();
126+
127+
// Set up link between OpenTelemetry and tracing crate
128+
tracing_subscriber::registry()
129+
.with(tracing_opentelemetry::OpenTelemetryLayer::new(
130+
tracer_provider.tracer("my-streaming-app"),
131+
))
132+
.init();
133+
134+
let svc = create_svc();
135+
136+
// Initialize the Lambda runtime and add OpenTelemetry tracing
137+
let runtime = Runtime::new(StreamAdapter::from(svc)).layer(
138+
OtelLayer::new(|| {
139+
if let Err(err) = tracer_provider.force_flush() {
140+
eprintln!("Error flushing traces: {err:#?}");
141+
}
142+
})
143+
.with_trigger(OpenTelemetryFaasTrigger::Http),
144+
);
145+
146+
runtime.run().await
147+
}

examples/http-axum-streaming/Cargo.toml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,8 @@ edition = "2021"
66
[dependencies]
77
axum = "0.8"
88
bytes = "1"
9-
futures-util = "0.3"
109
lambda_http = { path = "../../lambda-http", default-features = false, features = [
11-
"apigw_rest", "apigw_http", "tracing"
10+
"apigw_http", "tracing"
1211
] }
1312
thiserror = "2.0"
1413
tokio = { version = "1", features = ["macros"] }

examples/http-axum-streaming/README.md

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,22 @@
11
# AWS Lambda Function example
22

3-
This example demonstrates building a **streaming** HTTP response with Axum, deployed on AWS Lambda using a custom runtime.
3+
This example demonstrates building a **streaming** HTTP response with Axum,
4+
deployed on AWS Lambda using a custom runtime.
45

56
## Build & Deploy
67

7-
1. Install [cargo-lambda](https://github.com/cargo-lambda/cargo-lambda#installation)
8+
1. Install
9+
[cargo-lambda](https://github.com/cargo-lambda/cargo-lambda#installation)
810
2. Build the function with `cargo lambda build --release`
9-
3. Deploy the function to AWS Lambda with `cargo lambda deploy --enable-function-url --iam-role YOUR_ROLE`
10-
4. Enable Lambda streaming response on Lambda console: change the function url's invoke mode to `RESPONSE_STREAM`
11-
5. Verify the function works: `curl -v -N <function-url>`. The results should be streamed back with 0.5 second pause between each word.
11+
3. Deploy the function to AWS Lambda with:
12+
- `cargo lambda deploy --enable-function-url --iam-role YOUR_ROLE --env-var
13+
USE_NUMBERS=0` to stream words
14+
- `cargo lambda deploy --enable-function-url --iam-role YOUR_ROLE --env-var
15+
USE_NUMBERS=1` to stream numbers.
16+
4. Enable Lambda streaming response on Lambda console: change the function url's
17+
invoke mode to `RESPONSE_STREAM`
18+
5. Verify the function works: `curl -N <function-url>`. The results should be
19+
streamed back with 0.5 second pause between each word.
1220

1321
## Build for ARM 64
1422

examples/http-axum-streaming/src/main.rs

Lines changed: 49 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,21 @@
1+
//! # Example: Axum Streaming Responses on AWS Lambda
2+
//!
3+
//! Demonstrates serving **incremental streaming responses** from Axum handlers
4+
//! running in AWS Lambda.
5+
//!
6+
//! - Streams numbers if `USE_NUMBERS` is set, otherwise streams words.
7+
//! - Uses `BoxService` to erase the router's concrete type so different routers
8+
//! can be selected at runtime.
9+
//! - Runs with `run_with_streaming_response`, which uses the **default Lambda
10+
//! runtime** to convert Axum responses into streaming bodies delivered as
11+
//! data is produced (unlike the OTel example, which used a custom `Runtime` +
12+
//! `StreamAdapter`).
13+
114
use axum::{
215
body::Body,
3-
http,
16+
extract::Request,
417
http::{
18+
self,
519
header::{CACHE_CONTROL, CONTENT_TYPE},
620
StatusCode,
721
},
@@ -10,8 +24,8 @@ use axum::{
1024
Router,
1125
};
1226
use bytes::Bytes;
13-
use lambda_http::{lambda_runtime, tracing, Error, StreamAdapter};
14-
use std::{convert::Infallible, time::Duration};
27+
use core::{convert::Infallible, time::Duration};
28+
use lambda_http::{run_with_streaming_response, tower::util::BoxService, tracing, Error};
1529
use thiserror::Error;
1630
use tokio::sync::mpsc;
1731
use tokio_stream::wrappers::ReceiverStream;
@@ -28,9 +42,25 @@ impl IntoResponse for AppError {
2842
}
2943
}
3044

31-
type AppResult<T = Response> = Result<T, AppError>;
45+
async fn stream_numbers() -> Result<Response, AppError> {
46+
let (tx, rx) = mpsc::channel::<Result<Bytes, Infallible>>(8);
47+
let body = Body::from_stream(ReceiverStream::new(rx));
48+
49+
tokio::spawn(async move {
50+
for i in 1..=4 {
51+
tokio::time::sleep(Duration::from_millis(500)).await;
52+
let _ = tx.send(Ok(Bytes::from(format!("number: {i}\n")))).await;
53+
}
54+
});
3255

33-
async fn stream_handler() -> AppResult {
56+
Ok(Response::builder()
57+
.status(StatusCode::OK)
58+
.header(CONTENT_TYPE, "text/plain; charset=utf-8")
59+
.header(CACHE_CONTROL, "no-cache")
60+
.body(body)?)
61+
}
62+
63+
async fn stream_words() -> Result<Response, AppError> {
3464
let (tx, rx) = mpsc::channel::<Result<Bytes, Infallible>>(8);
3565
let body = Body::from_stream(ReceiverStream::new(rx));
3666

@@ -50,13 +80,23 @@ async fn stream_handler() -> AppResult {
5080
.body(body)?)
5181
}
5282

83+
// Creates a dynamic router based on the environment variable. Demonstrating how
84+
// you can type-erase a service
85+
fn create_svc() -> BoxService<Request<lambda_http::Body>, Response<Body>, Infallible> {
86+
if std::env::var("USE_NUMBERS").as_deref() == Ok("1") {
87+
BoxService::new(Router::new().route("/", get(stream_numbers)))
88+
} else {
89+
BoxService::new(Router::new().route("/", get(stream_words)))
90+
}
91+
}
92+
5393
#[tokio::main]
5494
async fn main() -> Result<(), Error> {
5595
tracing::init_default_subscriber();
5696

57-
let app = Router::new().route("/", get(stream_handler));
58-
59-
let runtime = lambda_runtime::Runtime::new(StreamAdapter::from(app));
97+
let svc = create_svc();
6098

61-
runtime.run().await
99+
// Automatically convert the service into a streaming response with a
100+
// default runtime.
101+
run_with_streaming_response(svc).await
62102
}

lambda-http/src/streaming.rs

Lines changed: 0 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -172,56 +172,3 @@ where
172172
}
173173
}
174174
}
175-
176-
#[cfg(test)]
177-
mod test_stream_adapter {
178-
use super::*;
179-
180-
use crate::Body;
181-
use http::StatusCode;
182-
183-
// A middleware that logs requests before forwarding them to another service
184-
struct LogService<S> {
185-
inner: S,
186-
}
187-
188-
impl<S> Service<LambdaEvent<LambdaRequest>> for LogService<S>
189-
where
190-
S: Service<LambdaEvent<LambdaRequest>>,
191-
{
192-
type Response = S::Response;
193-
type Error = S::Error;
194-
type Future = S::Future;
195-
196-
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
197-
self.inner.poll_ready(cx)
198-
}
199-
200-
fn call(&mut self, event: LambdaEvent<LambdaRequest>) -> Self::Future {
201-
// Log the request
202-
println!("Lambda event: {event:#?}");
203-
204-
self.inner.call(event)
205-
}
206-
}
207-
208-
/// This tests that `StreamAdapter` can be used in a `tower::Service` where
209-
/// the user may require additional middleware between `lambda_runtime::run`
210-
/// and where the `LambdaEvent` is converted into a `Request`.
211-
#[test]
212-
fn stream_adapter_is_boxable() {
213-
let _svc = ServiceBuilder::new()
214-
.layer_fn(|service| {
215-
// This could be any middleware that logs, inspects, or
216-
// manipulates the `LambdaEvent` before it's converted to a
217-
// `Request` by `Adapter`.
218-
219-
LogService { inner: service }
220-
})
221-
.layer_fn(StreamAdapter::from)
222-
.service_fn(
223-
|_req: Request| async move { http::Response::builder().status(StatusCode::OK).body(Body::Empty) },
224-
)
225-
.boxed();
226-
}
227-
}

0 commit comments

Comments
 (0)