Skip to content

Commit 2a18a64

Browse files
lannrylev
authored andcommitted
Make OutboundHttpInterceptor async
Signed-off-by: Lann Martin <[email protected]>
1 parent 8ad00cc commit 2a18a64

File tree

3 files changed

+62
-72
lines changed

3 files changed

+62
-72
lines changed

crates/factor-outbound-http/src/lib.rs

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ mod wasi;
33
pub mod wasi_2023_10_18;
44
pub mod wasi_2023_11_10;
55

6-
use std::net::SocketAddr;
6+
use std::{net::SocketAddr, sync::Arc};
77

88
use anyhow::Context;
99
use http::{
@@ -16,7 +16,8 @@ use spin_factor_outbound_networking::{
1616
use spin_factors::{
1717
anyhow, ConfigureAppContext, Factor, PrepareContext, RuntimeFactors, SelfInstanceBuilder,
1818
};
19-
use wasmtime_wasi_http::WasiHttpCtx;
19+
use spin_world::async_trait;
20+
use wasmtime_wasi_http::{types::IncomingResponse, WasiHttpCtx};
2021

2122
pub use wasmtime_wasi_http::{
2223
body::HyperOutgoingBody,
@@ -91,7 +92,7 @@ pub struct InstanceState {
9192
allow_private_ips: bool,
9293
component_tls_configs: ComponentTlsConfigs,
9394
self_request_origin: Option<SelfRequestOrigin>,
94-
request_interceptor: Option<Box<dyn OutboundHttpInterceptor>>,
95+
request_interceptor: Option<Arc<dyn OutboundHttpInterceptor>>,
9596
// Connection-pooling client for 'fermyon:spin/http' interface
9697
spin_http_client: Option<reqwest::Client>,
9798
}
@@ -115,7 +116,7 @@ impl InstanceState {
115116
if self.request_interceptor.is_some() {
116117
anyhow::bail!("set_request_interceptor can only be called once");
117118
}
118-
self.request_interceptor = Some(Box::new(interceptor));
119+
self.request_interceptor = Some(Arc::new(interceptor));
119120
Ok(())
120121
}
121122
}
@@ -177,6 +178,7 @@ impl std::fmt::Display for SelfRequestOrigin {
177178

178179
/// An outbound HTTP request interceptor to be used with
179180
/// [`InstanceState::set_request_interceptor`].
181+
#[async_trait]
180182
pub trait OutboundHttpInterceptor: Send + Sync {
181183
/// Intercept an outgoing HTTP request.
182184
///
@@ -186,12 +188,12 @@ pub trait OutboundHttpInterceptor: Send + Sync {
186188
///
187189
/// If this method returns [`InterceptedResponse::Intercepted`], the inner
188190
/// result will be returned as the result of the request, bypassing the
189-
/// default handler.
190-
fn intercept(
191+
/// default handler. The `request` will also be dropped immediately.
192+
async fn intercept(
191193
&self,
192194
request: &mut Request,
193195
config: &mut OutgoingRequestConfig,
194-
) -> InterceptOutcome;
196+
) -> HttpResult<InterceptOutcome>;
195197
}
196198

197199
/// The type returned by an [`OutboundHttpInterceptor`].
@@ -201,5 +203,5 @@ pub enum InterceptOutcome {
201203
Continue,
202204
/// The given result will be returned as the result of the intercepted
203205
/// request, bypassing the default handler.
204-
Complete(HttpResult<HostFutureIncomingResponse>),
206+
Complete(IncomingResponse),
205207
}

crates/factor-outbound-http/src/wasi.rs

Lines changed: 33 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use http::{header::HOST, Request};
55
use http_body_util::BodyExt;
66
use ip_network::IpNetwork;
77
use rustls::ClientConfig;
8-
use spin_factor_outbound_networking::OutboundAllowedHosts;
8+
use spin_factor_outbound_networking::{ComponentTlsConfigs, OutboundAllowedHosts};
99
use spin_factors::{wasmtime::component::ResourceTable, RuntimeFactorsInstanceState};
1010
use tokio::{net::TcpStream, time::timeout};
1111
use tracing::{field::Empty, instrument, Instrument};
@@ -19,7 +19,7 @@ use wasmtime_wasi_http::{
1919

2020
use crate::{
2121
wasi_2023_10_18, wasi_2023_11_10, InstanceState, InterceptOutcome, OutboundHttpFactor,
22-
SelfRequestOrigin,
22+
OutboundHttpInterceptor, SelfRequestOrigin,
2323
};
2424

2525
pub(crate) fn add_to_linker<T: Send + 'static>(
@@ -84,46 +84,18 @@ impl<'a> WasiHttpView for WasiHttpImplInner<'a> {
8484
)]
8585
fn send_request(
8686
&mut self,
87-
mut request: Request<wasmtime_wasi_http::body::HyperOutgoingBody>,
88-
mut config: wasmtime_wasi_http::types::OutgoingRequestConfig,
87+
request: Request<wasmtime_wasi_http::body::HyperOutgoingBody>,
88+
config: wasmtime_wasi_http::types::OutgoingRequestConfig,
8989
) -> wasmtime_wasi_http::HttpResult<wasmtime_wasi_http::types::HostFutureIncomingResponse> {
90-
// wasmtime-wasi-http fills in scheme and authority for relative URLs
91-
// (e.g. https://:443/<path>), which makes them hard to reason about.
92-
// Undo that here.
93-
let uri = request.uri_mut();
94-
if uri
95-
.authority()
96-
.is_some_and(|authority| authority.host().is_empty())
97-
{
98-
let mut builder = http::uri::Builder::new();
99-
if let Some(paq) = uri.path_and_query() {
100-
builder = builder.path_and_query(paq.clone());
101-
}
102-
*uri = builder.build().unwrap();
103-
}
104-
105-
if let Some(interceptor) = &self.state.request_interceptor {
106-
match interceptor.intercept(&mut request, &mut config) {
107-
InterceptOutcome::Continue => (),
108-
InterceptOutcome::Complete(res) => return res,
109-
}
110-
}
111-
112-
let host = request.uri().host().unwrap_or_default();
113-
let tls_client_config = self
114-
.state
115-
.component_tls_configs
116-
.get_client_config(host)
117-
.clone();
118-
11990
Ok(HostFutureIncomingResponse::Pending(
12091
wasmtime_wasi::runtime::spawn(
12192
send_request_impl(
12293
request,
12394
config,
12495
self.state.allowed_hosts.clone(),
96+
self.state.component_tls_configs.clone(),
97+
self.state.request_interceptor.clone(),
12598
self.state.self_request_origin.clone(),
126-
tls_client_config,
12799
self.state.allow_private_ips,
128100
)
129101
.in_current_span(),
@@ -136,10 +108,36 @@ async fn send_request_impl(
136108
mut request: Request<wasmtime_wasi_http::body::HyperOutgoingBody>,
137109
mut config: wasmtime_wasi_http::types::OutgoingRequestConfig,
138110
outbound_allowed_hosts: OutboundAllowedHosts,
111+
component_tls_configs: ComponentTlsConfigs,
112+
request_interceptor: Option<Arc<dyn OutboundHttpInterceptor>>,
139113
self_request_origin: Option<SelfRequestOrigin>,
140-
tls_client_config: Arc<ClientConfig>,
141114
allow_private_ips: bool,
142115
) -> anyhow::Result<Result<IncomingResponse, ErrorCode>> {
116+
// wasmtime-wasi-http fills in scheme and authority for relative URLs
117+
// (e.g. https://:443/<path>), which makes them hard to reason about.
118+
// Undo that here.
119+
let uri = request.uri_mut();
120+
if uri
121+
.authority()
122+
.is_some_and(|authority| authority.host().is_empty())
123+
{
124+
let mut builder = http::uri::Builder::new();
125+
if let Some(paq) = uri.path_and_query() {
126+
builder = builder.path_and_query(paq.clone());
127+
}
128+
*uri = builder.build().unwrap();
129+
}
130+
131+
if let Some(interceptor) = request_interceptor {
132+
match interceptor.intercept(&mut request, &mut config).await? {
133+
InterceptOutcome::Continue => (),
134+
InterceptOutcome::Complete(resp) => return Ok(Ok(resp)),
135+
}
136+
}
137+
138+
let host = request.uri().host().unwrap_or_default();
139+
let tls_client_config = component_tls_configs.get_client_config(host).clone();
140+
143141
if request.uri().authority().is_some() {
144142
// Absolute URI
145143
let is_allowed = outbound_allowed_hosts

crates/trigger-http/src/outbound_http.rs

Lines changed: 19 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,12 @@ use std::{
44
};
55

66
use http::uri::Scheme;
7-
use spin_factor_outbound_http::{
8-
HostFutureIncomingResponse, InterceptOutcome, OutgoingRequestConfig, Request,
9-
};
7+
use spin_core::async_trait;
8+
use spin_factor_outbound_http::{InterceptOutcome, OutgoingRequestConfig, Request};
109
use spin_factor_outbound_networking::parse_service_chaining_target;
1110
use spin_factors::RuntimeFactors;
1211
use spin_http::routes::RouteMatch;
13-
use wasmtime_wasi_http::types::IncomingResponse;
12+
use wasmtime_wasi_http::{types::IncomingResponse, HttpError, HttpResult};
1413

1514
use crate::HttpServer;
1615

@@ -27,40 +26,31 @@ impl<F: RuntimeFactors> OutboundHttpInterceptor<F> {
2726

2827
const CHAINED_CLIENT_ADDR: SocketAddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0);
2928

29+
#[async_trait]
3030
impl<F: RuntimeFactors> spin_factor_outbound_http::OutboundHttpInterceptor
3131
for OutboundHttpInterceptor<F>
3232
{
33-
fn intercept(
33+
async fn intercept(
3434
&self,
3535
request: &mut Request,
3636
config: &mut OutgoingRequestConfig,
37-
) -> InterceptOutcome {
38-
let uri = request.uri();
39-
37+
) -> HttpResult<InterceptOutcome> {
4038
// Handle service chaining requests
41-
if let Some(component_id) = parse_service_chaining_target(uri) {
42-
// TODO: look at the rest of chain_request
43-
let route_match = RouteMatch::synthetic(&component_id, uri.path());
39+
if let Some(component_id) = parse_service_chaining_target(request.uri()) {
4440
let req = std::mem::take(request);
45-
let between_bytes_timeout = config.between_bytes_timeout;
46-
let server = self.server.clone();
47-
let resp_fut = async move {
48-
match server
49-
.handle_trigger_route(req, route_match, Scheme::HTTP, CHAINED_CLIENT_ADDR)
50-
.await
51-
{
52-
Ok(resp) => Ok(Ok(IncomingResponse {
53-
resp,
54-
between_bytes_timeout,
55-
worker: None,
56-
})),
57-
Err(e) => Err(wasmtime::Error::msg(e)),
58-
}
59-
};
60-
let resp = HostFutureIncomingResponse::pending(wasmtime_wasi::runtime::spawn(resp_fut));
61-
InterceptOutcome::Complete(Ok(resp))
41+
let route_match = RouteMatch::synthetic(&component_id, req.uri().path());
42+
let resp = self
43+
.server
44+
.handle_trigger_route(req, route_match, Scheme::HTTP, CHAINED_CLIENT_ADDR)
45+
.await
46+
.map_err(HttpError::trap)?;
47+
Ok(InterceptOutcome::Complete(IncomingResponse {
48+
resp,
49+
worker: None,
50+
between_bytes_timeout: config.between_bytes_timeout,
51+
}))
6252
} else {
63-
InterceptOutcome::Continue
53+
Ok(InterceptOutcome::Continue)
6454
}
6555
}
6656
}

0 commit comments

Comments
 (0)