Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lambda-http/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ use std::{
};

mod streaming;
pub use streaming::run_with_streaming_response;
pub use streaming::{into_streaming_response, run_with_streaming_response};

/// Type alias for `http::Request`s with a fixed [`Body`](enum.Body.html) type
pub type Request = http::Request<Body>;
Expand Down
68 changes: 55 additions & 13 deletions lambda-http/src/streaming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,45 @@ use crate::{http::header::SET_COOKIE, request::LambdaRequest, tower::ServiceBuil
use bytes::Bytes;
pub use http::{self, Response};
use http_body::Body;
use lambda_runtime::Diagnostic;
pub use lambda_runtime::{self, tower::ServiceExt, Error, LambdaEvent, MetadataPrelude, Service, StreamResponse};
pub use lambda_runtime::{
self,
tower::{
util::{MapRequest, MapResponse},
ServiceExt,
},
Error, LambdaEvent, MetadataPrelude, Service, StreamResponse,
};
use lambda_runtime::{tower::util::BoxService, Diagnostic};
use std::{
fmt::Debug,
pin::Pin,
task::{Context, Poll},
};
use tokio_stream::Stream;

/// Starts the Lambda Rust runtime and stream response back [Configure Lambda
/// Streaming Response](https://docs.aws.amazon.com/lambda/latest/dg/configuration-response-streaming.html).
///
/// This takes care of transforming the LambdaEvent into a [`Request`] and
/// accepts [`http::Response<http_body::Body>`] as response.
pub async fn run_with_streaming_response<'a, S, B, E>(handler: S) -> Result<(), Error>
/// Runs the Lambda runtime with a handler that returns **streaming** HTTP
/// responses.
pub fn into_streaming_response<'a, S, B, E>(
handler: S,
) -> BoxService<LambdaEvent<LambdaRequest>, StreamResponse<BodyStream<B>>, E>
where
S: Service<Request, Response = Response<B>, Error = E> + Send + 'static,
S::Future: Send + 'a,
E: Debug + Into<Diagnostic> + 'static,
B: Body + Unpin + Send + 'static,
B::Data: Into<Bytes> + Send,
B::Error: Into<Error> + Send + Debug,
{
into_streaming_response_inner::<S, B, E>(handler).boxed()
}

#[allow(clippy::type_complexity)]
fn into_streaming_response_inner<'a, S, B, E>(
handler: S,
) -> MapResponse<
MapRequest<S, impl FnMut(LambdaEvent<LambdaRequest>) -> Request>,
impl FnOnce(Response<B>) -> StreamResponse<BodyStream<B>> + Clone,
>
where
S: Service<Request, Response = Response<B>, Error = E>,
S::Future: Send + 'a,
Expand All @@ -25,19 +49,19 @@ where
B::Data: Into<Bytes> + Send,
B::Error: Into<Error> + Send + Debug,
{
let svc = ServiceBuilder::new()
ServiceBuilder::new()
.map_request(|req: LambdaEvent<LambdaRequest>| {
let event: Request = req.payload.into();
event.with_lambda_context(req.context)
})
.service(handler)
.map_response(|res| {
.map_response(|res: Response<B>| {
let (parts, body) = res.into_parts();

let mut prelude_headers = parts.headers;

let cookies = prelude_headers.get_all(SET_COOKIE);
let cookies = cookies
let cookies = prelude_headers
.get_all(SET_COOKIE)
.iter()
.map(|c| String::from_utf8_lossy(c.as_bytes()).to_string())
.collect::<Vec<String>>();
Expand All @@ -54,8 +78,26 @@ where
metadata_prelude,
stream: BodyStream { body },
}
});
})
}

/// Runs the Lambda runtime with a handler that returns **streaming** HTTP
/// responses.
///
/// See the [AWS docs for response streaming].
///
/// [AWS docs for response streaming]:
/// https://docs.aws.amazon.com/lambda/latest/dg/configuration-response-streaming.html
pub async fn run_with_streaming_response<'a, S, B, E>(handler: S) -> Result<(), Error>
where
S: Service<Request, Response = Response<B>, Error = E>,
S::Future: Send + 'a,
E: Debug + Into<Diagnostic>,
B: Body + Unpin + Send + 'static,
B::Data: Into<Bytes> + Send,
B::Error: Into<Error> + Send + Debug,
{
let svc = into_streaming_response_inner(handler);
lambda_runtime::run(svc).await
}

Expand Down