Skip to content

Commit b4dbbf5

Browse files
committed
factor-outbound-http: Refactor RequestSender to flatten Results
Signed-off-by: Lann Martin <[email protected]>
1 parent dcfd0c5 commit b4dbbf5

File tree

1 file changed

+102
-72
lines changed
  • crates/factor-outbound-http/src

1 file changed

+102
-72
lines changed

crates/factor-outbound-http/src/wasi.rs

Lines changed: 102 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,7 @@ use std::{
88
time::Duration,
99
};
1010

11-
use anyhow::Context as _;
12-
use http::{header::HOST, Request, Uri};
11+
use http::{header::HOST, Uri};
1312
use http_body_util::BodyExt;
1413
use hyper_util::{
1514
client::legacy::{
@@ -35,8 +34,8 @@ use wasmtime::component::HasData;
3534
use wasmtime_wasi_http::{
3635
bindings::http::types::ErrorCode,
3736
body::HyperOutgoingBody,
38-
types::{HostFutureIncomingResponse, IncomingResponse},
39-
WasiHttpCtx, WasiHttpImpl, WasiHttpView,
37+
types::{HostFutureIncomingResponse, IncomingResponse, OutgoingRequestConfig},
38+
HttpError, WasiHttpCtx, WasiHttpImpl, WasiHttpView,
4039
};
4140

4241
use crate::{
@@ -92,6 +91,8 @@ pub(crate) struct WasiHttpImplInner<'a> {
9291
table: &'a mut ResourceTable,
9392
}
9493

94+
type OutgoingRequest = http::Request<HyperOutgoingBody>;
95+
9596
impl WasiHttpView for WasiHttpImplInner<'_> {
9697
fn ctx(&mut self) -> &mut WasiHttpCtx {
9798
&mut self.state.wasi_http_ctx
@@ -116,9 +117,9 @@ impl WasiHttpView for WasiHttpImplInner<'_> {
116117
)]
117118
fn send_request(
118119
&mut self,
119-
request: Request<wasmtime_wasi_http::body::HyperOutgoingBody>,
120-
config: wasmtime_wasi_http::types::OutgoingRequestConfig,
121-
) -> wasmtime_wasi_http::HttpResult<wasmtime_wasi_http::types::HostFutureIncomingResponse> {
120+
request: OutgoingRequest,
121+
config: OutgoingRequestConfig,
122+
) -> Result<wasmtime_wasi_http::types::HostFutureIncomingResponse, HttpError> {
122123
let request_sender = RequestSender {
123124
allowed_hosts: self.state.allowed_hosts.clone(),
124125
component_tls_configs: self.state.component_tls_configs.clone(),
@@ -128,7 +129,18 @@ impl WasiHttpView for WasiHttpImplInner<'_> {
128129
http_clients: self.state.wasi_http_clients.clone(),
129130
};
130131
Ok(HostFutureIncomingResponse::Pending(
131-
wasmtime_wasi::runtime::spawn(request_sender.send(request, config).in_current_span()),
132+
wasmtime_wasi::runtime::spawn(
133+
async {
134+
match request_sender.send(request, config).await {
135+
Ok(resp) => Ok(Ok(resp)),
136+
Err(http_error) => match http_error.downcast() {
137+
Ok(error_code) => Ok(Err(error_code)),
138+
Err(trap) => Err(trap),
139+
},
140+
}
141+
}
142+
.in_current_span(),
143+
),
132144
))
133145
}
134146
}
@@ -145,17 +157,49 @@ struct RequestSender {
145157
impl RequestSender {
146158
async fn send(
147159
self,
148-
mut request: Request<wasmtime_wasi_http::body::HyperOutgoingBody>,
149-
config: wasmtime_wasi_http::types::OutgoingRequestConfig,
150-
) -> anyhow::Result<Result<IncomingResponse, ErrorCode>> {
151-
let wasmtime_wasi_http::types::OutgoingRequestConfig {
152-
mut use_tls,
153-
connect_timeout,
154-
first_byte_timeout,
155-
between_bytes_timeout,
156-
} = config;
157-
let span = tracing::Span::current();
160+
mut request: OutgoingRequest,
161+
mut config: OutgoingRequestConfig,
162+
) -> Result<IncomingResponse, HttpError> {
163+
self.prepare_request(&mut request, &mut config).await?;
164+
165+
// If the current span has opentelemetry trace context, inject it into the request
166+
spin_telemetry::inject_trace_context(&mut request);
158167

168+
// Run any configured request interceptor
169+
if let Some(interceptor) = &self.request_interceptor {
170+
let intercept_request = std::mem::take(&mut request).into();
171+
match interceptor.intercept(intercept_request).await? {
172+
InterceptOutcome::Continue(req) => {
173+
request = req.into_hyper_request();
174+
}
175+
InterceptOutcome::Complete(resp) => {
176+
let resp = IncomingResponse {
177+
resp,
178+
worker: None,
179+
between_bytes_timeout: config.between_bytes_timeout,
180+
};
181+
return Ok(resp);
182+
}
183+
}
184+
}
185+
186+
// Backfill span fields after potentially updating the URL in the interceptor
187+
if let Some(authority) = request.uri().authority() {
188+
let span = tracing::Span::current();
189+
span.record("server.address", authority.host());
190+
if let Some(port) = authority.port() {
191+
span.record("server.port", port.as_u16());
192+
}
193+
}
194+
195+
Ok(self.send_request(request, config).await?)
196+
}
197+
198+
async fn prepare_request(
199+
&self,
200+
request: &mut OutgoingRequest,
201+
config: &mut OutgoingRequestConfig,
202+
) -> Result<(), ErrorCode> {
159203
// wasmtime-wasi-http fills in scheme and authority for relative URLs
160204
// (e.g. https://:443/<path>), which makes them hard to reason about.
161205
// Undo that here.
@@ -170,50 +214,46 @@ impl RequestSender {
170214
}
171215
*uri = builder.build().unwrap();
172216
}
173-
span.record("url.full", uri.to_string());
174-
175-
// If the current span has opentelemetry trace context, inject it into the request
176-
spin_telemetry::inject_trace_context(&mut request);
217+
tracing::Span::current().record("url.full", uri.to_string());
177218

178-
let is_self_request = request
179-
.uri()
180-
.authority()
181-
.is_some_and(|a| a.host() == "self.alt");
219+
let is_self_request = match request.uri().authority() {
220+
// Some SDKs require an authority, so we support e.g. http://self.alt/self-request
221+
Some(authority) => authority.host() == "self.alt",
222+
// Otherwise self requests have no authority
223+
None => true,
224+
};
182225

183-
if request.uri().authority().is_some() && !is_self_request {
184-
// Absolute URI
185-
let is_allowed = self
186-
.allowed_hosts
187-
.check_url(&request.uri().to_string(), "https")
226+
// Enforce allowed_outbound_hosts
227+
let is_allowed = if is_self_request {
228+
self.allowed_hosts
229+
.check_relative_url(&["http", "https"])
188230
.await
189-
.unwrap_or(false);
190-
if !is_allowed {
191-
return Ok(Err(ErrorCode::HttpRequestDenied));
192-
}
231+
.unwrap_or(false)
193232
} else {
194-
// Relative URI ("self" request)
195-
let is_allowed = self
196-
.allowed_hosts
197-
.check_relative_url(&["http", "https"])
233+
self.allowed_hosts
234+
.check_url(&request.uri().to_string(), "https")
198235
.await
199-
.unwrap_or(false);
200-
if !is_allowed {
201-
return Ok(Err(ErrorCode::HttpRequestDenied));
202-
}
236+
.unwrap_or(false)
237+
};
238+
if !is_allowed {
239+
return Err(ErrorCode::HttpRequestDenied);
240+
}
203241

204-
let Some(origin) = self.self_request_origin else {
242+
if is_self_request {
243+
// Replace the authority with the "self request origin"
244+
let Some(origin) = self.self_request_origin.as_ref() else {
205245
tracing::error!(
206246
"Couldn't handle outbound HTTP request to relative URI; no origin set"
207247
);
208-
return Ok(Err(ErrorCode::HttpRequestUriInvalid));
248+
return Err(ErrorCode::HttpRequestUriInvalid);
209249
};
210250

211-
use_tls = origin.use_tls();
251+
config.use_tls = origin.use_tls();
212252

213253
request.headers_mut().insert(HOST, origin.host_header());
214254

215255
let path_and_query = request.uri().path_and_query().cloned();
216-
*request.uri_mut() = origin.into_uri(path_and_query);
256+
*request.uri_mut() = origin.clone().into_uri(path_and_query);
217257
}
218258

219259
// Some servers (looking at you nginx) don't like a host header even though
@@ -223,30 +263,20 @@ impl RequestSender {
223263
// decide to add the `host` header back in, regardless of the nginx bug, in
224264
// which case we'll let it do so without interferring.
225265
request.headers_mut().remove(HOST);
266+
Ok(())
267+
}
226268

227-
if let Some(interceptor) = self.request_interceptor {
228-
let intercept_request = std::mem::take(&mut request).into();
229-
match interceptor.intercept(intercept_request).await? {
230-
InterceptOutcome::Continue(req) => {
231-
request = req.into_hyper_request();
232-
}
233-
InterceptOutcome::Complete(resp) => {
234-
let resp = IncomingResponse {
235-
resp,
236-
worker: None,
237-
between_bytes_timeout: config.between_bytes_timeout,
238-
};
239-
return Ok(Ok(resp));
240-
}
241-
}
242-
}
243-
244-
// Backfill span fields after potentially updating the URL in the interceptor
245-
let authority = request.uri().authority().context("authority not set")?;
246-
span.record("server.address", authority.host());
247-
if let Some(port) = authority.port() {
248-
span.record("server.port", port.as_u16());
249-
}
269+
async fn send_request(
270+
self,
271+
request: OutgoingRequest,
272+
config: OutgoingRequestConfig,
273+
) -> Result<IncomingResponse, ErrorCode> {
274+
let OutgoingRequestConfig {
275+
use_tls,
276+
connect_timeout,
277+
first_byte_timeout,
278+
between_bytes_timeout,
279+
} = config;
250280

251281
let tls_client_config = if use_tls {
252282
let host = request.uri().host().unwrap_or_default();
@@ -288,11 +318,11 @@ impl RequestSender {
288318

289319
tracing::Span::current().record("http.response.status_code", resp.status().as_u16());
290320

291-
Ok(Ok(wasmtime_wasi_http::types::IncomingResponse {
321+
Ok(IncomingResponse {
292322
resp,
293323
worker: None,
294324
between_bytes_timeout,
295-
}))
325+
})
296326
}
297327
}
298328

0 commit comments

Comments
 (0)