Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lambda-http/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ use std::{
};

mod streaming;
pub use streaming::run_with_streaming_response;
pub use streaming::{run_with_streaming_response, StreamAdapter};

/// Type alias for `http::Request`s with a fixed [`Body`](enum.Body.html) type
pub type Request = http::Request<Body>;
Expand Down
118 changes: 80 additions & 38 deletions lambda-http/src/streaming.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,27 @@
use crate::{http::header::SET_COOKIE, request::LambdaRequest, tower::ServiceBuilder, Request, RequestExt};
use crate::{http::header::SET_COOKIE, request::LambdaRequest, Request, RequestExt};
use bytes::Bytes;
pub use http::{self, Response};
use http_body::Body;
use lambda_runtime::Diagnostic;
pub use lambda_runtime::{self, tower::ServiceExt, Error, LambdaEvent, MetadataPrelude, Service, StreamResponse};
use std::{
use core::{
fmt::Debug,
future::Future,
pin::Pin,
task::{Context, Poll},
};
use tokio_stream::Stream;
use futures_util::Stream;
pub use http::{self, Response};
use http_body::Body;
use lambda_runtime::Diagnostic;
pub use lambda_runtime::{Error, LambdaEvent, MetadataPrelude, Service, StreamResponse};
use std::marker::PhantomData;

/// Starts the Lambda Rust runtime and stream response back [Configure Lambda
/// Streaming Response](https://docs.aws.amazon.com/lambda/latest/dg/configuration-response-streaming.html).
///
/// This takes care of transforming the LambdaEvent into a [`Request`] and
/// accepts [`http::Response<http_body::Body>`] as response.
pub async fn run_with_streaming_response<'a, S, B, E>(handler: S) -> Result<(), Error>
/// An adapter that lifts a standard [`Service<Request>`] into a
/// [`Service<LambdaEvent<LambdaRequest>>`] which produces streaming Lambda HTTP
/// responses.
pub struct StreamAdapter<'a, S, B> {
service: S,
_phantom_data: PhantomData<&'a B>,
}

impl<'a, S, B, E> From<S> for StreamAdapter<'a, S, B>
where
S: Service<Request, Response = Response<B>, Error = E>,
S::Future: Send + 'a,
Expand All @@ -25,38 +30,75 @@ where
B::Data: Into<Bytes> + Send,
B::Error: Into<Error> + Send + Debug,
{
let svc = ServiceBuilder::new()
.map_request(|req: LambdaEvent<LambdaRequest>| {
let event: Request = req.payload.into();
event.with_lambda_context(req.context)
})
.service(handler)
.map_response(|res| {
let (parts, body) = res.into_parts();
fn from(service: S) -> Self {
StreamAdapter {
service,
_phantom_data: PhantomData,
}
}
}

impl<'a, S, B, E> Service<LambdaEvent<LambdaRequest>> for StreamAdapter<'a, S, B>
where
S: Service<Request, Response = Response<B>, Error = E>,
S::Future: Send + 'a,
B: Body + Send + 'static,
B::Data: Into<Bytes> + Send,
B::Error: Into<Error> + Send + Debug,
E: Debug + Into<Diagnostic>,
{
type Response = StreamResponse<BodyStream<B>>;
type Error = E;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, E>> + Send + 'a>>;

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.service.poll_ready(cx)
}

let mut prelude_headers = parts.headers;
fn call(&mut self, req: LambdaEvent<LambdaRequest>) -> Self::Future {
let event: Request = req.payload.into();
let fut = self.service.call(event.with_lambda_context(req.context));
Box::pin(async move {
let res = fut.await?;
let (parts, body) = res.into_parts();

let cookies = prelude_headers.get_all(SET_COOKIE);
let cookies = cookies
let mut headers = parts.headers;
let cookies = headers
.get_all(SET_COOKIE)
.iter()
.map(|c| String::from_utf8_lossy(c.as_bytes()).to_string())
.collect::<Vec<String>>();

prelude_headers.remove(SET_COOKIE);
.collect::<Vec<_>>();
headers.remove(SET_COOKIE);

let metadata_prelude = MetadataPrelude {
headers: prelude_headers,
status_code: parts.status,
cookies,
};

StreamResponse {
metadata_prelude,
Ok(StreamResponse {
metadata_prelude: MetadataPrelude {
headers,
status_code: parts.status,
cookies,
},
stream: BodyStream { body },
}
});
})
})
}
}

lambda_runtime::run(svc).await
/// Runs the Lambda runtime with a handler that returns **streaming** HTTP
/// responses.
///
/// See the [AWS docs for response streaming].
///
/// [AWS docs for response streaming]:
/// https://docs.aws.amazon.com/lambda/latest/dg/configuration-response-streaming.html
pub async fn run_with_streaming_response<'a, S, B, E>(handler: S) -> Result<(), Error>
where
S: Service<Request, Response = Response<B>, Error = E>,
S::Future: Send + 'a,
E: Debug + Into<Diagnostic>,
B: Body + Unpin + Send + 'static,
B::Data: Into<Bytes> + Send,
B::Error: Into<Error> + Send + Debug,
{
lambda_runtime::run(StreamAdapter::from(handler)).await
}

pin_project_lite::pin_project! {
Expand Down