Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lambda-http/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ use std::{
};

mod streaming;
pub use streaming::run_with_streaming_response;
pub use streaming::{into_streaming_response, run_with_streaming_response};

/// Type alias for `http::Request`s with a fixed [`Body`](enum.Body.html) type
pub type Request = http::Request<Body>;
Expand Down
51 changes: 42 additions & 9 deletions lambda-http/src/streaming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,36 @@ use bytes::Bytes;
pub use http::{self, Response};
use http_body::Body;
use lambda_runtime::Diagnostic;
pub use lambda_runtime::{self, tower::ServiceExt, Error, LambdaEvent, MetadataPrelude, Service, StreamResponse};
pub use lambda_runtime::{
self,
tower::{
util::{MapRequest, MapResponse},
ServiceExt,
},
Error, LambdaEvent, MetadataPrelude, Service, StreamResponse,
};
use std::{
fmt::Debug,
pin::Pin,
task::{Context, Poll},
};
use tokio_stream::Stream;

/// Starts the Lambda Rust runtime and stream response back [Configure Lambda
/// Streaming Response](https://docs.aws.amazon.com/lambda/latest/dg/configuration-response-streaming.html).
/// Converts a handler into a streaming-compatible service for use with AWS
/// Lambda.
///
/// This takes care of transforming the LambdaEvent into a [`Request`] and
/// accepts [`http::Response<http_body::Body>`] as response.
pub async fn run_with_streaming_response<'a, S, B, E>(handler: S) -> Result<(), Error>
/// This function wraps a `Service` implementation, transforming its input and
/// output to be compatible with AWS Lambda's streaming response feature. It
/// provides the necessary middleware to handle `LambdaEvent` requests and
/// converts the `http::Response` into a `StreamResponse` containing a metadata
/// prelude and body stream.
#[allow(clippy::type_complexity)]
pub fn into_streaming_response<'a, S, B, E>(
handler: S,
) -> MapResponse<
MapRequest<S, impl FnMut(LambdaEvent<LambdaRequest>) -> Request>,
impl FnOnce(Response<B>) -> StreamResponse<BodyStream<B>> + Clone,
>
where
S: Service<Request, Response = Response<B>, Error = E>,
S::Future: Send + 'a,
Expand All @@ -25,13 +41,13 @@ where
B::Data: Into<Bytes> + Send,
B::Error: Into<Error> + Send + Debug,
{
let svc = ServiceBuilder::new()
ServiceBuilder::new()
.map_request(|req: LambdaEvent<LambdaRequest>| {
let event: Request = req.payload.into();
event.with_lambda_context(req.context)
})
.service(handler)
.map_response(|res| {
.map_response(|res: Response<B>| {
let (parts, body) = res.into_parts();

let mut prelude_headers = parts.headers;
Expand All @@ -54,8 +70,25 @@ where
metadata_prelude,
stream: BodyStream { body },
}
});
})
}

/// Starts the Lambda Rust runtime and stream response back [Configure Lambda
/// Streaming
/// Response](https://docs.aws.amazon.com/lambda/latest/dg/configuration-response-streaming.html).
///
/// This takes care of transforming the LambdaEvent into a [`Request`] and
/// accepts [`http::Response<http_body::Body>`] as response.
pub async fn run_with_streaming_response<'a, S, B, E>(handler: S) -> Result<(), Error>
where
S: Service<Request, Response = Response<B>, Error = E>,
S::Future: Send + 'a,
E: Debug + Into<Diagnostic>,
B: Body + Unpin + Send + 'static,
B::Data: Into<Bytes> + Send,
B::Error: Into<Error> + Send + Debug,
{
let svc = into_streaming_response(handler);
lambda_runtime::run(svc).await
}

Expand Down
Loading