Skip to content

Commit b39ab05

Browse files
committed
update
1 parent 1ce5c3e commit b39ab05

File tree

3 files changed

+16
-78
lines changed

3 files changed

+16
-78
lines changed

examples/http-axum-streaming-otel/src/main.rs

Lines changed: 1 addition & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
1414
use axum::{
1515
body::Body,
16-
extract::Request,
1716
http::{
1817
self,
1918
header::{CACHE_CONTROL, CONTENT_TYPE},
@@ -31,7 +30,6 @@ use lambda_http::{
3130
tracing::Instrument,
3231
Runtime,
3332
},
34-
tower::util::BoxService,
3533
tracing, Error, StreamAdapter,
3634
};
3735
use opentelemetry::trace::TracerProvider;
@@ -53,32 +51,6 @@ impl IntoResponse for AppError {
5351
}
5452
}
5553

56-
#[tracing::instrument(skip_all)]
57-
async fn stream_numbers() -> Result<Response, AppError> {
58-
let (tx, rx) = mpsc::channel::<Result<Bytes, Infallible>>(8);
59-
let body = Body::from_stream(ReceiverStream::new(rx));
60-
61-
tokio::spawn(
62-
async move {
63-
for (idx, i) in (1..=4).enumerate() {
64-
tokio::time::sleep(Duration::from_millis(500)).await;
65-
let line = format!("number: {i}\n");
66-
tracing::info!(chunk.idx = idx, bytes = line.len(), "emit");
67-
if tx.send(Ok(Bytes::from(line))).await.is_err() {
68-
break;
69-
}
70-
}
71-
}
72-
.instrument(tracing::info_span!("producer.stream_numbers")),
73-
);
74-
75-
Ok(Response::builder()
76-
.status(StatusCode::OK)
77-
.header(CONTENT_TYPE, "text/plain; charset=utf-8")
78-
.header(CACHE_CONTROL, "no-cache")
79-
.body(body)?)
80-
}
81-
8254
#[tracing::instrument(skip_all)]
8355
async fn stream_words() -> Result<Response, AppError> {
8456
let (tx, rx) = mpsc::channel::<Result<Bytes, Infallible>>(8);
@@ -105,14 +77,6 @@ async fn stream_words() -> Result<Response, AppError> {
10577
.body(body)?)
10678
}
10779

108-
fn create_svc() -> BoxService<Request<lambda_http::Body>, Response<Body>, Infallible> {
109-
if std::env::var("USE_NUMBERS").as_deref() == Ok("1") {
110-
BoxService::new(Router::new().route("/", get(stream_numbers)))
111-
} else {
112-
BoxService::new(Router::new().route("/", get(stream_words)))
113-
}
114-
}
115-
11680
#[tokio::main]
11781
async fn main() -> Result<(), Error> {
11882
// Set up OpenTelemetry tracer provider that writes spans to stdout for
@@ -129,7 +93,7 @@ async fn main() -> Result<(), Error> {
12993
))
13094
.init();
13195

132-
let svc = create_svc();
96+
let svc = Router::new().route("/", get(stream_words));
13397

13498
// Initialize the Lambda runtime and add OpenTelemetry tracing
13599
let runtime = Runtime::new(StreamAdapter::from(svc)).layer(

examples/http-axum-streaming/src/main.rs

Lines changed: 2 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
1414
use axum::{
1515
body::Body,
16-
extract::Request,
1716
http::{
1817
self,
1918
header::{CACHE_CONTROL, CONTENT_TYPE},
@@ -25,7 +24,7 @@ use axum::{
2524
};
2625
use bytes::Bytes;
2726
use core::{convert::Infallible, time::Duration};
28-
use lambda_http::{run_with_streaming_response, tower::util::BoxService, tracing, Error};
27+
use lambda_http::{run_with_streaming_response, tracing, Error};
2928
use thiserror::Error;
3029
use tokio::sync::mpsc;
3130
use tokio_stream::wrappers::ReceiverStream;
@@ -42,24 +41,6 @@ impl IntoResponse for AppError {
4241
}
4342
}
4443

45-
async fn stream_numbers() -> Result<Response, AppError> {
46-
let (tx, rx) = mpsc::channel::<Result<Bytes, Infallible>>(8);
47-
let body = Body::from_stream(ReceiverStream::new(rx));
48-
49-
tokio::spawn(async move {
50-
for i in 1..=4 {
51-
tokio::time::sleep(Duration::from_millis(500)).await;
52-
let _ = tx.send(Ok(Bytes::from(format!("number: {i}\n")))).await;
53-
}
54-
});
55-
56-
Ok(Response::builder()
57-
.status(StatusCode::OK)
58-
.header(CONTENT_TYPE, "text/plain; charset=utf-8")
59-
.header(CACHE_CONTROL, "no-cache")
60-
.body(body)?)
61-
}
62-
6344
async fn stream_words() -> Result<Response, AppError> {
6445
let (tx, rx) = mpsc::channel::<Result<Bytes, Infallible>>(8);
6546
let body = Body::from_stream(ReceiverStream::new(rx));
@@ -80,19 +61,11 @@ async fn stream_words() -> Result<Response, AppError> {
8061
.body(body)?)
8162
}
8263

83-
fn create_svc() -> BoxService<Request<lambda_http::Body>, Response<Body>, Infallible> {
84-
if std::env::var("USE_NUMBERS").as_deref() == Ok("1") {
85-
BoxService::new(Router::new().route("/", get(stream_numbers)))
86-
} else {
87-
BoxService::new(Router::new().route("/", get(stream_words)))
88-
}
89-
}
90-
9164
#[tokio::main]
9265
async fn main() -> Result<(), Error> {
9366
tracing::init_default_subscriber();
9467

95-
let svc = create_svc();
68+
let svc = Router::new().route("/", get(stream_words));
9669

9770
// Automatically convert the service into a streaming response with a
9871
// default runtime.

lambda-http/src/streaming.rs

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -198,26 +198,27 @@ mod test_stream_adapter {
198198
}
199199

200200
fn call(&mut self, event: LambdaEvent<LambdaRequest>) -> Self::Future {
201-
// Log the request
202201
println!("Lambda event: {event:#?}");
203-
204202
self.inner.call(event)
205203
}
206204
}
207205

208-
/// This tests that `StreamAdapter` can be used in a `tower::Service` where
209-
/// the user may require additional middleware between `lambda_runtime::run`
210-
/// and where the `LambdaEvent` is converted into a `Request`.
206+
/// Works with a concrete service stack (no boxing)
211207
#[test]
212-
fn stream_adapter_is_boxable() {
208+
fn stream_adapter_with_concrete_stack() {
213209
let _svc = ServiceBuilder::new()
214-
.layer_fn(|service| {
215-
// This could be any middleware that logs, inspects, or
216-
// manipulates the `LambdaEvent` before it's converted to a
217-
// `Request` by `Adapter`.
210+
.layer_fn(|service| LogService { inner: service })
211+
.layer_fn(StreamAdapter::from)
212+
.service_fn(
213+
|_req: Request| async move { http::Response::builder().status(StatusCode::OK).body(Body::Empty) },
214+
);
215+
}
218216

219-
LogService { inner: service }
220-
})
217+
/// Also works when the stack is boxed (type-erased)
218+
#[test]
219+
fn stream_adapter_with_boxed_stack() {
220+
let _svc = ServiceBuilder::new()
221+
.layer_fn(|service| LogService { inner: service })
221222
.layer_fn(StreamAdapter::from)
222223
.service_fn(
223224
|_req: Request| async move { http::Response::builder().status(StatusCode::OK).body(Body::Empty) },

0 commit comments

Comments
 (0)