Skip to content

Commit 1cdeb92

Browse files
committed
wip
1 parent 035c0b0 commit 1cdeb92

File tree

1 file changed

+132
-31
lines changed

1 file changed

+132
-31
lines changed

lambda-http/src/streaming.rs

Lines changed: 132 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,21 @@ use crate::{http::header::SET_COOKIE, request::LambdaRequest, Request, RequestEx
22
use bytes::Bytes;
33
use core::{
44
fmt::Debug,
5-
future::Future,
65
pin::Pin,
76
task::{Context, Poll},
87
};
9-
use futures_util::Stream;
8+
use futures_util::{Stream, TryFutureExt};
109
pub use http::{self, Response};
1110
use http_body::Body;
12-
use lambda_runtime::Diagnostic;
11+
use lambda_runtime::{
12+
tower::{
13+
util::{MapRequest, MapResponse},
14+
ServiceBuilder, ServiceExt,
15+
},
16+
Diagnostic,
17+
};
1318
pub use lambda_runtime::{Error, LambdaEvent, MetadataPrelude, Service, StreamResponse};
14-
use std::marker::PhantomData;
19+
use std::{future::Future, marker::PhantomData};
1520

1621
/// An adapter that lifts a standard [`Service<Request>`] into a
1722
/// [`Service<LambdaEvent<LambdaRequest>>`] which produces streaming Lambda HTTP
@@ -25,7 +30,6 @@ impl<'a, S, B, E> From<S> for StreamAdapter<'a, S, B>
2530
where
2631
S: Service<Request, Response = Response<B>, Error = E>,
2732
S::Future: Send + 'a,
28-
E: Debug + Into<Diagnostic>,
2933
B: Body + Unpin + Send + 'static,
3034
B::Data: Into<Bytes> + Send,
3135
B::Error: Into<Error> + Send + Debug,
@@ -42,46 +46,87 @@ impl<'a, S, B, E> Service<LambdaEvent<LambdaRequest>> for StreamAdapter<'a, S, B
4246
where
4347
S: Service<Request, Response = Response<B>, Error = E>,
4448
S::Future: Send + 'a,
45-
B: Body + Send + 'static,
49+
B: Body + Unpin + Send + 'static,
4650
B::Data: Into<Bytes> + Send,
4751
B::Error: Into<Error> + Send + Debug,
48-
E: Debug + Into<Diagnostic>,
4952
{
5053
type Response = StreamResponse<BodyStream<B>>;
5154
type Error = E;
52-
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, E>> + Send + 'a>>;
55+
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'a>>;
5356

5457
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
5558
self.service.poll_ready(cx)
5659
}
5760

5861
fn call(&mut self, req: LambdaEvent<LambdaRequest>) -> Self::Future {
5962
let event: Request = req.payload.into();
60-
let fut = self.service.call(event.with_lambda_context(req.context));
61-
Box::pin(async move {
62-
let res = fut.await?;
63-
let (parts, body) = res.into_parts();
64-
65-
let mut headers = parts.headers;
66-
let cookies = headers
67-
.get_all(SET_COOKIE)
68-
.iter()
69-
.map(|c| String::from_utf8_lossy(c.as_bytes()).to_string())
70-
.collect::<Vec<_>>();
71-
headers.remove(SET_COOKIE);
72-
73-
Ok(StreamResponse {
74-
metadata_prelude: MetadataPrelude {
75-
headers,
76-
status_code: parts.status,
77-
cookies,
78-
},
79-
stream: BodyStream { body },
80-
})
81-
})
63+
Box::pin(
64+
self.service
65+
.call(event.with_lambda_context(req.context))
66+
.map_ok(into_stream_response),
67+
)
8268
}
8369
}
8470

71+
/// Converts an `http::Response<B>` into a streaming Lambda response.
72+
fn into_stream_response<B>(res: Response<B>) -> StreamResponse<BodyStream<B>>
73+
where
74+
B: Body + Unpin + Send + 'static,
75+
B::Data: Into<Bytes> + Send,
76+
B::Error: Into<Error> + Send + Debug,
77+
{
78+
let (parts, body) = res.into_parts();
79+
80+
let mut headers = parts.headers;
81+
let cookies = headers
82+
.get_all(SET_COOKIE)
83+
.iter()
84+
.map(|c| String::from_utf8_lossy(c.as_bytes()).to_string())
85+
.collect::<Vec<_>>();
86+
headers.remove(SET_COOKIE);
87+
88+
StreamResponse {
89+
metadata_prelude: MetadataPrelude {
90+
headers,
91+
status_code: parts.status,
92+
cookies,
93+
},
94+
stream: BodyStream { body },
95+
}
96+
}
97+
98+
/// Builds a streaming-aware Tower service from a `Service<Request>` **without**
99+
/// boxing its future (no heap allocation / vtable).
100+
///
101+
/// Transforms `LambdaEvent<LambdaRequest>` into `Request` with Lambda context
102+
/// and wraps `Response<B>` into `StreamResponse<BodyStream<B>>`.
103+
///
104+
/// Used internally by [`run_with_streaming_response`]; not part of the public
105+
/// API.
106+
#[allow(clippy::type_complexity)]
107+
fn into_streaming_response<'a, S, B, E>(
108+
handler: S,
109+
) -> MapResponse<
110+
MapRequest<S, impl FnMut(LambdaEvent<LambdaRequest>) -> Request>,
111+
impl FnOnce(Response<B>) -> StreamResponse<BodyStream<B>> + Clone,
112+
>
113+
where
114+
S: Service<Request, Response = Response<B>, Error = E>,
115+
S::Future: Send + 'a,
116+
E: Debug + Into<Diagnostic>,
117+
B: Body + Unpin + Send + 'static,
118+
B::Data: Into<Bytes> + Send,
119+
B::Error: Into<Error> + Send + Debug,
120+
{
121+
ServiceBuilder::new()
122+
.map_request(|req: LambdaEvent<LambdaRequest>| {
123+
let event: Request = req.payload.into();
124+
event.with_lambda_context(req.context)
125+
})
126+
.service(handler)
127+
.map_response(into_stream_response)
128+
}
129+
85130
/// Runs the Lambda runtime with a handler that returns **streaming** HTTP
86131
/// responses.
87132
///
@@ -98,7 +143,7 @@ where
98143
B::Data: Into<Bytes> + Send,
99144
B::Error: Into<Error> + Send + Debug,
100145
{
101-
lambda_runtime::run(StreamAdapter::from(handler)).await
146+
lambda_runtime::run(into_streaming_response(handler)).await
102147
}
103148

104149
pin_project_lite::pin_project! {
@@ -127,3 +172,59 @@ where
127172
}
128173
}
129174
}
175+
176+
#[cfg(test)]
177+
mod test_stream_adapter {
178+
use super::*;
179+
180+
use crate::{
181+
tower::{ServiceBuilder, ServiceExt},
182+
Body, Request, StreamAdapter,
183+
};
184+
use http::StatusCode;
185+
186+
// A middleware that logs requests before forwarding them to another service
187+
struct LogService<S> {
188+
inner: S,
189+
}
190+
191+
impl<S> Service<LambdaEvent<LambdaRequest>> for LogService<S>
192+
where
193+
S: Service<LambdaEvent<LambdaRequest>>,
194+
{
195+
type Response = S::Response;
196+
type Error = S::Error;
197+
type Future = S::Future;
198+
199+
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
200+
self.inner.poll_ready(cx)
201+
}
202+
203+
fn call(&mut self, event: LambdaEvent<LambdaRequest>) -> Self::Future {
204+
// Log the request
205+
println!("Lambda event: {event:#?}");
206+
207+
self.inner.call(event)
208+
}
209+
}
210+
211+
/// This tests that `StreamAdapter` can be used in a `tower::Service` where
212+
/// the user may require additional middleware between `lambda_runtime::run`
213+
/// and where the `LambdaEvent` is converted into a `Request`.
214+
#[test]
215+
fn stream_adapter_is_boxable() {
216+
let _svc = ServiceBuilder::new()
217+
.layer_fn(|service| {
218+
// This could be any middleware that logs, inspects, or
219+
// manipulates the `LambdaEvent` before it's converted to a
220+
// `Request` by `Adapter`.
221+
222+
LogService { inner: service }
223+
})
224+
.layer_fn(StreamAdapter::from)
225+
.service_fn(
226+
|_req: Request| async move { http::Response::builder().status(StatusCode::OK).body(Body::Empty) },
227+
)
228+
.boxed();
229+
}
230+
}

0 commit comments

Comments
 (0)