Skip to content

Commit 622dbe9

Browse files
committed
Set a limit on max number of concurrent outbound http requests
Signed-off-by: Ryan Levick <[email protected]>
1 parent 44e1bef commit 622dbe9

File tree

5 files changed

+54
-19
lines changed

5 files changed

+54
-19
lines changed

crates/factor-outbound-http/src/lib.rs

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use spin_factors::{
2222
anyhow, ConfigureAppContext, Factor, FactorData, PrepareContext, RuntimeFactors,
2323
SelfInstanceBuilder,
2424
};
25+
use tokio::sync::Semaphore;
2526
use wasmtime_wasi_http::WasiHttpCtx;
2627

2728
pub use wasmtime_wasi_http::{
@@ -51,13 +52,15 @@ impl Factor for OutboundHttpFactor {
5152
&self,
5253
mut ctx: ConfigureAppContext<T, Self>,
5354
) -> anyhow::Result<Self::AppState> {
54-
let connection_pooling = ctx
55-
.take_runtime_config()
56-
.unwrap_or_default()
57-
.connection_pooling;
55+
let config = ctx.take_runtime_config().unwrap_or_default();
5856
Ok(AppState {
59-
wasi_http_clients: wasi::HttpClients::new(connection_pooling),
60-
connection_pooling,
57+
wasi_http_clients: wasi::HttpClients::new(config.connection_pooling_enabled),
58+
connection_pooling_enabled: config.connection_pooling_enabled,
59+
concurrent_outbound_requests_semaphore: config
60+
.max_concurrent_requests
61+
// Permit count is the max concurrent requests + 1.
62+
// i.e., 0 concurrent requests means 1 total request.
63+
.map(|n| Arc::new(Semaphore::new(n + 1))),
6164
})
6265
}
6366

@@ -78,7 +81,11 @@ impl Factor for OutboundHttpFactor {
7881
request_interceptor: None,
7982
spin_http_client: None,
8083
wasi_http_clients: ctx.app_state().wasi_http_clients.clone(),
81-
connection_pooling: ctx.app_state().connection_pooling,
84+
connection_pooling_enabled: ctx.app_state().connection_pooling_enabled,
85+
concurrent_outbound_requests_semaphore: ctx
86+
.app_state()
87+
.concurrent_outbound_requests_semaphore
88+
.clone(),
8289
})
8390
}
8491
}
@@ -92,7 +99,7 @@ pub struct InstanceState {
9299
request_interceptor: Option<Arc<dyn OutboundHttpInterceptor>>,
93100
// Connection-pooling client for 'fermyon:spin/http' interface
94101
//
95-
// TODO: We could move this to `AppState` to like the
102+
// TODO: We could move this to `AppState` like the
96103
// `wasi:http/outgoing-handler` pool for consistency, although it's probably
97104
// not a high priority given that `fermyon:spin/http` is deprecated anyway.
98105
spin_http_client: Option<reqwest::Client>,
@@ -101,7 +108,10 @@ pub struct InstanceState {
101108
// This is a clone of `AppState::wasi_http_clients`, meaning it is shared
102109
// among all instances of the app.
103110
wasi_http_clients: wasi::HttpClients,
104-
connection_pooling: bool,
111+
/// Whether connection pooling is enabled for this instance.
112+
connection_pooling_enabled: bool,
113+
/// A semaphore to limit the number of concurrent outbound requests.
114+
concurrent_outbound_requests_semaphore: Option<Arc<Semaphore>>,
105115
}
106116

107117
impl InstanceState {
@@ -183,5 +193,8 @@ impl std::fmt::Display for SelfRequestOrigin {
183193
pub struct AppState {
184194
// Connection pooling clients for `wasi:http/outgoing-handler` interface
185195
wasi_http_clients: wasi::HttpClients,
186-
connection_pooling: bool,
196+
/// Whether connection pooling is enabled for this app.
197+
connection_pooling_enabled: bool,
198+
/// A semaphore to limit the number of concurrent outbound requests.
199+
concurrent_outbound_requests_semaphore: Option<Arc<Semaphore>>,
187200
}

crates/factor-outbound-http/src/runtime_config.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,16 @@ pub mod spin;
55
#[derive(Debug)]
66
pub struct RuntimeConfig {
77
/// If true, enable connection pooling and reuse.
8-
pub connection_pooling: bool,
8+
pub connection_pooling_enabled: bool,
9+
/// If set, limits the number of concurrent outbound requests.
10+
pub max_concurrent_requests: Option<usize>,
911
}
1012

1113
impl Default for RuntimeConfig {
1214
fn default() -> Self {
1315
Self {
14-
connection_pooling: true,
16+
connection_pooling_enabled: true,
17+
max_concurrent_requests: None,
1518
}
1619
}
1720
}

crates/factor-outbound-http/src/runtime_config/spin.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,17 +6,17 @@ use spin_factors::runtime_config::toml::GetTomlValue;
66
/// Expects table to be in the format:
77
/// ```toml
88
/// [outbound_http]
9-
/// connection_pooling = true
9+
/// connection_pooling = true # optional, defaults to true
10+
/// max_concurrent_requests = 10 # optional, defaults to unlimited
1011
/// ```
1112
pub fn config_from_table(
1213
table: &impl GetTomlValue,
1314
) -> anyhow::Result<Option<super::RuntimeConfig>> {
1415
if let Some(outbound_http) = table.get("outbound_http") {
16+
let outbound_http_toml = outbound_http.clone().try_into::<OutboundHttpToml>()?;
1517
Ok(Some(super::RuntimeConfig {
16-
connection_pooling: outbound_http
17-
.clone()
18-
.try_into::<OutboundHttpToml>()?
19-
.connection_pooling,
18+
connection_pooling_enabled: outbound_http_toml.connection_pooling,
19+
max_concurrent_requests: outbound_http_toml.max_concurrent_requests,
2020
}))
2121
} else {
2222
Ok(None)
@@ -28,4 +28,6 @@ pub fn config_from_table(
2828
struct OutboundHttpToml {
2929
#[serde(default)]
3030
connection_pooling: bool,
31+
#[serde(default)]
32+
max_concurrent_requests: Option<usize>,
3133
}

crates/factor-outbound-http/src/spin.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ impl spin_http::Host for crate::InstanceState {
2121
if !req.params.is_empty() {
2222
tracing::warn!("HTTP params field is deprecated");
2323
}
24-
2524
let req_url = if !uri.starts_with('/') {
2625
// Absolute URI
2726
let is_allowed = self
@@ -92,13 +91,19 @@ impl spin_http::Host for crate::InstanceState {
9291
// in a single component execution
9392
let client = self.spin_http_client.get_or_insert_with(|| {
9493
let mut builder = reqwest::Client::builder();
95-
if !self.connection_pooling {
94+
if !self.connection_pooling_enabled {
9695
builder = builder.pool_max_idle_per_host(0);
9796
}
9897
builder.build().unwrap()
9998
});
10099

100+
// If we're limiting concurrent outbound requests, acquire a permit
101+
let permit = match &self.concurrent_outbound_requests_semaphore {
102+
Some(s) => s.acquire().await.ok(),
103+
None => None,
104+
};
101105
let resp = client.execute(req).await.map_err(log_reqwest_error)?;
106+
drop(permit);
102107

103108
tracing::trace!("Returning response from outbound request to {req_url}");
104109
span.record("http.response.status_code", resp.status().as_u16());

crates/factor-outbound-http/src/wasi.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ use spin_factors::{wasmtime::component::ResourceTable, RuntimeFactorsInstanceSta
2626
use tokio::{
2727
io::{AsyncRead, AsyncWrite, ReadBuf},
2828
net::TcpStream,
29+
sync::Semaphore,
2930
time::timeout,
3031
};
3132
use tokio_rustls::client::TlsStream;
@@ -128,6 +129,10 @@ impl WasiHttpView for WasiHttpImplInner<'_> {
128129
self_request_origin: self.state.self_request_origin.clone(),
129130
blocked_networks: self.state.blocked_networks.clone(),
130131
http_clients: self.state.wasi_http_clients.clone(),
132+
concurrent_outbound_requests_semaphore: self
133+
.state
134+
.concurrent_outbound_requests_semaphore
135+
.clone(),
131136
};
132137
Ok(HostFutureIncomingResponse::Pending(
133138
wasmtime_wasi::runtime::spawn(
@@ -153,6 +158,7 @@ struct RequestSender {
153158
self_request_origin: Option<SelfRequestOrigin>,
154159
request_interceptor: Option<Arc<dyn OutboundHttpInterceptor>>,
155160
http_clients: HttpClients,
161+
concurrent_outbound_requests_semaphore: Option<Arc<Semaphore>>,
156162
}
157163

158164
impl RequestSender {
@@ -320,11 +326,17 @@ impl RequestSender {
320326
},
321327
);
322328

329+
// If we're limiting concurrent outbound requests, acquire a permit
330+
let permit = match &self.concurrent_outbound_requests_semaphore {
331+
Some(s) => s.acquire().await.ok(),
332+
None => None,
333+
};
323334
let resp = timeout(first_byte_timeout, resp)
324335
.await
325336
.map_err(|_| ErrorCode::ConnectionReadTimeout)?
326337
.map_err(hyper_legacy_request_error)?
327338
.map(|body| body.map_err(hyper_request_error).boxed());
339+
drop(permit);
328340

329341
tracing::Span::current().record("http.response.status_code", resp.status().as_u16());
330342

0 commit comments

Comments
 (0)