Skip to content

Commit 25b234e

Browse files
committed
Dedup impl
Signed-off-by: Ryan Levick <[email protected]>
1 parent 37ceae2 commit 25b234e

File tree

3 files changed

+71
-50
lines changed

3 files changed

+71
-50
lines changed

crates/factor-outbound-http/src/lib.rs

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,66 @@ impl InstanceState {
142142

143143
impl SelfInstanceBuilder for InstanceState {}
144144

145+
/// Helper module for acquiring permits from the outbound connections semaphore.
146+
///
147+
/// This is used by the outbound HTTP implementations to limit concurrent outbound connections.
148+
mod concurrent_outbound_connections {
149+
use super::*;
150+
151+
/// Acquires a semaphore permit for the given interface, if a semaphore is configured.
152+
pub async fn acquire_semaphore<'a>(
153+
interface: &str,
154+
semaphore: &'a Option<Arc<Semaphore>>,
155+
) -> Option<tokio::sync::SemaphorePermit<'a>> {
156+
let s = semaphore.as_ref()?;
157+
acquire(interface, || s.try_acquire(), async || s.acquire().await).await
158+
}
159+
160+
/// Acquires an owned semaphore permit for the given interface, if a semaphore is configured.
161+
pub async fn acquire_owned_semaphore(
162+
interface: &str,
163+
semaphore: &Option<Arc<Semaphore>>,
164+
) -> Option<tokio::sync::OwnedSemaphorePermit> {
165+
let s = semaphore.as_ref()?;
166+
acquire(
167+
interface,
168+
|| s.clone().try_acquire_owned(),
169+
async || s.clone().acquire_owned().await,
170+
)
171+
.await
172+
}
173+
174+
/// Helper function to acquire a semaphore permit, either immediately or by waiting.
175+
///
176+
/// Allows getting either a borrowed or owned permit.
177+
async fn acquire<T>(
178+
interface: &str,
179+
try_acquire: impl Fn() -> Result<T, tokio::sync::TryAcquireError>,
180+
acquire: impl AsyncFnOnce() -> Result<T, tokio::sync::AcquireError>,
181+
) -> Option<T> {
182+
// Try to acquire a permit without waiting first
183+
// Keep track of whether we had to wait for metrics purposes.
184+
let mut waited = false;
185+
let permit = match try_acquire() {
186+
Ok(p) => Ok(p),
187+
// No available permits right now; wait for one
188+
Err(tokio::sync::TryAcquireError::NoPermits) => {
189+
waited = true;
190+
acquire().await.map_err(|_| ())
191+
}
192+
Err(_) => Err(()),
193+
};
194+
if permit.is_ok() {
195+
spin_telemetry::monotonic_counter!(
196+
outbound_http.acquired_permits = 1,
197+
interface = interface,
198+
waited = waited
199+
);
200+
}
201+
permit.ok()
202+
}
203+
}
204+
145205
pub type Request = http::Request<wasmtime_wasi_http::body::HyperOutgoingBody>;
146206
pub type Response = http::Response<wasmtime_wasi_http::body::HyperIncomingBody>;
147207

crates/factor-outbound-http/src/spin.rs

Lines changed: 5 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -100,31 +100,11 @@ impl spin_http::Host for crate::InstanceState {
100100
// If we're limiting concurrent outbound requests, acquire a permit
101101
// Note: since we don't have access to the underlying connection, we can only
102102
// limit the number of concurrent requests, not connections.
103-
let permit = match &self.concurrent_outbound_connections_semaphore {
104-
Some(s) => {
105-
// Try to acquire a permit without waiting first
106-
// Keep track of whether we had to wait for metrics purposes.
107-
let mut waited = false;
108-
let permit = match s.try_acquire() {
109-
Ok(p) => Ok(p),
110-
// No available permits right now; wait for one
111-
Err(tokio::sync::TryAcquireError::NoPermits) => {
112-
waited = true;
113-
s.acquire().await.map_err(|_| ())
114-
}
115-
Err(_) => Err(()),
116-
};
117-
if permit.is_ok() {
118-
spin_telemetry::monotonic_counter!(
119-
outbound_http.acquired_permits = 1,
120-
interface = "spin",
121-
waited = waited
122-
);
123-
}
124-
permit.ok()
125-
}
126-
None => None,
127-
};
103+
let permit = crate::concurrent_outbound_connections::acquire_semaphore(
104+
"spin",
105+
&self.concurrent_outbound_connections_semaphore,
106+
)
107+
.await;
128108
let resp = client.execute(req).await.map_err(log_reqwest_error)?;
129109
drop(permit);
130110

crates/factor-outbound-http/src/wasi.rs

Lines changed: 6 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -598,31 +598,12 @@ impl ConnectOptions {
598598
}
599599

600600
// If we're limiting concurrent outbound requests, acquire a permit
601-
let permit = match &self.concurrent_outbound_connections_semaphore {
602-
Some(s) => {
603-
// Try to acquire a permit without waiting first
604-
// Keep track of whether we had to wait for metrics purposes.
605-
let mut waited = false;
606-
let permit = match s.clone().try_acquire_owned() {
607-
Ok(p) => Ok(p),
608-
// No available permits right now; wait for one
609-
Err(tokio::sync::TryAcquireError::NoPermits) => {
610-
waited = true;
611-
s.clone().acquire_owned().await.map_err(|_| ())
612-
}
613-
Err(_) => Err(()),
614-
};
615-
if permit.is_ok() {
616-
spin_telemetry::monotonic_counter!(
617-
outbound_http.acquired_permits = 1,
618-
interface = "wasi",
619-
waited = waited
620-
);
621-
}
622-
permit.ok()
623-
}
624-
None => None,
625-
};
601+
602+
let permit = crate::concurrent_outbound_connections::acquire_owned_semaphore(
603+
"wasi",
604+
&self.concurrent_outbound_connections_semaphore,
605+
)
606+
.await;
626607

627608
let stream = timeout(self.connect_timeout, TcpStream::connect(&*socket_addrs))
628609
.await

0 commit comments

Comments
 (0)