Skip to content

Commit 351d49d

Browse files
authored
Merge pull request #2809 from lann/outbound-http-interceptor-async
Make OutboundHttpInterceptor async
2 parents 8ad00cc + fdcc7e7 commit 351d49d

File tree

3 files changed

+75
-85
lines changed

3 files changed

+75
-85
lines changed

crates/factor-outbound-http/src/lib.rs

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ mod wasi;
33
pub mod wasi_2023_10_18;
44
pub mod wasi_2023_11_10;
55

6-
use std::net::SocketAddr;
6+
use std::{net::SocketAddr, sync::Arc};
77

88
use anyhow::Context;
99
use http::{
@@ -16,6 +16,7 @@ use spin_factor_outbound_networking::{
1616
use spin_factors::{
1717
anyhow, ConfigureAppContext, Factor, PrepareContext, RuntimeFactors, SelfInstanceBuilder,
1818
};
19+
use spin_world::async_trait;
1920
use wasmtime_wasi_http::WasiHttpCtx;
2021

2122
pub use wasmtime_wasi_http::{
@@ -91,7 +92,7 @@ pub struct InstanceState {
9192
allow_private_ips: bool,
9293
component_tls_configs: ComponentTlsConfigs,
9394
self_request_origin: Option<SelfRequestOrigin>,
94-
request_interceptor: Option<Box<dyn OutboundHttpInterceptor>>,
95+
request_interceptor: Option<Arc<dyn OutboundHttpInterceptor>>,
9596
// Connection-pooling client for 'fermyon:spin/http' interface
9697
spin_http_client: Option<reqwest::Client>,
9798
}
@@ -115,14 +116,15 @@ impl InstanceState {
115116
if self.request_interceptor.is_some() {
116117
anyhow::bail!("set_request_interceptor can only be called once");
117118
}
118-
self.request_interceptor = Some(Box::new(interceptor));
119+
self.request_interceptor = Some(Arc::new(interceptor));
119120
Ok(())
120121
}
121122
}
122123

123124
impl SelfInstanceBuilder for InstanceState {}
124125

125126
pub type Request = http::Request<wasmtime_wasi_http::body::HyperOutgoingBody>;
127+
pub type Response = http::Response<wasmtime_wasi_http::body::HyperIncomingBody>;
126128

127129
/// SelfRequestOrigin indicates the base URI to use for "self" requests.
128130
///
@@ -177,29 +179,26 @@ impl std::fmt::Display for SelfRequestOrigin {
177179

178180
/// An outbound HTTP request interceptor to be used with
179181
/// [`InstanceState::set_request_interceptor`].
182+
#[async_trait]
180183
pub trait OutboundHttpInterceptor: Send + Sync {
181184
/// Intercept an outgoing HTTP request.
182185
///
183186
/// If this method returns [`InterceptedResponse::Continue`], the (possibly
184-
/// updated) request and config will be passed on to the default outgoing
185-
/// request handler.
187+
/// updated) request will be passed on to the default outgoing request
188+
/// handler.
186189
///
187190
/// If this method returns [`InterceptedResponse::Intercepted`], the inner
188191
/// result will be returned as the result of the request, bypassing the
189-
/// default handler.
190-
fn intercept(
191-
&self,
192-
request: &mut Request,
193-
config: &mut OutgoingRequestConfig,
194-
) -> InterceptOutcome;
192+
/// default handler. The `request` will also be dropped immediately.
193+
async fn intercept(&self, request: &mut Request) -> HttpResult<InterceptOutcome>;
195194
}
196195

197196
/// The type returned by an [`OutboundHttpInterceptor`].
198197
pub enum InterceptOutcome {
199198
/// The intercepted request will be passed on to the default outgoing
200199
/// request handler.
201200
Continue,
202-
/// The given result will be returned as the result of the intercepted
201+
/// The given response will be returned as the result of the intercepted
203202
/// request, bypassing the default handler.
204-
Complete(HttpResult<HostFutureIncomingResponse>),
203+
Complete(Response),
205204
}

crates/factor-outbound-http/src/wasi.rs

Lines changed: 49 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use http::{header::HOST, Request};
55
use http_body_util::BodyExt;
66
use ip_network::IpNetwork;
77
use rustls::ClientConfig;
8-
use spin_factor_outbound_networking::OutboundAllowedHosts;
8+
use spin_factor_outbound_networking::{ComponentTlsConfigs, OutboundAllowedHosts};
99
use spin_factors::{wasmtime::component::ResourceTable, RuntimeFactorsInstanceState};
1010
use tokio::{net::TcpStream, time::timeout};
1111
use tracing::{field::Empty, instrument, Instrument};
@@ -19,7 +19,7 @@ use wasmtime_wasi_http::{
1919

2020
use crate::{
2121
wasi_2023_10_18, wasi_2023_11_10, InstanceState, InterceptOutcome, OutboundHttpFactor,
22-
SelfRequestOrigin,
22+
OutboundHttpInterceptor, SelfRequestOrigin,
2323
};
2424

2525
pub(crate) fn add_to_linker<T: Send + 'static>(
@@ -74,7 +74,7 @@ impl<'a> WasiHttpView for WasiHttpImplInner<'a> {
7474
skip_all,
7575
fields(
7676
otel.kind = "client",
77-
url.full = %request.uri(),
77+
url.full = Empty,
7878
http.request.method = %request.method(),
7979
otel.name = %request.method(),
8080
http.response.status_code = Empty,
@@ -84,46 +84,18 @@ impl<'a> WasiHttpView for WasiHttpImplInner<'a> {
8484
)]
8585
fn send_request(
8686
&mut self,
87-
mut request: Request<wasmtime_wasi_http::body::HyperOutgoingBody>,
88-
mut config: wasmtime_wasi_http::types::OutgoingRequestConfig,
87+
request: Request<wasmtime_wasi_http::body::HyperOutgoingBody>,
88+
config: wasmtime_wasi_http::types::OutgoingRequestConfig,
8989
) -> wasmtime_wasi_http::HttpResult<wasmtime_wasi_http::types::HostFutureIncomingResponse> {
90-
// wasmtime-wasi-http fills in scheme and authority for relative URLs
91-
// (e.g. https://:443/<path>), which makes them hard to reason about.
92-
// Undo that here.
93-
let uri = request.uri_mut();
94-
if uri
95-
.authority()
96-
.is_some_and(|authority| authority.host().is_empty())
97-
{
98-
let mut builder = http::uri::Builder::new();
99-
if let Some(paq) = uri.path_and_query() {
100-
builder = builder.path_and_query(paq.clone());
101-
}
102-
*uri = builder.build().unwrap();
103-
}
104-
105-
if let Some(interceptor) = &self.state.request_interceptor {
106-
match interceptor.intercept(&mut request, &mut config) {
107-
InterceptOutcome::Continue => (),
108-
InterceptOutcome::Complete(res) => return res,
109-
}
110-
}
111-
112-
let host = request.uri().host().unwrap_or_default();
113-
let tls_client_config = self
114-
.state
115-
.component_tls_configs
116-
.get_client_config(host)
117-
.clone();
118-
11990
Ok(HostFutureIncomingResponse::Pending(
12091
wasmtime_wasi::runtime::spawn(
12192
send_request_impl(
12293
request,
12394
config,
12495
self.state.allowed_hosts.clone(),
96+
self.state.component_tls_configs.clone(),
97+
self.state.request_interceptor.clone(),
12598
self.state.self_request_origin.clone(),
126-
tls_client_config,
12799
self.state.allow_private_ips,
128100
)
129101
.in_current_span(),
@@ -136,10 +108,47 @@ async fn send_request_impl(
136108
mut request: Request<wasmtime_wasi_http::body::HyperOutgoingBody>,
137109
mut config: wasmtime_wasi_http::types::OutgoingRequestConfig,
138110
outbound_allowed_hosts: OutboundAllowedHosts,
111+
component_tls_configs: ComponentTlsConfigs,
112+
request_interceptor: Option<Arc<dyn OutboundHttpInterceptor>>,
139113
self_request_origin: Option<SelfRequestOrigin>,
140-
tls_client_config: Arc<ClientConfig>,
141114
allow_private_ips: bool,
142115
) -> anyhow::Result<Result<IncomingResponse, ErrorCode>> {
116+
// wasmtime-wasi-http fills in scheme and authority for relative URLs
117+
// (e.g. https://:443/<path>), which makes them hard to reason about.
118+
// Undo that here.
119+
let uri = request.uri_mut();
120+
if uri
121+
.authority()
122+
.is_some_and(|authority| authority.host().is_empty())
123+
{
124+
let mut builder = http::uri::Builder::new();
125+
if let Some(paq) = uri.path_and_query() {
126+
builder = builder.path_and_query(paq.clone());
127+
}
128+
*uri = builder.build().unwrap();
129+
}
130+
let span = tracing::Span::current();
131+
span.record("url.full", uri.to_string());
132+
133+
spin_telemetry::inject_trace_context(&mut request);
134+
135+
if let Some(interceptor) = request_interceptor {
136+
match interceptor.intercept(&mut request).await? {
137+
InterceptOutcome::Continue => (),
138+
InterceptOutcome::Complete(resp) => {
139+
let resp = IncomingResponse {
140+
resp,
141+
worker: None,
142+
between_bytes_timeout: config.between_bytes_timeout,
143+
};
144+
return Ok(Ok(resp));
145+
}
146+
}
147+
}
148+
149+
let host = request.uri().host().unwrap_or_default();
150+
let tls_client_config = component_tls_configs.get_client_config(host).clone();
151+
143152
if request.uri().authority().is_some() {
144153
// Absolute URI
145154
let is_allowed = outbound_allowed_hosts
@@ -167,17 +176,15 @@ async fn send_request_impl(
167176
config.use_tls = origin.use_tls();
168177

169178
request.headers_mut().insert(HOST, origin.host_header());
170-
spin_telemetry::inject_trace_context(&mut request);
171179

172180
let path_and_query = request.uri().path_and_query().cloned();
173181
*request.uri_mut() = origin.into_uri(path_and_query);
174182
}
175183

176184
let authority = request.uri().authority().context("authority not set")?;
177-
let current_span = tracing::Span::current();
178-
current_span.record("server.address", authority.host());
185+
span.record("server.address", authority.host());
179186
if let Some(port) = authority.port() {
180-
current_span.record("server.port", port.as_u16());
187+
span.record("server.port", port.as_u16());
181188
}
182189

183190
Ok(send_request_handler(request, config, tls_client_config, allow_private_ips).await)
@@ -317,6 +324,8 @@ async fn send_request_handler(
317324
.map_err(hyper_request_error)?
318325
.map(|body| body.map_err(hyper_request_error).boxed());
319326

327+
tracing::Span::current().record("http.response.status_code", resp.status().as_u16());
328+
320329
Ok(wasmtime_wasi_http::types::IncomingResponse {
321330
resp,
322331
worker: Some(worker),

crates/trigger-http/src/outbound_http.rs

Lines changed: 14 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,12 @@ use std::{
44
};
55

66
use http::uri::Scheme;
7-
use spin_factor_outbound_http::{
8-
HostFutureIncomingResponse, InterceptOutcome, OutgoingRequestConfig, Request,
9-
};
7+
use spin_core::async_trait;
8+
use spin_factor_outbound_http::{InterceptOutcome, Request};
109
use spin_factor_outbound_networking::parse_service_chaining_target;
1110
use spin_factors::RuntimeFactors;
1211
use spin_http::routes::RouteMatch;
13-
use wasmtime_wasi_http::types::IncomingResponse;
12+
use wasmtime_wasi_http::{HttpError, HttpResult};
1413

1514
use crate::HttpServer;
1615

@@ -27,40 +26,23 @@ impl<F: RuntimeFactors> OutboundHttpInterceptor<F> {
2726

2827
const CHAINED_CLIENT_ADDR: SocketAddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0);
2928

29+
#[async_trait]
3030
impl<F: RuntimeFactors> spin_factor_outbound_http::OutboundHttpInterceptor
3131
for OutboundHttpInterceptor<F>
3232
{
33-
fn intercept(
34-
&self,
35-
request: &mut Request,
36-
config: &mut OutgoingRequestConfig,
37-
) -> InterceptOutcome {
38-
let uri = request.uri();
39-
33+
async fn intercept(&self, request: &mut Request) -> HttpResult<InterceptOutcome> {
4034
// Handle service chaining requests
41-
if let Some(component_id) = parse_service_chaining_target(uri) {
42-
// TODO: look at the rest of chain_request
43-
let route_match = RouteMatch::synthetic(&component_id, uri.path());
35+
if let Some(component_id) = parse_service_chaining_target(request.uri()) {
4436
let req = std::mem::take(request);
45-
let between_bytes_timeout = config.between_bytes_timeout;
46-
let server = self.server.clone();
47-
let resp_fut = async move {
48-
match server
49-
.handle_trigger_route(req, route_match, Scheme::HTTP, CHAINED_CLIENT_ADDR)
50-
.await
51-
{
52-
Ok(resp) => Ok(Ok(IncomingResponse {
53-
resp,
54-
between_bytes_timeout,
55-
worker: None,
56-
})),
57-
Err(e) => Err(wasmtime::Error::msg(e)),
58-
}
59-
};
60-
let resp = HostFutureIncomingResponse::pending(wasmtime_wasi::runtime::spawn(resp_fut));
61-
InterceptOutcome::Complete(Ok(resp))
37+
let route_match = RouteMatch::synthetic(&component_id, req.uri().path());
38+
let resp = self
39+
.server
40+
.handle_trigger_route(req, route_match, Scheme::HTTP, CHAINED_CLIENT_ADDR)
41+
.await
42+
.map_err(HttpError::trap)?;
43+
Ok(InterceptOutcome::Complete(resp))
6244
} else {
63-
InterceptOutcome::Continue
45+
Ok(InterceptOutcome::Continue)
6446
}
6547
}
6648
}

0 commit comments

Comments
 (0)