Skip to content

Commit b540542

Browse files
committed
Factor common response Builder trait out for actix & warp
This proved very difficult due to the different ownership behavior between each builder, i.e. warp is a consuming builder, actix isn't. With the use of the Rc/Cell/RefCell, I worry that this is now more complex than it was, and for all I know, I've introduced a memory leak somewhere. :) Since the reqwest builder is also consuming, it should be able to follow the same "adapter" pattern as warp. Unfortunately, the reqwest builder doesn't implement the Default trait, so I can't use take() and I've yet to come up with another solution. Since 2/3 of the builders are consumers, it's possible we might simplify the code by having the new Builder trait reflect that, i.e. using self instead of &self in the fn params. We'll investigate that next. For now, I'm just happy to have 2 builders sharing some formerly redundant behavior. Signed-off-by: Jim Crossley <[email protected]>
1 parent c4e8780 commit b540542

File tree

7 files changed

+122
-136
lines changed

7 files changed

+122
-136
lines changed

src/binding/actix/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,4 +48,3 @@ pub use server_request::request_to_event;
4848
pub use server_request::HttpRequestExt;
4949
pub use server_response::event_to_response;
5050
pub use server_response::HttpResponseBuilderExt;
51-
pub use server_response::HttpResponseSerializer;

src/binding/actix/server_response.rs

Lines changed: 12 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -1,79 +1,31 @@
1-
use crate::event::SpecVersion;
2-
use crate::message::{
3-
BinaryDeserializer, BinarySerializer, MessageAttributeValue, Result, StructuredSerializer,
4-
};
1+
use crate::binding::http::{Builder, Serializer};
2+
use crate::message::{BinaryDeserializer, Result};
53
use crate::Event;
6-
use crate::{
7-
binding::{
8-
http::{header_prefix, SPEC_VERSION_HEADER},
9-
CLOUDEVENTS_JSON_HEADER,
10-
},
11-
str_to_header_value,
12-
};
134
use actix_web::dev::HttpResponseBuilder;
145
use actix_web::http::StatusCode;
156
use actix_web::HttpResponse;
167
use async_trait::async_trait;
178
use futures::future::LocalBoxFuture;
189
use futures::FutureExt;
1910

20-
/// Wrapper for [`HttpResponseBuilder`] that implements [`StructuredSerializer`] and [`BinarySerializer`].
21-
pub struct HttpResponseSerializer {
22-
builder: HttpResponseBuilder,
23-
}
24-
25-
impl HttpResponseSerializer {
26-
pub fn new(builder: HttpResponseBuilder) -> HttpResponseSerializer {
27-
HttpResponseSerializer { builder }
28-
}
29-
}
30-
31-
impl BinarySerializer<HttpResponse> for HttpResponseSerializer {
32-
fn set_spec_version(mut self, spec_version: SpecVersion) -> Result<Self> {
33-
self.builder
34-
.set_header(SPEC_VERSION_HEADER, str_to_header_value!(spec_version)?);
35-
Ok(self)
36-
}
37-
38-
fn set_attribute(mut self, name: &str, value: MessageAttributeValue) -> Result<Self> {
39-
self.builder
40-
.set_header(&header_prefix(name), str_to_header_value!(value)?);
41-
Ok(self)
11+
impl Builder<HttpResponse> for HttpResponseBuilder {
12+
fn header(&mut self, key: &str, value: http::header::HeaderValue) {
13+
self.set_header(key, value);
4214
}
43-
44-
fn set_extension(mut self, name: &str, value: MessageAttributeValue) -> Result<Self> {
45-
self.builder
46-
.set_header(&header_prefix(name), str_to_header_value!(value)?);
47-
Ok(self)
48-
}
49-
50-
fn end_with_data(mut self, bytes: Vec<u8>) -> Result<HttpResponse> {
51-
Ok(self.builder.body(bytes))
15+
fn body(&mut self, bytes: Vec<u8>) -> Result<HttpResponse> {
16+
Ok(HttpResponseBuilder::body(self, bytes))
5217
}
53-
54-
fn end(mut self) -> Result<HttpResponse> {
55-
Ok(self.builder.finish())
56-
}
57-
}
58-
59-
impl StructuredSerializer<HttpResponse> for HttpResponseSerializer {
60-
fn set_structured_event(mut self, bytes: Vec<u8>) -> Result<HttpResponse> {
61-
Ok(self
62-
.builder
63-
.set_header(
64-
actix_web::http::header::CONTENT_TYPE,
65-
CLOUDEVENTS_JSON_HEADER,
66-
)
67-
.body(bytes))
18+
fn finish(&mut self) -> Result<HttpResponse> {
19+
Ok(HttpResponseBuilder::finish(self))
6820
}
6921
}
7022

7123
/// Method to fill an [`HttpResponseBuilder`] with an [`Event`].
72-
pub async fn event_to_response(
24+
pub async fn event_to_response<T: Builder<HttpResponse> + 'static>(
7325
event: Event,
74-
response: HttpResponseBuilder,
26+
response: T,
7527
) -> std::result::Result<HttpResponse, actix_web::error::Error> {
76-
BinaryDeserializer::deserialize_binary(event, HttpResponseSerializer::new(response))
28+
BinaryDeserializer::deserialize_binary(event, Serializer::new(response))
7729
.map_err(actix_web::error::ErrorBadRequest)
7830
}
7931

src/binding/http/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@ use crate::{
77
};
88
use deserializer::Deserializer;
99
pub use headers::Headers;
10+
mod serializer;
11+
12+
pub use serializer::Builder;
13+
pub use serializer::Serializer;
1014

1115
pub static SPEC_VERSION_HEADER: &str = "ce-specversion";
1216

src/binding/http/serializer.rs

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
use std::{cell::RefCell, rc::Rc};
2+
3+
use crate::binding::{
4+
http::{header_prefix, SPEC_VERSION_HEADER},
5+
CLOUDEVENTS_JSON_HEADER,
6+
};
7+
use crate::event::SpecVersion;
8+
use crate::message::{BinarySerializer, MessageAttributeValue, Result, StructuredSerializer};
9+
10+
macro_rules! str_to_header_value {
11+
($header_value:expr) => {
12+
http::header::HeaderValue::from_str(&$header_value.to_string()).map_err(|e| {
13+
crate::message::Error::Other {
14+
source: Box::new(e),
15+
}
16+
})
17+
};
18+
}
19+
20+
pub trait Builder<R> {
21+
fn header(&mut self, key: &str, value: http::header::HeaderValue);
22+
fn body(&mut self, bytes: Vec<u8>) -> Result<R>;
23+
fn finish(&mut self) -> Result<R>;
24+
}
25+
26+
pub struct Serializer<T> {
27+
builder: Rc<RefCell<dyn Builder<T>>>,
28+
}
29+
30+
impl<T> Serializer<T> {
31+
pub fn new<B: Builder<T> + 'static>(delegate: B) -> Serializer<T> {
32+
let builder = Rc::new(RefCell::new(delegate));
33+
Serializer { builder }
34+
}
35+
}
36+
37+
impl<T> BinarySerializer<T> for Serializer<T> {
38+
fn set_spec_version(self, spec_version: SpecVersion) -> Result<Self> {
39+
self.builder
40+
.borrow_mut()
41+
.header(SPEC_VERSION_HEADER, str_to_header_value!(spec_version)?);
42+
Ok(self)
43+
}
44+
45+
fn set_attribute(self, name: &str, value: MessageAttributeValue) -> Result<Self> {
46+
self.builder
47+
.borrow_mut()
48+
.header(&header_prefix(name), str_to_header_value!(value)?);
49+
Ok(self)
50+
}
51+
52+
fn set_extension(self, name: &str, value: MessageAttributeValue) -> Result<Self> {
53+
self.builder
54+
.borrow_mut()
55+
.header(&header_prefix(name), str_to_header_value!(value)?);
56+
Ok(self)
57+
}
58+
59+
fn end_with_data(self, bytes: Vec<u8>) -> Result<T> {
60+
self.builder.borrow_mut().body(bytes)
61+
}
62+
63+
fn end(self) -> Result<T> {
64+
self.builder.borrow_mut().finish()
65+
}
66+
}
67+
68+
impl<T> StructuredSerializer<T> for Serializer<T> {
69+
fn set_structured_event(self, bytes: Vec<u8>) -> Result<T> {
70+
let mut builder = self.builder.borrow_mut();
71+
builder.header(
72+
http::header::CONTENT_TYPE.as_str(),
73+
http::HeaderValue::from_static(CLOUDEVENTS_JSON_HEADER),
74+
);
75+
builder.body(bytes)
76+
}
77+
}

src/binding/mod.rs

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -19,18 +19,6 @@ pub(crate) mod kafka {
1919
}
2020
}
2121

22-
#[cfg(any(feature = "actix", feature = "warp"))]
23-
#[macro_export]
24-
macro_rules! str_to_header_value {
25-
($header_value:expr) => {
26-
http::header::HeaderValue::from_str(&$header_value.to_string()).map_err(|e| {
27-
crate::message::Error::Other {
28-
source: Box::new(e),
29-
}
30-
})
31-
};
32-
}
33-
3422
pub(crate) static CLOUDEVENTS_JSON_HEADER: &str = "application/cloudevents+json";
3523
pub(crate) static CONTENT_TYPE: &str = "content-type";
3624

src/binding/reqwest/client_request.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,15 @@ use crate::message::{
1111
use crate::Event;
1212
use reqwest::RequestBuilder;
1313

14+
// TODO: Ideally, we'd only need to implement binding::http::Builder
15+
// for reqwest::RequestBuilder here, but because the latter is a
16+
// consuming builder, we'd need an intermediate struct similar to
17+
// warp's to adapt that interface. Unfortunately, the reqwest builder
18+
// doesn't implement the Default trait, so I can't use take() as
19+
// warp's Adapter does, and I've yet to come up with another
20+
// solution. So for now, we continue to implement BinarySerializer
21+
// directly in here.
22+
1423
/// Wrapper for [`RequestBuilder`] that implements [`StructuredSerializer`] & [`BinarySerializer`] traits.
1524
pub struct RequestSerializer {
1625
req: RequestBuilder,

src/binding/warp/server_response.rs

Lines changed: 20 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -1,83 +1,40 @@
1+
use std::cell::Cell;
2+
13
use warp_lib as warp;
24

3-
use crate::binding::{
4-
http::{header_prefix, SPEC_VERSION_HEADER},
5-
CLOUDEVENTS_JSON_HEADER,
6-
};
7-
use crate::event::SpecVersion;
8-
use crate::message::{
9-
BinaryDeserializer, BinarySerializer, Error, MessageAttributeValue, Result,
10-
StructuredSerializer,
11-
};
12-
use crate::{str_to_header_value, Event};
5+
use crate::binding::http::{Builder, Serializer};
6+
use crate::message::{BinaryDeserializer, Error, Result};
7+
use crate::Event;
138

149
use warp::hyper::Body;
1510
use warp::reply::Response;
1611

17-
use http::response::Builder;
18-
19-
pub struct ResponseSerializer {
20-
builder: Builder,
21-
}
22-
23-
impl ResponseSerializer {
24-
fn new() -> Self {
25-
ResponseSerializer {
26-
builder: http::Response::builder(),
27-
}
28-
}
12+
struct Adapter {
13+
builder: Cell<http::response::Builder>,
2914
}
3015

31-
impl BinarySerializer<Response> for ResponseSerializer {
32-
fn set_spec_version(mut self, spec_version: SpecVersion) -> Result<Self> {
33-
self.builder = self
34-
.builder
35-
.header(SPEC_VERSION_HEADER, str_to_header_value!(spec_version)?);
36-
Ok(self)
37-
}
38-
39-
fn set_attribute(mut self, name: &str, value: MessageAttributeValue) -> Result<Self> {
40-
self.builder = self
41-
.builder
42-
.header(&header_prefix(name), str_to_header_value!(value)?);
43-
Ok(self)
44-
}
45-
46-
fn set_extension(mut self, name: &str, value: MessageAttributeValue) -> Result<Self> {
47-
self.builder = self
48-
.builder
49-
.header(&header_prefix(name), str_to_header_value!(value)?);
50-
Ok(self)
16+
impl Builder<Response> for Adapter {
17+
fn header(&mut self, key: &str, value: http::header::HeaderValue) {
18+
self.builder.set(self.builder.take().header(key, value));
5119
}
52-
53-
fn end_with_data(self, bytes: Vec<u8>) -> Result<Response> {
20+
fn body(&mut self, bytes: Vec<u8>) -> Result<Response> {
5421
self.builder
22+
.take()
5523
.body(Body::from(bytes))
5624
.map_err(|e| crate::message::Error::Other {
5725
source: Box::new(e),
5826
})
5927
}
60-
61-
fn end(self) -> Result<Response> {
62-
self.builder
63-
.body(Body::empty())
64-
.map_err(|e| crate::message::Error::Other {
65-
source: Box::new(e),
66-
})
67-
}
68-
}
69-
70-
impl StructuredSerializer<Response> for ResponseSerializer {
71-
fn set_structured_event(self, bytes: Vec<u8>) -> Result<Response> {
72-
self.builder
73-
.header(http::header::CONTENT_TYPE, CLOUDEVENTS_JSON_HEADER)
74-
.body(Body::from(bytes))
75-
.map_err(|e| crate::message::Error::Other {
76-
source: Box::new(e),
77-
})
28+
fn finish(&mut self) -> Result<Response> {
29+
self.body(Vec::new())
7830
}
7931
}
8032

8133
pub fn event_to_response(event: Event) -> std::result::Result<Response, Error> {
82-
BinaryDeserializer::deserialize_binary(event, ResponseSerializer::new())
34+
BinaryDeserializer::deserialize_binary(
35+
event,
36+
Serializer::new(Adapter {
37+
builder: Cell::new(http::Response::builder()),
38+
}),
39+
)
8340
}

0 commit comments

Comments
 (0)