Skip to content

Commit 1b5567e

Browse files
committed
factor-outbound-http: Refactor wasi::send_request_impl
Signed-off-by: Lann Martin <[email protected]>
1 parent d1eb9ac commit 1b5567e

File tree

1 file changed

+153
-163
lines changed
  • crates/factor-outbound-http/src

1 file changed

+153
-163
lines changed

crates/factor-outbound-http/src/wasi.rs

Lines changed: 153 additions & 163 deletions
Original file line numberDiff line numberDiff line change
@@ -149,24 +149,29 @@ impl WasiHttpView for WasiHttpImplInner<'_> {
149149
request: Request<wasmtime_wasi_http::body::HyperOutgoingBody>,
150150
config: wasmtime_wasi_http::types::OutgoingRequestConfig,
151151
) -> wasmtime_wasi_http::HttpResult<wasmtime_wasi_http::types::HostFutureIncomingResponse> {
152+
let request_sender = RequestSender {
153+
allowed_hosts: self.state.allowed_hosts.clone(),
154+
component_tls_configs: self.state.component_tls_configs.clone(),
155+
request_interceptor: self.state.request_interceptor.clone(),
156+
self_request_origin: self.state.self_request_origin.clone(),
157+
blocked_networks: self.state.blocked_networks.clone(),
158+
http_clients: self.state.wasi_http_clients.clone(),
159+
};
152160
Ok(HostFutureIncomingResponse::Pending(
153-
wasmtime_wasi::runtime::spawn(
154-
send_request_impl(
155-
request,
156-
config,
157-
self.state.allowed_hosts.clone(),
158-
self.state.component_tls_configs.clone(),
159-
self.state.request_interceptor.clone(),
160-
self.state.self_request_origin.clone(),
161-
self.state.blocked_networks.clone(),
162-
self.state.wasi_http_clients.clone(),
163-
)
164-
.in_current_span(),
165-
),
161+
wasmtime_wasi::runtime::spawn(request_sender.send(request, config).in_current_span()),
166162
))
167163
}
168164
}
169165

166+
struct RequestSender {
167+
allowed_hosts: OutboundAllowedHosts,
168+
blocked_networks: BlockedNetworks,
169+
component_tls_configs: ComponentTlsClientConfigs,
170+
self_request_origin: Option<SelfRequestOrigin>,
171+
request_interceptor: Option<Arc<dyn OutboundHttpInterceptor>>,
172+
http_clients: HttpClients,
173+
}
174+
170175
#[derive(Clone)]
171176
struct ConnectOptions {
172177
blocked_networks: BlockedNetworks,
@@ -350,172 +355,157 @@ impl Service<Uri> for HttpsConnector {
350355
}
351356
}
352357

353-
#[allow(clippy::too_many_arguments)]
354-
async fn send_request_impl(
355-
mut request: Request<wasmtime_wasi_http::body::HyperOutgoingBody>,
356-
mut config: wasmtime_wasi_http::types::OutgoingRequestConfig,
357-
outbound_allowed_hosts: OutboundAllowedHosts,
358-
component_tls_configs: ComponentTlsClientConfigs,
359-
request_interceptor: Option<Arc<dyn OutboundHttpInterceptor>>,
360-
self_request_origin: Option<SelfRequestOrigin>,
361-
blocked_networks: BlockedNetworks,
362-
http_clients: HttpClients,
363-
) -> anyhow::Result<Result<IncomingResponse, ErrorCode>> {
364-
// wasmtime-wasi-http fills in scheme and authority for relative URLs
365-
// (e.g. https://:443/<path>), which makes them hard to reason about.
366-
// Undo that here.
367-
let uri = request.uri_mut();
368-
if uri
369-
.authority()
370-
.is_some_and(|authority| authority.host().is_empty())
371-
{
372-
let mut builder = http::uri::Builder::new();
373-
if let Some(paq) = uri.path_and_query() {
374-
builder = builder.path_and_query(paq.clone());
358+
impl RequestSender {
359+
async fn send(
360+
self,
361+
mut request: Request<wasmtime_wasi_http::body::HyperOutgoingBody>,
362+
config: wasmtime_wasi_http::types::OutgoingRequestConfig,
363+
) -> anyhow::Result<Result<IncomingResponse, ErrorCode>> {
364+
let wasmtime_wasi_http::types::OutgoingRequestConfig {
365+
mut use_tls,
366+
connect_timeout,
367+
first_byte_timeout,
368+
between_bytes_timeout,
369+
} = config;
370+
371+
// wasmtime-wasi-http fills in scheme and authority for relative URLs
372+
// (e.g. https://:443/<path>), which makes them hard to reason about.
373+
// Undo that here.
374+
let uri = request.uri_mut();
375+
if uri
376+
.authority()
377+
.is_some_and(|authority| authority.host().is_empty())
378+
{
379+
let mut builder = http::uri::Builder::new();
380+
if let Some(paq) = uri.path_and_query() {
381+
builder = builder.path_and_query(paq.clone());
382+
}
383+
*uri = builder.build().unwrap();
375384
}
376-
*uri = builder.build().unwrap();
377-
}
378-
let span = tracing::Span::current();
379-
span.record("url.full", uri.to_string());
385+
let span = tracing::Span::current();
386+
span.record("url.full", uri.to_string());
387+
388+
spin_telemetry::inject_trace_context(&mut request);
389+
390+
let host = request.uri().host().unwrap_or_default();
391+
let tls_client_config = self.component_tls_configs.get_client_config(host).clone();
392+
393+
let is_self_request = request
394+
.uri()
395+
.authority()
396+
.is_some_and(|a| a.host() == "self.alt");
397+
398+
if request.uri().authority().is_some() && !is_self_request {
399+
// Absolute URI
400+
let is_allowed = self
401+
.allowed_hosts
402+
.check_url(&request.uri().to_string(), "https")
403+
.await
404+
.unwrap_or(false);
405+
if !is_allowed {
406+
return Ok(Err(ErrorCode::HttpRequestDenied));
407+
}
408+
} else {
409+
// Relative URI ("self" request)
410+
let is_allowed = self
411+
.allowed_hosts
412+
.check_relative_url(&["http", "https"])
413+
.await
414+
.unwrap_or(false);
415+
if !is_allowed {
416+
return Ok(Err(ErrorCode::HttpRequestDenied));
417+
}
380418

381-
spin_telemetry::inject_trace_context(&mut request);
419+
let Some(origin) = self.self_request_origin else {
420+
tracing::error!(
421+
"Couldn't handle outbound HTTP request to relative URI; no origin set"
422+
);
423+
return Ok(Err(ErrorCode::HttpRequestUriInvalid));
424+
};
382425

383-
let host = request.uri().host().unwrap_or_default();
384-
let tls_client_config = component_tls_configs.get_client_config(host).clone();
426+
use_tls = origin.use_tls();
385427

386-
let is_self_request = request
387-
.uri()
388-
.authority()
389-
.is_some_and(|a| a.host() == "self.alt");
428+
request.headers_mut().insert(HOST, origin.host_header());
390429

391-
if request.uri().authority().is_some() && !is_self_request {
392-
// Absolute URI
393-
let is_allowed = outbound_allowed_hosts
394-
.check_url(&request.uri().to_string(), "https")
395-
.await
396-
.unwrap_or(false);
397-
if !is_allowed {
398-
return Ok(Err(ErrorCode::HttpRequestDenied));
399-
}
400-
} else {
401-
// Relative URI ("self" request)
402-
let is_allowed = outbound_allowed_hosts
403-
.check_relative_url(&["http", "https"])
404-
.await
405-
.unwrap_or(false);
406-
if !is_allowed {
407-
return Ok(Err(ErrorCode::HttpRequestDenied));
430+
let path_and_query = request.uri().path_and_query().cloned();
431+
*request.uri_mut() = origin.into_uri(path_and_query);
408432
}
409433

410-
let Some(origin) = self_request_origin else {
411-
tracing::error!("Couldn't handle outbound HTTP request to relative URI; no origin set");
412-
return Ok(Err(ErrorCode::HttpRequestUriInvalid));
413-
};
414-
415-
config.use_tls = origin.use_tls();
416-
417-
request.headers_mut().insert(HOST, origin.host_header());
418-
419-
let path_and_query = request.uri().path_and_query().cloned();
420-
*request.uri_mut() = origin.into_uri(path_and_query);
421-
}
422-
423-
// Some servers (looking at you nginx) don't like a host header even though
424-
// http/2 allows it: https://github.com/hyperium/hyper/issues/3298.
425-
//
426-
// Note that we do this _before_ invoking the request interceptor. It may
427-
// decide to add the `host` header back in, regardless of the nginx bug, in
428-
// which case we'll let it do so without interferring.
429-
request.headers_mut().remove(HOST);
430-
431-
if let Some(interceptor) = request_interceptor {
432-
let intercept_request = std::mem::take(&mut request).into();
433-
match interceptor.intercept(intercept_request).await? {
434-
InterceptOutcome::Continue(req) => {
435-
request = req.into_hyper_request();
436-
}
437-
InterceptOutcome::Complete(resp) => {
438-
let resp = IncomingResponse {
439-
resp,
440-
worker: None,
441-
between_bytes_timeout: config.between_bytes_timeout,
442-
};
443-
return Ok(Ok(resp));
434+
// Some servers (looking at you nginx) don't like a host header even though
435+
// http/2 allows it: https://github.com/hyperium/hyper/issues/3298.
436+
//
437+
// Note that we do this _before_ invoking the request interceptor. It may
438+
// decide to add the `host` header back in, regardless of the nginx bug, in
439+
// which case we'll let it do so without interferring.
440+
request.headers_mut().remove(HOST);
441+
442+
if let Some(interceptor) = self.request_interceptor {
443+
let intercept_request = std::mem::take(&mut request).into();
444+
match interceptor.intercept(intercept_request).await? {
445+
InterceptOutcome::Continue(req) => {
446+
request = req.into_hyper_request();
447+
}
448+
InterceptOutcome::Complete(resp) => {
449+
let resp = IncomingResponse {
450+
resp,
451+
worker: None,
452+
between_bytes_timeout: config.between_bytes_timeout,
453+
};
454+
return Ok(Ok(resp));
455+
}
444456
}
445457
}
446-
}
447458

448-
let authority = request.uri().authority().context("authority not set")?;
449-
span.record("server.address", authority.host());
450-
if let Some(port) = authority.port() {
451-
span.record("server.port", port.as_u16());
452-
}
453-
454-
Ok(send_request_handler(
455-
request,
456-
config,
457-
tls_client_config,
458-
blocked_networks,
459-
http_clients,
460-
)
461-
.await)
462-
}
459+
let authority = request.uri().authority().context("authority not set")?;
460+
span.record("server.address", authority.host());
461+
if let Some(port) = authority.port() {
462+
span.record("server.port", port.as_u16());
463+
}
463464

464-
async fn send_request_handler(
465-
request: http::Request<HyperOutgoingBody>,
466-
wasmtime_wasi_http::types::OutgoingRequestConfig {
467-
use_tls,
468-
connect_timeout,
469-
first_byte_timeout,
470-
between_bytes_timeout,
471-
}: wasmtime_wasi_http::types::OutgoingRequestConfig,
472-
tls_client_config: TlsClientConfig,
473-
blocked_networks: BlockedNetworks,
474-
http_clients: HttpClients,
475-
) -> Result<wasmtime_wasi_http::types::IncomingResponse, ErrorCode> {
476-
let resp = CONNECT_OPTIONS.scope(
477-
ConnectOptions {
478-
blocked_networks,
479-
connect_timeout,
480-
},
481-
async move {
482-
if use_tls {
483-
TLS_CLIENT_CONFIG
484-
.scope(tls_client_config, async move {
485-
http_clients.https.request(request).await
486-
})
487-
.await
488-
} else {
489-
let use_http2 =
490-
std::env::var_os("SPIN_OUTBOUND_H2C_PRIOR_KNOWLEDGE").is_some_and(|v| {
491-
request
492-
.uri()
493-
.authority()
494-
.is_some_and(|authority| authority.as_str() == v)
495-
});
496-
497-
if use_http2 {
498-
http_clients.http2.request(request).await
465+
let resp = CONNECT_OPTIONS.scope(
466+
ConnectOptions {
467+
blocked_networks: self.blocked_networks,
468+
connect_timeout,
469+
},
470+
async move {
471+
if use_tls {
472+
TLS_CLIENT_CONFIG
473+
.scope(tls_client_config, async move {
474+
self.http_clients.https.request(request).await
475+
})
476+
.await
499477
} else {
500-
http_clients.http1.request(request).await
478+
let use_http2 = std::env::var_os("SPIN_OUTBOUND_H2C_PRIOR_KNOWLEDGE")
479+
.is_some_and(|v| {
480+
request
481+
.uri()
482+
.authority()
483+
.is_some_and(|authority| authority.as_str() == v)
484+
});
485+
486+
if use_http2 {
487+
self.http_clients.http2.request(request).await
488+
} else {
489+
self.http_clients.http1.request(request).await
490+
}
501491
}
502-
}
503-
},
504-
);
492+
},
493+
);
505494

506-
let resp = timeout(first_byte_timeout, resp)
507-
.await
508-
.map_err(|_| ErrorCode::ConnectionReadTimeout)?
509-
.map_err(hyper_legacy_request_error)?
510-
.map(|body| body.map_err(hyper_request_error).boxed());
495+
let resp = timeout(first_byte_timeout, resp)
496+
.await
497+
.map_err(|_| ErrorCode::ConnectionReadTimeout)?
498+
.map_err(hyper_legacy_request_error)?
499+
.map(|body| body.map_err(hyper_request_error).boxed());
511500

512-
tracing::Span::current().record("http.response.status_code", resp.status().as_u16());
501+
tracing::Span::current().record("http.response.status_code", resp.status().as_u16());
513502

514-
Ok(wasmtime_wasi_http::types::IncomingResponse {
515-
resp,
516-
worker: None,
517-
between_bytes_timeout,
518-
})
503+
Ok(Ok(wasmtime_wasi_http::types::IncomingResponse {
504+
resp,
505+
worker: None,
506+
between_bytes_timeout,
507+
}))
508+
}
519509
}
520510

521511
/// Translate a [`hyper::Error`] to a wasi-http `ErrorCode` in the context of a request.

0 commit comments

Comments
 (0)