Skip to content

Commit a472018

Browse files
committed
Acquire permit when doing TCP connection
Signed-off-by: Ryan Levick <[email protected]>
1 parent 68e5471 commit a472018

File tree

2 files changed

+14
-12
lines changed

2 files changed

+14
-12
lines changed

crates/factor-outbound-http/src/spin.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,8 @@ impl spin_http::Host for crate::InstanceState {
9898
});
9999

100100
// If we're limiting concurrent outbound requests, acquire a permit
101+
// Note: since we don't have access to the underlying connection, we can only
102+
// limit the number of concurrent requests, not connections.
101103
let permit = match &self.concurrent_outbound_connections_semaphore {
102104
Some(s) => s.acquire().await.ok(),
103105
None => None,

crates/factor-outbound-http/src/wasi.rs

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -457,18 +457,14 @@ impl RequestSender {
457457
None
458458
};
459459

460-
// If we're limiting concurrent outbound requests, acquire a permit
461-
let permit = match self.concurrent_outbound_connections_semaphore {
462-
Some(s) => s.acquire_owned().await.ok().map(Arc::new),
463-
None => None,
464-
};
465460
let resp = CONNECT_OPTIONS.scope(
466461
ConnectOptions {
467462
blocked_networks: self.blocked_networks,
468463
connect_timeout,
469464
tls_client_config,
470465
override_connect_addr,
471-
permit,
466+
concurrent_outbound_connections_semaphore: self
467+
.concurrent_outbound_connections_semaphore,
472468
},
473469
async move {
474470
if use_tls {
@@ -556,10 +552,8 @@ struct ConnectOptions {
556552
tls_client_config: Option<TlsClientConfig>,
557553
/// If set, override the address to connect to instead of using the given `uri`'s authority.
558554
override_connect_addr: Option<SocketAddr>,
559-
/// A permit for this connection
560-
///
561-
/// If there is a permit, it should be dropped when the connection is closed.
562-
permit: Option<Arc<OwnedSemaphorePermit>>,
555+
/// A semaphore to limit the number of concurrent outbound connections.
556+
concurrent_outbound_connections_semaphore: Option<Arc<Semaphore>>,
563557
}
564558

565559
impl ConnectOptions {
@@ -603,6 +597,12 @@ impl ConnectOptions {
603597
return Err(ErrorCode::DestinationIpProhibited);
604598
}
605599

600+
// If we're limiting concurrent outbound requests, acquire a permit
601+
let permit = match &self.concurrent_outbound_connections_semaphore {
602+
Some(s) => s.clone().acquire_owned().await.ok(),
603+
None => None,
604+
};
605+
606606
let stream = timeout(self.connect_timeout, TcpStream::connect(&*socket_addrs))
607607
.await
608608
.map_err(|_| ErrorCode::ConnectionTimeout)?
@@ -614,7 +614,7 @@ impl ConnectOptions {
614614
})?;
615615
Ok(PermittedTcpStream {
616616
inner: stream,
617-
_permit: self.permit.clone(),
617+
_permit: permit,
618618
})
619619
}
620620

@@ -757,7 +757,7 @@ struct PermittedTcpStream {
757757
///
758758
/// When this stream is dropped, the permit is also dropped, allowing another
759759
/// connection to be established.
760-
_permit: Option<Arc<OwnedSemaphorePermit>>,
760+
_permit: Option<OwnedSemaphorePermit>,
761761
}
762762

763763
impl Connection for PermittedTcpStream {

0 commit comments

Comments
 (0)