@@ -2,16 +2,21 @@ use crate::{http::header::SET_COOKIE, request::LambdaRequest, Request, RequestEx
22use bytes:: Bytes ;
33use core:: {
44 fmt:: Debug ,
5- future:: Future ,
65 pin:: Pin ,
76 task:: { Context , Poll } ,
87} ;
9- use futures_util:: Stream ;
8+ use futures_util:: { Stream , TryFutureExt } ;
109pub use http:: { self , Response } ;
1110use http_body:: Body ;
12- use lambda_runtime:: Diagnostic ;
11+ use lambda_runtime:: {
12+ tower:: {
13+ util:: { MapRequest , MapResponse } ,
14+ ServiceBuilder , ServiceExt ,
15+ } ,
16+ Diagnostic ,
17+ } ;
1318pub use lambda_runtime:: { Error , LambdaEvent , MetadataPrelude , Service , StreamResponse } ;
14- use std:: marker:: PhantomData ;
19+ use std:: { future :: Future , marker:: PhantomData } ;
1520
1621/// An adapter that lifts a standard [`Service<Request>`] into a
1722/// [`Service<LambdaEvent<LambdaRequest>>`] which produces streaming Lambda HTTP
@@ -25,7 +30,6 @@ impl<'a, S, B, E> From<S> for StreamAdapter<'a, S, B>
2530where
2631 S : Service < Request , Response = Response < B > , Error = E > ,
2732 S :: Future : Send + ' a ,
28- E : Debug + Into < Diagnostic > ,
2933 B : Body + Unpin + Send + ' static ,
3034 B :: Data : Into < Bytes > + Send ,
3135 B :: Error : Into < Error > + Send + Debug ,
@@ -42,46 +46,87 @@ impl<'a, S, B, E> Service<LambdaEvent<LambdaRequest>> for StreamAdapter<'a, S, B
4246where
4347 S : Service < Request , Response = Response < B > , Error = E > ,
4448 S :: Future : Send + ' a ,
45- B : Body + Send + ' static ,
49+ B : Body + Unpin + Send + ' static ,
4650 B :: Data : Into < Bytes > + Send ,
4751 B :: Error : Into < Error > + Send + Debug ,
48- E : Debug + Into < Diagnostic > ,
4952{
5053 type Response = StreamResponse < BodyStream < B > > ;
5154 type Error = E ;
52- type Future = Pin < Box < dyn Future < Output = Result < Self :: Response , E > > + Send + ' a > > ;
55+ type Future = Pin < Box < dyn Future < Output = Result < Self :: Response , Self :: Error > > + Send + ' a > > ;
5356
5457 fn poll_ready ( & mut self , cx : & mut Context < ' _ > ) -> Poll < Result < ( ) , Self :: Error > > {
5558 self . service . poll_ready ( cx)
5659 }
5760
5861 fn call ( & mut self , req : LambdaEvent < LambdaRequest > ) -> Self :: Future {
5962 let event: Request = req. payload . into ( ) ;
60- let fut = self . service . call ( event. with_lambda_context ( req. context ) ) ;
61- Box :: pin ( async move {
62- let res = fut. await ?;
63- let ( parts, body) = res. into_parts ( ) ;
64-
65- let mut headers = parts. headers ;
66- let cookies = headers
67- . get_all ( SET_COOKIE )
68- . iter ( )
69- . map ( |c| String :: from_utf8_lossy ( c. as_bytes ( ) ) . to_string ( ) )
70- . collect :: < Vec < _ > > ( ) ;
71- headers. remove ( SET_COOKIE ) ;
72-
73- Ok ( StreamResponse {
74- metadata_prelude : MetadataPrelude {
75- headers,
76- status_code : parts. status ,
77- cookies,
78- } ,
79- stream : BodyStream { body } ,
80- } )
81- } )
63+ Box :: pin (
64+ self . service
65+ . call ( event. with_lambda_context ( req. context ) )
66+ . map_ok ( into_stream_response) ,
67+ )
8268 }
8369}
8470
71+ /// Converts an `http::Response<B>` into a streaming Lambda response.
72+ fn into_stream_response < B > ( res : Response < B > ) -> StreamResponse < BodyStream < B > >
73+ where
74+ B : Body + Unpin + Send + ' static ,
75+ B :: Data : Into < Bytes > + Send ,
76+ B :: Error : Into < Error > + Send + Debug ,
77+ {
78+ let ( parts, body) = res. into_parts ( ) ;
79+
80+ let mut headers = parts. headers ;
81+ let cookies = headers
82+ . get_all ( SET_COOKIE )
83+ . iter ( )
84+ . map ( |c| String :: from_utf8_lossy ( c. as_bytes ( ) ) . to_string ( ) )
85+ . collect :: < Vec < _ > > ( ) ;
86+ headers. remove ( SET_COOKIE ) ;
87+
88+ StreamResponse {
89+ metadata_prelude : MetadataPrelude {
90+ headers,
91+ status_code : parts. status ,
92+ cookies,
93+ } ,
94+ stream : BodyStream { body } ,
95+ }
96+ }
97+
98+ /// Builds a streaming-aware Tower service from a `Service<Request>` **without**
99+ /// boxing its future (no heap allocation / vtable).
100+ ///
101+ /// Transforms `LambdaEvent<LambdaRequest>` into `Request` with Lambda context
102+ /// and wraps `Response<B>` into `StreamResponse<BodyStream<B>>`.
103+ ///
104+ /// Used internally by [`run_with_streaming_response`]; not part of the public
105+ /// API.
106+ #[ allow( clippy:: type_complexity) ]
107+ fn into_streaming_response < ' a , S , B , E > (
108+ handler : S ,
109+ ) -> MapResponse <
110+ MapRequest < S , impl FnMut ( LambdaEvent < LambdaRequest > ) -> Request > ,
111+ impl FnOnce ( Response < B > ) -> StreamResponse < BodyStream < B > > + Clone ,
112+ >
113+ where
114+ S : Service < Request , Response = Response < B > , Error = E > ,
115+ S :: Future : Send + ' a ,
116+ E : Debug + Into < Diagnostic > ,
117+ B : Body + Unpin + Send + ' static ,
118+ B :: Data : Into < Bytes > + Send ,
119+ B :: Error : Into < Error > + Send + Debug ,
120+ {
121+ ServiceBuilder :: new ( )
122+ . map_request ( |req : LambdaEvent < LambdaRequest > | {
123+ let event: Request = req. payload . into ( ) ;
124+ event. with_lambda_context ( req. context )
125+ } )
126+ . service ( handler)
127+ . map_response ( into_stream_response)
128+ }
129+
85130/// Runs the Lambda runtime with a handler that returns **streaming** HTTP
86131/// responses.
87132///
98143 B :: Data : Into < Bytes > + Send ,
99144 B :: Error : Into < Error > + Send + Debug ,
100145{
101- lambda_runtime:: run ( StreamAdapter :: from ( handler) ) . await
146+ lambda_runtime:: run ( into_streaming_response ( handler) ) . await
102147}
103148
104149pin_project_lite:: pin_project! {
@@ -127,3 +172,59 @@ where
127172 }
128173 }
129174}
175+
176+ #[ cfg( test) ]
177+ mod test_stream_adapter {
178+ use super :: * ;
179+
180+ use crate :: {
181+ tower:: { ServiceBuilder , ServiceExt } ,
182+ Body , Request , StreamAdapter ,
183+ } ;
184+ use http:: StatusCode ;
185+
186+ // A middleware that logs requests before forwarding them to another service
187+ struct LogService < S > {
188+ inner : S ,
189+ }
190+
191+ impl < S > Service < LambdaEvent < LambdaRequest > > for LogService < S >
192+ where
193+ S : Service < LambdaEvent < LambdaRequest > > ,
194+ {
195+ type Response = S :: Response ;
196+ type Error = S :: Error ;
197+ type Future = S :: Future ;
198+
199+ fn poll_ready ( & mut self , cx : & mut Context < ' _ > ) -> Poll < Result < ( ) , Self :: Error > > {
200+ self . inner . poll_ready ( cx)
201+ }
202+
203+ fn call ( & mut self , event : LambdaEvent < LambdaRequest > ) -> Self :: Future {
204+ // Log the request
205+ println ! ( "Lambda event: {event:#?}" ) ;
206+
207+ self . inner . call ( event)
208+ }
209+ }
210+
211+ /// This tests that `StreamAdapter` can be used in a `tower::Service` where
212+ /// the user may require additional middleware between `lambda_runtime::run`
213+ /// and where the `LambdaEvent` is converted into a `Request`.
214+ #[ test]
215+ fn stream_adapter_is_boxable ( ) {
216+ let _svc = ServiceBuilder :: new ( )
217+ . layer_fn ( |service| {
218+ // This could be any middleware that logs, inspects, or
219+ // manipulates the `LambdaEvent` before it's converted to a
220+ // `Request` by `Adapter`.
221+
222+ LogService { inner : service }
223+ } )
224+ . layer_fn ( StreamAdapter :: from)
225+ . service_fn (
226+ |_req : Request | async move { http:: Response :: builder ( ) . status ( StatusCode :: OK ) . body ( Body :: Empty ) } ,
227+ )
228+ . boxed ( ) ;
229+ }
230+ }
0 commit comments