Skip to content

Commit 24ab63b

Browse files
committed
Support interceptor in spin outbound http
Signed-off-by: Lann Martin <[email protected]>
1 parent 880f4b8 commit 24ab63b

File tree

9 files changed

+256
-75
lines changed

9 files changed

+256
-75
lines changed

Cargo.lock

Lines changed: 2 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/factor-outbound-http/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ http = "1.1.0"
1010
http-body-util = "0.1"
1111
hyper = "1.4.1"
1212
ip_network = "0.4"
13-
reqwest = { version = "0.11", features = ["gzip"] }
13+
reqwest = { version = "0.12", features = ["gzip"] }
1414
rustls = { version = "0.23", default-features = false, features = ["ring", "std"] }
1515
spin-factor-outbound-networking = { path = "../factor-outbound-networking" }
1616
spin-factors = { path = "../factors" }
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
use http::{Request, Response};
2+
use http_body_util::{BodyExt, Full};
3+
use spin_world::async_trait;
4+
use wasmtime_wasi_http::{body::HyperOutgoingBody, HttpResult};
5+
6+
pub type HyperBody = HyperOutgoingBody;
7+
8+
/// An outbound HTTP request interceptor to be used with
9+
/// [`InstanceState::set_request_interceptor`].
10+
#[async_trait]
11+
pub trait OutboundHttpInterceptor: Send + Sync {
12+
/// Intercept an outgoing HTTP request.
13+
///
14+
/// If this method returns [`InterceptedResponse::Continue`], the (possibly
15+
/// updated) request will be passed on to the default outgoing request
16+
/// handler.
17+
///
18+
/// If this method returns [`InterceptedResponse::Intercepted`], the inner
19+
/// result will be returned as the result of the request, bypassing the
20+
/// default handler. The `request` will also be dropped immediately.
21+
async fn intercept(&self, request: InterceptRequest) -> HttpResult<InterceptOutcome>;
22+
}
23+
24+
/// The type returned by an [`OutboundHttpInterceptor`].
25+
pub enum InterceptOutcome {
26+
/// The intercepted request will be passed on to the default outgoing
27+
/// request handler.
28+
Continue(InterceptRequest),
29+
/// The given response will be returned as the result of the intercepted
30+
/// request, bypassing the default handler.
31+
Complete(Response<HyperBody>),
32+
}
33+
34+
/// An intercepted outgoing HTTP request.
35+
///
36+
/// This is a wrapper that implements `DerefMut<Target = Request<()>>` for
37+
/// inspection and modification of the request envelope. If the body needs to be
38+
/// consumed, call [`Self::into_hyper_request`].
39+
pub struct InterceptRequest {
40+
inner: Request<()>,
41+
body: InterceptBody,
42+
}
43+
44+
enum InterceptBody {
45+
Hyper(HyperBody),
46+
Vec(Vec<u8>),
47+
}
48+
49+
impl InterceptRequest {
50+
pub fn into_hyper_request(self) -> Request<HyperBody> {
51+
let (parts, ()) = self.inner.into_parts();
52+
Request::from_parts(parts, self.body.into())
53+
}
54+
55+
pub(crate) fn into_vec_request(self) -> Option<Request<Vec<u8>>> {
56+
let InterceptBody::Vec(bytes) = self.body else {
57+
return None;
58+
};
59+
let (parts, ()) = self.inner.into_parts();
60+
Some(Request::from_parts(parts, bytes))
61+
}
62+
}
63+
64+
impl std::ops::Deref for InterceptRequest {
65+
type Target = Request<()>;
66+
67+
fn deref(&self) -> &Self::Target {
68+
&self.inner
69+
}
70+
}
71+
72+
impl std::ops::DerefMut for InterceptRequest {
73+
fn deref_mut(&mut self) -> &mut Self::Target {
74+
&mut self.inner
75+
}
76+
}
77+
78+
impl From<Request<HyperBody>> for InterceptRequest {
79+
fn from(req: Request<HyperBody>) -> Self {
80+
let (parts, body) = req.into_parts();
81+
Self {
82+
inner: Request::from_parts(parts, ()),
83+
body: InterceptBody::Hyper(body),
84+
}
85+
}
86+
}
87+
88+
impl From<Request<Vec<u8>>> for InterceptRequest {
89+
fn from(req: Request<Vec<u8>>) -> Self {
90+
let (parts, body) = req.into_parts();
91+
Self {
92+
inner: Request::from_parts(parts, ()),
93+
body: InterceptBody::Vec(body),
94+
}
95+
}
96+
}
97+
98+
impl From<InterceptBody> for HyperBody {
99+
fn from(body: InterceptBody) -> Self {
100+
match body {
101+
InterceptBody::Hyper(body) => body,
102+
InterceptBody::Vec(bytes) => {
103+
Full::new(bytes.into()).map_err(|err| match err {}).boxed()
104+
}
105+
}
106+
}
107+
}

crates/factor-outbound-http/src/lib.rs

Lines changed: 2 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
pub mod intercept;
12
mod spin;
23
mod wasi;
34
pub mod wasi_2023_10_18;
@@ -10,13 +11,13 @@ use http::{
1011
uri::{Authority, Parts, PathAndQuery, Scheme},
1112
HeaderValue, Uri,
1213
};
14+
use intercept::OutboundHttpInterceptor;
1315
use spin_factor_outbound_networking::{
1416
ComponentTlsConfigs, OutboundAllowedHosts, OutboundNetworkingFactor,
1517
};
1618
use spin_factors::{
1719
anyhow, ConfigureAppContext, Factor, PrepareContext, RuntimeFactors, SelfInstanceBuilder,
1820
};
19-
use spin_world::async_trait;
2021
use wasmtime_wasi_http::WasiHttpCtx;
2122

2223
pub use wasmtime_wasi_http::{
@@ -176,29 +177,3 @@ impl std::fmt::Display for SelfRequestOrigin {
176177
write!(f, "{}://{}", self.scheme, self.authority)
177178
}
178179
}
179-
180-
/// An outbound HTTP request interceptor to be used with
181-
/// [`InstanceState::set_request_interceptor`].
182-
#[async_trait]
183-
pub trait OutboundHttpInterceptor: Send + Sync {
184-
/// Intercept an outgoing HTTP request.
185-
///
186-
/// If this method returns [`InterceptedResponse::Continue`], the (possibly
187-
/// updated) request will be passed on to the default outgoing request
188-
/// handler.
189-
///
190-
/// If this method returns [`InterceptedResponse::Intercepted`], the inner
191-
/// result will be returned as the result of the request, bypassing the
192-
/// default handler. The `request` will also be dropped immediately.
193-
async fn intercept(&self, request: &mut Request) -> HttpResult<InterceptOutcome>;
194-
}
195-
196-
/// The type returned by an [`OutboundHttpInterceptor`].
197-
pub enum InterceptOutcome {
198-
/// The intercepted request will be passed on to the default outgoing
199-
/// request handler.
200-
Continue,
201-
/// The given response will be returned as the result of the intercepted
202-
/// request, bypassing the default handler.
203-
Complete(Response),
204-
}

crates/factor-outbound-http/src/spin.rs

Lines changed: 86 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use http_body_util::BodyExt;
12
use spin_world::{
23
async_trait,
34
v1::{
@@ -7,6 +8,8 @@ use spin_world::{
78
};
89
use tracing::{field::Empty, instrument, Level, Span};
910

11+
use crate::intercept::InterceptOutcome;
12+
1013
#[async_trait]
1114
impl spin_http::Host for crate::InstanceState {
1215
#[instrument(name = "spin_outbound_http.send_request", skip_all, err(level = Level::INFO),
@@ -19,7 +22,11 @@ impl spin_http::Host for crate::InstanceState {
1922
let uri = req.uri;
2023
tracing::trace!("Sending outbound HTTP to {uri:?}");
2124

22-
let abs_url = if !uri.starts_with('/') {
25+
if !req.params.is_empty() {
26+
tracing::warn!("HTTP params field is deprecated");
27+
}
28+
29+
let req_url = if !uri.starts_with('/') {
2330
// Absolute URI
2431
let is_allowed = self
2532
.allowed_hosts
@@ -29,7 +36,7 @@ impl spin_http::Host for crate::InstanceState {
2936
if !is_allowed {
3037
return Err(HttpError::DestinationNotAllowed);
3138
}
32-
uri
39+
uri.parse().map_err(|_| HttpError::InvalidUrl)?
3340
} else {
3441
// Relative URI ("self" request)
3542
let is_allowed = self
@@ -47,36 +54,51 @@ impl spin_http::Host for crate::InstanceState {
4754
);
4855
return Err(HttpError::InvalidUrl);
4956
};
50-
format!("{origin}{uri}")
57+
let path_and_query = uri.parse().map_err(|_| HttpError::InvalidUrl)?;
58+
origin.clone().into_uri(Some(path_and_query))
5159
};
52-
let req_url = reqwest::Url::parse(&abs_url).map_err(|_| HttpError::InvalidUrl)?;
53-
54-
if !req.params.is_empty() {
55-
tracing::warn!("HTTP params field is deprecated");
56-
}
57-
58-
// Allow reuse of Client's internal connection pool for multiple requests
59-
// in a single component execution
60-
let client = self.spin_http_client.get_or_insert_with(Default::default);
6160

61+
// Build an http::Request for OutboundHttpInterceptor
6262
let mut req = {
63-
let mut builder = client.request(reqwest_method(req.method), req_url);
63+
let mut builder = http::Request::builder()
64+
.method(hyper_method(req.method))
65+
.uri(&req_url);
6466
for (key, val) in req.headers {
6567
builder = builder.header(key, val);
6668
}
67-
builder
68-
.body(req.body.unwrap_or_default())
69-
.build()
70-
.map_err(|err| {
71-
tracing::error!("Error building outbound request: {err}");
72-
HttpError::RuntimeError
73-
})?
74-
};
69+
builder.body(req.body.unwrap_or_default())
70+
}
71+
.map_err(|err| {
72+
tracing::error!("Error building outbound request: {err}");
73+
HttpError::RuntimeError
74+
})?;
75+
7576
spin_telemetry::inject_trace_context(req.headers_mut());
7677

78+
if let Some(interceptor) = &self.request_interceptor {
79+
let intercepted_request = std::mem::take(&mut req).into();
80+
match interceptor.intercept(intercepted_request).await {
81+
Ok(InterceptOutcome::Continue(intercepted_request)) => {
82+
req = intercepted_request.into_vec_request().unwrap();
83+
}
84+
Ok(InterceptOutcome::Complete(resp)) => return response_from_hyper(resp).await,
85+
Err(err) => {
86+
tracing::error!("Error in outbound HTTP interceptor: {err}");
87+
return Err(HttpError::RuntimeError);
88+
}
89+
}
90+
}
91+
92+
// Convert http::Request to reqwest::Request
93+
let req = reqwest::Request::try_from(req).map_err(|_| HttpError::InvalidUrl)?;
94+
95+
// Allow reuse of Client's internal connection pool for multiple requests
96+
// in a single component execution
97+
let client = self.spin_http_client.get_or_insert_with(Default::default);
98+
7799
let resp = client.execute(req).await.map_err(log_reqwest_error)?;
78100

79-
tracing::trace!("Returning response from outbound request to {abs_url}");
101+
tracing::trace!("Returning response from outbound request to {req_url}");
80102
span.record("http.response.status_code", resp.status().as_u16());
81103
response_from_reqwest(resp).await
82104
}
@@ -111,18 +133,52 @@ fn record_request_fields(span: &Span, req: &Request) {
111133
}
112134
}
113135

114-
fn reqwest_method(m: Method) -> reqwest::Method {
136+
fn hyper_method(m: Method) -> http::Method {
115137
match m {
116-
Method::Get => reqwest::Method::GET,
117-
Method::Post => reqwest::Method::POST,
118-
Method::Put => reqwest::Method::PUT,
119-
Method::Delete => reqwest::Method::DELETE,
120-
Method::Patch => reqwest::Method::PATCH,
121-
Method::Head => reqwest::Method::HEAD,
122-
Method::Options => reqwest::Method::OPTIONS,
138+
Method::Get => http::Method::GET,
139+
Method::Post => http::Method::POST,
140+
Method::Put => http::Method::PUT,
141+
Method::Delete => http::Method::DELETE,
142+
Method::Patch => http::Method::PATCH,
143+
Method::Head => http::Method::HEAD,
144+
Method::Options => http::Method::OPTIONS,
123145
}
124146
}
125147

148+
async fn response_from_hyper(mut resp: crate::Response) -> Result<Response, HttpError> {
149+
let status = resp.status().as_u16();
150+
151+
let headers = resp
152+
.headers()
153+
.into_iter()
154+
.map(|(key, val)| {
155+
Ok((
156+
key.to_string(),
157+
val.to_str()
158+
.map_err(|_| {
159+
tracing::error!("Non-ascii response header {key} = {val:?}");
160+
HttpError::RuntimeError
161+
})?
162+
.to_string(),
163+
))
164+
})
165+
.collect::<Result<Vec<_>, _>>()?;
166+
167+
let body = resp
168+
.body_mut()
169+
.collect()
170+
.await
171+
.map_err(|_| HttpError::RuntimeError)?
172+
.to_bytes()
173+
.to_vec();
174+
175+
Ok(Response {
176+
status,
177+
headers: Some(headers),
178+
body: Some(body),
179+
})
180+
}
181+
126182
fn log_reqwest_error(err: reqwest::Error) -> HttpError {
127183
let error_desc = if err.is_timeout() {
128184
"timeout error"

crates/factor-outbound-http/src/wasi.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@ use wasmtime_wasi_http::{
1818
};
1919

2020
use crate::{
21-
wasi_2023_10_18, wasi_2023_11_10, InstanceState, InterceptOutcome, OutboundHttpFactor,
22-
OutboundHttpInterceptor, SelfRequestOrigin,
21+
intercept::{InterceptOutcome, OutboundHttpInterceptor},
22+
wasi_2023_10_18, wasi_2023_11_10, InstanceState, OutboundHttpFactor, SelfRequestOrigin,
2323
};
2424

2525
pub(crate) fn add_to_linker<T: Send + 'static>(
@@ -133,8 +133,11 @@ async fn send_request_impl(
133133
spin_telemetry::inject_trace_context(&mut request);
134134

135135
if let Some(interceptor) = request_interceptor {
136-
match interceptor.intercept(&mut request).await? {
137-
InterceptOutcome::Continue => (),
136+
let intercept_request = std::mem::take(&mut request).into();
137+
match interceptor.intercept(intercept_request).await? {
138+
InterceptOutcome::Continue(req) => {
139+
request = req.into_hyper_request();
140+
}
138141
InterceptOutcome::Complete(resp) => {
139142
let resp = IncomingResponse {
140143
resp,

crates/factors-executor/src/lib.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -145,9 +145,7 @@ impl<T: RuntimeFactors, U: Send + 'static> FactorsExecutorApp<T, U> {
145145
.with_context(|| format!("no such component {component_id:?}"))?;
146146
Ok(instance_pre.component())
147147
}
148-
}
149148

150-
impl<T: RuntimeFactors, U: Send + 'static> FactorsExecutorApp<T, U> {
151149
/// Returns an instance builder for the given component ID.
152150
pub fn prepare(&self, component_id: &str) -> anyhow::Result<FactorsInstanceBuilder<T, U>> {
153151
let app_component = self

0 commit comments

Comments
 (0)