Skip to content

Commit 8971708

Browse files
authored
Merge pull request #2740 from lann/factors-spin-http
factors: Implement spin outbound http
2 parents 2491098 + 07a238a commit 8971708

File tree

17 files changed

+215
-138
lines changed

17 files changed

+215
-138
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/factor-outbound-http/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,11 @@ anyhow = "1.0"
99
http = "1.1.0"
1010
http-body-util = "0.1"
1111
hyper = "1.4.1"
12+
reqwest = { version = "0.11", features = ["gzip"] }
1213
rustls = "0.23"
1314
spin-factor-outbound-networking = { path = "../factor-outbound-networking" }
1415
spin-factors = { path = "../factors" }
16+
spin-telemetry = { path = "../telemetry" }
1517
spin-world = { path = "../world" }
1618
terminal = { path = "../terminal" }
1719
tokio = { version = "1", features = ["macros", "rt"] }

crates/factor-outbound-http/src/lib.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ impl Factor for OutboundHttpFactor {
7171
component_tls_configs,
7272
self_request_origin: None,
7373
request_interceptor: None,
74+
spin_http_client: None,
7475
})
7576
}
7677
}
@@ -81,6 +82,8 @@ pub struct InstanceState {
8182
component_tls_configs: ComponentTlsConfigs,
8283
self_request_origin: Option<SelfRequestOrigin>,
8384
request_interceptor: Option<Box<dyn OutboundHttpInterceptor>>,
85+
// Connection-pooling client for 'fermyon:spin/http' interface
86+
spin_http_client: Option<reqwest::Client>,
8487
}
8588

8689
impl InstanceState {
@@ -156,6 +159,12 @@ impl SelfRequestOrigin {
156159
}
157160
}
158161

162+
impl std::fmt::Display for SelfRequestOrigin {
163+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
164+
write!(f, "{}://{}", self.scheme, self.authority)
165+
}
166+
}
167+
159168
/// An outbound HTTP request interceptor to be used with
160169
/// [`InstanceState::set_request_interceptor`].
161170
pub trait OutboundHttpInterceptor: Send + Sync {
Lines changed: 162 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,84 @@
11
use spin_world::{
22
async_trait,
3-
v1::http,
4-
v1::http_types::{self, HttpError, Request, Response},
3+
v1::{
4+
http as spin_http,
5+
http_types::{self, HttpError, Method, Request, Response},
6+
},
57
};
8+
use tracing::{field::Empty, instrument, Level, Span};
69

710
#[async_trait]
8-
impl http::Host for crate::InstanceState {
11+
impl spin_http::Host for crate::InstanceState {
12+
#[instrument(name = "spin_outbound_http.send_request", skip_all, err(level = Level::INFO),
13+
fields(otel.kind = "client", url.full = Empty, http.request.method = Empty,
14+
http.response.status_code = Empty, otel.name = Empty, server.address = Empty, server.port = Empty))]
915
async fn send_request(&mut self, req: Request) -> Result<Response, HttpError> {
10-
// FIXME(lann): This is all just a stub to test allowed_outbound_hosts
11-
match self.allowed_hosts.check_url(&req.uri, "https").await {
12-
Ok(true) => (),
13-
_ => {
16+
let span = Span::current();
17+
record_request_fields(&span, &req);
18+
19+
let uri = req.uri;
20+
tracing::trace!("Sending outbound HTTP to {uri:?}");
21+
22+
let abs_url = if !uri.starts_with('/') {
23+
// Absolute URI
24+
let is_allowed = self
25+
.allowed_hosts
26+
.check_url(&uri, "https")
27+
.await
28+
.unwrap_or(false);
29+
if !is_allowed {
30+
return Err(HttpError::DestinationNotAllowed);
31+
}
32+
uri
33+
} else {
34+
// Relative URI ("self" request)
35+
let is_allowed = self
36+
.allowed_hosts
37+
.check_relative_url(&["http", "https"])
38+
.await
39+
.unwrap_or(false);
40+
if !is_allowed {
1441
return Err(HttpError::DestinationNotAllowed);
1542
}
43+
44+
let Some(origin) = &self.self_request_origin else {
45+
tracing::error!(
46+
"Couldn't handle outbound HTTP request to relative URI; no origin set"
47+
);
48+
return Err(HttpError::InvalidUrl);
49+
};
50+
format!("{origin}{uri}")
51+
};
52+
let req_url = reqwest::Url::parse(&abs_url).map_err(|_| HttpError::InvalidUrl)?;
53+
54+
if !req.params.is_empty() {
55+
tracing::warn!("HTTP params field is deprecated");
1656
}
17-
Ok(Response {
18-
status: 200,
19-
headers: None,
20-
body: Some(b"test response".into()),
21-
})
57+
58+
// Allow reuse of Client's internal connection pool for multiple requests
59+
// in a single component execution
60+
let client = self.spin_http_client.get_or_insert_with(Default::default);
61+
62+
let mut req = {
63+
let mut builder = client.request(reqwest_method(req.method), req_url);
64+
for (key, val) in req.headers {
65+
builder = builder.header(key, val);
66+
}
67+
builder
68+
.body(req.body.unwrap_or_default())
69+
.build()
70+
.map_err(|err| {
71+
tracing::error!("Error building outbound request: {err}");
72+
HttpError::RuntimeError
73+
})?
74+
};
75+
spin_telemetry::inject_trace_context(req.headers_mut());
76+
77+
let resp = client.execute(req).await.map_err(log_reqwest_error)?;
78+
79+
tracing::trace!("Returning response from outbound request to {abs_url}");
80+
span.record("http.response.status_code", resp.status().as_u16());
81+
response_from_reqwest(resp).await
2282
}
2383
}
2484

@@ -27,3 +87,93 @@ impl http_types::Host for crate::InstanceState {
2787
Ok(err)
2888
}
2989
}
90+
91+
fn record_request_fields(span: &Span, req: &Request) {
92+
let method = match req.method {
93+
Method::Get => "GET",
94+
Method::Post => "POST",
95+
Method::Put => "PUT",
96+
Method::Delete => "DELETE",
97+
Method::Patch => "PATCH",
98+
Method::Head => "HEAD",
99+
Method::Options => "OPTIONS",
100+
};
101+
span.record("otel.name", method)
102+
.record("http.request.method", method)
103+
.record("url.full", req.uri.clone());
104+
if let Ok(uri) = req.uri.parse::<http::Uri>() {
105+
if let Some(authority) = uri.authority() {
106+
span.record("server.address", authority.host());
107+
if let Some(port) = authority.port() {
108+
span.record("server.port", port.as_u16());
109+
}
110+
}
111+
}
112+
}
113+
114+
fn reqwest_method(m: Method) -> reqwest::Method {
115+
match m {
116+
Method::Get => reqwest::Method::GET,
117+
Method::Post => reqwest::Method::POST,
118+
Method::Put => reqwest::Method::PUT,
119+
Method::Delete => reqwest::Method::DELETE,
120+
Method::Patch => reqwest::Method::PATCH,
121+
Method::Head => reqwest::Method::HEAD,
122+
Method::Options => reqwest::Method::OPTIONS,
123+
}
124+
}
125+
126+
fn log_reqwest_error(err: reqwest::Error) -> HttpError {
127+
let error_desc = if err.is_timeout() {
128+
"timeout error"
129+
} else if err.is_connect() {
130+
"connection error"
131+
} else if err.is_body() || err.is_decode() {
132+
"message body error"
133+
} else if err.is_request() {
134+
"request error"
135+
} else {
136+
"error"
137+
};
138+
tracing::warn!(
139+
"Outbound HTTP {}: URL {}, error detail {:?}",
140+
error_desc,
141+
err.url()
142+
.map(|u| u.to_string())
143+
.unwrap_or_else(|| "<unknown>".to_owned()),
144+
err
145+
);
146+
HttpError::RuntimeError
147+
}
148+
149+
async fn response_from_reqwest(res: reqwest::Response) -> Result<Response, HttpError> {
150+
let status = res.status().as_u16();
151+
152+
let headers = res
153+
.headers()
154+
.into_iter()
155+
.map(|(key, val)| {
156+
Ok((
157+
key.to_string(),
158+
val.to_str()
159+
.map_err(|_| {
160+
tracing::error!("Non-ascii response header {key} = {val:?}");
161+
HttpError::RuntimeError
162+
})?
163+
.to_string(),
164+
))
165+
})
166+
.collect::<Result<Vec<_>, _>>()?;
167+
168+
let body = res
169+
.bytes()
170+
.await
171+
.map_err(|_| HttpError::RuntimeError)?
172+
.to_vec();
173+
174+
Ok(Response {
175+
status,
176+
headers: Some(headers),
177+
body: Some(body),
178+
})
179+
}

crates/factor-outbound-http/src/wasi.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,21 @@ impl<'a> WasiHttpView for WasiHttpImplInner<'a> {
8686
mut request: Request<wasmtime_wasi_http::body::HyperOutgoingBody>,
8787
mut config: wasmtime_wasi_http::types::OutgoingRequestConfig,
8888
) -> wasmtime_wasi_http::HttpResult<wasmtime_wasi_http::types::HostFutureIncomingResponse> {
89+
// wasmtime-wasi-http fills in scheme and authority for relative URLs
90+
// (e.g. https://:443/<path>), which makes them hard to reason about.
91+
// Undo that here.
92+
let uri = request.uri_mut();
93+
if uri
94+
.authority()
95+
.is_some_and(|authority| authority.host().is_empty())
96+
{
97+
let mut builder = http::uri::Builder::new();
98+
if let Some(paq) = uri.path_and_query() {
99+
builder = builder.path_and_query(paq.clone());
100+
}
101+
*uri = builder.build().unwrap();
102+
}
103+
89104
if let Some(interceptor) = &self.state.request_interceptor {
90105
match interceptor.intercept(&mut request, &mut config) {
91106
InterceptOutcome::Continue => (),
@@ -149,6 +164,7 @@ async fn send_request_impl(
149164
config.use_tls = origin.use_tls();
150165

151166
request.headers_mut().insert(HOST, origin.host_header());
167+
spin_telemetry::inject_trace_context(&mut request);
152168

153169
let path_and_query = request.uri().path_and_query().cloned();
154170
*request.uri_mut() = origin.into_uri(path_and_query);

0 commit comments

Comments
 (0)