Skip to content

Commit 0f8d9b8

Browse files
authored
Fix port races (#91)
1 parent f2f3804 commit 0f8d9b8

File tree

2 files changed

+448
-37
lines changed

2 files changed

+448
-37
lines changed

runtime/proxy/src/lib.rs

Lines changed: 259 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,64 @@ pub struct PreparedProxy {
6767
proxy_identity: MeshIdentity,
6868
mesh_addr: Option<String>,
6969
mesh_listen: SocketAddr,
70+
reserved_listeners: ReservedProxyListeners,
71+
}
72+
73+
#[derive(Debug)]
74+
struct ReservedTcpListener {
75+
listener: TcpListener,
76+
}
77+
78+
#[derive(Debug, Default)]
79+
struct ReservedProxyListeners {
80+
mesh: Option<ReservedTcpListener>,
81+
outbound: Vec<ReservedTcpListener>,
82+
}
83+
84+
impl ReservedTcpListener {
85+
fn reserve(listen: SocketAddr) -> Result<Self> {
86+
let listener = TcpListener::bind(listen)
87+
.into_diagnostic()
88+
.wrap_err_with(|| format!("failed to reserve listener at {listen}"))?;
89+
Ok(Self { listener })
90+
}
91+
92+
fn local_addr(&self) -> Result<SocketAddr> {
93+
self.listener.local_addr().into_diagnostic()
94+
}
95+
96+
fn into_tokio(self) -> Result<tokio::net::TcpListener> {
97+
self.listener
98+
.set_nonblocking(true)
99+
.into_diagnostic()
100+
.wrap_err("failed to configure reserved listener as non-blocking")?;
101+
tokio::net::TcpListener::from_std(self.listener)
102+
.into_diagnostic()
103+
.wrap_err("failed to adopt reserved listener")
104+
}
105+
}
106+
107+
impl ReservedProxyListeners {
108+
fn into_router_prebound(
109+
self,
110+
outbound_routes: &[OutboundRoute],
111+
) -> Result<router::PreboundListeners> {
112+
let mut listeners = router::PreboundListeners::default();
113+
if let Some(mesh) = self.mesh {
114+
listeners = listeners.with_mesh(mesh.into_tokio()?);
115+
}
116+
if self.outbound.len() != outbound_routes.len() {
117+
return Err(miette::miette!(
118+
"reserved {} outbound listeners for {} outbound routes",
119+
self.outbound.len(),
120+
outbound_routes.len()
121+
));
122+
}
123+
for (route, listener) in outbound_routes.iter().zip(self.outbound) {
124+
listeners.insert_outbound(route.route_id.clone(), listener.into_tokio()?);
125+
}
126+
Ok(listeners)
127+
}
70128
}
71129

72130
impl ProxyCommand {
@@ -188,24 +246,28 @@ impl ProxyCommand {
188246
.await?;
189247
let proxy_identity = build_proxy_identity("/proxy", &router_identity);
190248

191-
let (mesh_addr, mesh_listen) = if self.slot_bindings.is_empty() {
192-
(None, SocketAddr::from(([127, 0, 0, 1], 0)))
249+
let (mesh_addr, mesh_listen, mesh_listener) = if self.slot_bindings.is_empty() {
250+
(None, SocketAddr::from(([127, 0, 0, 1], 0)), None)
193251
} else {
194-
let (mesh_addr, mesh_listen) =
195-
resolve_mesh_addresses(self.mesh_addr.as_deref(), &target)?;
196-
(Some(mesh_addr), mesh_listen)
252+
let (mesh_addr, mesh_listen, listener) =
253+
reserve_mesh_addresses(self.mesh_addr.as_deref(), &target)?;
254+
(Some(mesh_addr), mesh_listen, Some(listener))
197255
};
256+
let (export_bindings, mut reserved_listeners) =
257+
reserve_export_bindings(&self.export_bindings)?;
258+
reserved_listeners.mesh = mesh_listener;
198259

199260
Ok(PreparedProxy {
200261
target,
201262
slot_bindings: self.slot_bindings,
202-
export_bindings: self.export_bindings,
263+
export_bindings,
203264
control_endpoint,
204265
router_identity,
205266
router_addr,
206267
proxy_identity,
207268
mesh_addr,
208269
mesh_listen,
270+
reserved_listeners,
209271
})
210272
}
211273

@@ -354,8 +416,11 @@ impl PreparedProxy {
354416
outbound,
355417
transport: TransportConfig::NoiseIk {},
356418
};
419+
let prebound_listeners = self
420+
.reserved_listeners
421+
.into_router_prebound(&config.outbound)?;
357422

358-
let mut router = std::pin::pin!(router::run(config));
423+
let mut router = std::pin::pin!(router::run_with_listeners(config, prebound_listeners));
359424
let mut shutdown = std::pin::pin!(wait_for_shutdown_signal());
360425
tokio::select! {
361426
res = &mut router => res.map_err(|err| miette::miette!("proxy failed: {err}")),
@@ -777,27 +842,34 @@ fn validate_proxy_metadata(metadata: &ProxyMetadata, path: &Path) -> Result<()>
777842
Ok(())
778843
}
779844

780-
fn resolve_mesh_addresses(
845+
fn reserve_mesh_addresses(
781846
mesh_addr_override: Option<&str>,
782847
target: &ProxyTarget,
783-
) -> Result<(String, SocketAddr)> {
848+
) -> Result<(String, SocketAddr, ReservedTcpListener)> {
784849
if let Some(mesh_addr) = mesh_addr_override {
785850
let port = parse_mesh_addr_port(mesh_addr)?;
786851
let listen_ip = match target.kind {
787852
ProxyTargetKind::Direct | ProxyTargetKind::Vm => Ipv4Addr::LOCALHOST,
788853
_ => Ipv4Addr::UNSPECIFIED,
789854
};
790-
let listen = SocketAddr::new(IpAddr::V4(listen_ip), port);
791-
return Ok((mesh_addr.to_string(), listen));
855+
let listener = ReservedTcpListener::reserve(SocketAddr::new(IpAddr::V4(listen_ip), port))?;
856+
let actual_listen = listener.local_addr()?;
857+
let actual_mesh_addr = if port == 0 {
858+
rewrite_addr_port(mesh_addr, actual_listen.port())?
859+
} else {
860+
mesh_addr.to_string()
861+
};
862+
return Ok((actual_mesh_addr, actual_listen, listener));
792863
}
793864

794-
let port = pick_free_port()?;
795-
let mesh_addr = default_mesh_addr(target, port)?;
796865
let listen_ip = match target.kind {
797866
ProxyTargetKind::Direct | ProxyTargetKind::Vm => Ipv4Addr::LOCALHOST,
798867
_ => Ipv4Addr::UNSPECIFIED,
799868
};
800-
Ok((mesh_addr, SocketAddr::new(IpAddr::V4(listen_ip), port)))
869+
let listener = ReservedTcpListener::reserve(SocketAddr::new(IpAddr::V4(listen_ip), 0))?;
870+
let actual_listen = listener.local_addr()?;
871+
let mesh_addr = default_mesh_addr(target, actual_listen.port())?;
872+
Ok((mesh_addr, actual_listen, listener))
801873
}
802874

803875
fn default_mesh_addr(target: &ProxyTarget, port: u16) -> Result<String> {
@@ -817,10 +889,29 @@ fn parse_mesh_addr_port(addr: &str) -> Result<u16> {
817889
.ok_or_else(|| miette::miette!("--mesh-addr must include a port (got {addr})"))
818890
}
819891

820-
fn pick_free_port() -> Result<u16> {
821-
let listener = TcpListener::bind(SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0))
822-
.into_diagnostic()?;
823-
Ok(listener.local_addr().into_diagnostic()?.port())
892+
fn reserve_export_bindings(
893+
export_bindings: &[ExportBinding],
894+
) -> Result<(Vec<ExportBinding>, ReservedProxyListeners)> {
895+
let mut actual_bindings = Vec::with_capacity(export_bindings.len());
896+
let mut listeners = ReservedProxyListeners::default();
897+
for binding in export_bindings {
898+
let listener = ReservedTcpListener::reserve(binding.listen)?;
899+
let actual_listen = listener.local_addr()?;
900+
listeners.outbound.push(listener);
901+
actual_bindings.push(ExportBinding {
902+
export: binding.export.clone(),
903+
listen: actual_listen,
904+
});
905+
}
906+
Ok((actual_bindings, listeners))
907+
}
908+
909+
fn rewrite_addr_port(addr: &str, port: u16) -> Result<String> {
910+
let mut url = Url::parse(&format!("mesh://{addr}"))
911+
.map_err(|err| miette::miette!("invalid --mesh-addr {addr}: {err}"))?;
912+
url.set_port(Some(port))
913+
.map_err(|_| miette::miette!("failed to rewrite --mesh-addr {addr} with port {port}"))?;
914+
Ok(url[url::Position::BeforeHost..].to_string())
824915
}
825916

826917
async fn resolve_router_mesh_addr(
@@ -1918,6 +2009,156 @@ mod tests {
19182009
assert!(err.to_string().contains("at least one --slot"), "{err}");
19192010
}
19202011

2012+
fn test_proxy_target(kind: ProxyTargetKind) -> ProxyTarget {
2013+
ProxyTarget {
2014+
kind,
2015+
metadata: ProxyMetadata {
2016+
version: PROXY_METADATA_VERSION.to_string(),
2017+
..Default::default()
2018+
},
2019+
source: PathBuf::from("/tmp/out"),
2020+
}
2021+
}
2022+
2023+
#[test]
2024+
fn reserve_mesh_addresses_for_compose_keeps_listener_public() {
2025+
let target = test_proxy_target(ProxyTargetKind::DockerCompose);
2026+
2027+
let (mesh_addr, listen, listener) =
2028+
reserve_mesh_addresses(None, &target).expect("mesh listener should reserve");
2029+
2030+
assert_eq!(mesh_addr, format!("host.docker.internal:{}", listen.port()));
2031+
assert!(
2032+
listen.ip().is_unspecified(),
2033+
"compose proxy mesh listener must stay reachable from containers"
2034+
);
2035+
assert_eq!(
2036+
listener
2037+
.local_addr()
2038+
.expect("reserved listener should report its address"),
2039+
listen
2040+
);
2041+
}
2042+
2043+
#[test]
2044+
fn reserve_mesh_addresses_for_direct_keeps_listener_loopback() {
2045+
let target = test_proxy_target(ProxyTargetKind::Direct);
2046+
2047+
let (mesh_addr, listen, listener) =
2048+
reserve_mesh_addresses(None, &target).expect("mesh listener should reserve");
2049+
2050+
assert_eq!(mesh_addr, format!("127.0.0.1:{}", listen.port()));
2051+
assert!(
2052+
listen.ip().is_loopback(),
2053+
"direct proxy mesh listener should stay local to the host"
2054+
);
2055+
assert_eq!(
2056+
listener
2057+
.local_addr()
2058+
.expect("reserved listener should report its address"),
2059+
listen
2060+
);
2061+
}
2062+
2063+
#[test]
2064+
fn reserve_mesh_addresses_rewrites_ephemeral_override_port() {
2065+
let target = test_proxy_target(ProxyTargetKind::Direct);
2066+
2067+
let (mesh_addr, listen, _listener) = reserve_mesh_addresses(Some("127.0.0.1:0"), &target)
2068+
.expect("mesh listener should reserve");
2069+
2070+
assert_eq!(mesh_addr, format!("127.0.0.1:{}", listen.port()));
2071+
assert_ne!(listen.port(), 0);
2072+
}
2073+
2074+
#[test]
2075+
fn reserve_export_bindings_hold_reserved_ports_until_drop() {
2076+
let requested = [ExportBinding {
2077+
export: "api".to_string(),
2078+
listen: SocketAddr::from(([127, 0, 0, 1], 0)),
2079+
}];
2080+
2081+
let (bindings, listeners) =
2082+
reserve_export_bindings(&requested).expect("export listeners should reserve");
2083+
let actual = bindings
2084+
.first()
2085+
.expect("reserved export should exist")
2086+
.listen;
2087+
2088+
assert_ne!(actual.port(), 0);
2089+
assert!(
2090+
TcpListener::bind(actual).is_err(),
2091+
"reserved export port should stay occupied until the reservation is dropped"
2092+
);
2093+
2094+
drop(listeners);
2095+
TcpListener::bind(actual).expect("dropping the reservation should release the port");
2096+
}
2097+
2098+
#[tokio::test]
2099+
async fn reserve_export_bindings_preserve_duplicate_export_listeners() {
2100+
let requested = vec![
2101+
ExportBinding {
2102+
export: "api".to_string(),
2103+
listen: SocketAddr::from(([127, 0, 0, 1], 0)),
2104+
},
2105+
ExportBinding {
2106+
export: "api".to_string(),
2107+
listen: SocketAddr::from(([127, 0, 0, 1], 0)),
2108+
},
2109+
];
2110+
2111+
let (bindings, listeners) =
2112+
reserve_export_bindings(&requested).expect("export listeners should reserve");
2113+
let routes = vec![
2114+
OutboundRoute {
2115+
route_id: "duplicate-route".to_string(),
2116+
slot: "api".to_string(),
2117+
capability_kind: None,
2118+
capability_profile: None,
2119+
listen_port: bindings[0].listen.port(),
2120+
listen_addr: Some(bindings[0].listen.ip().to_string()),
2121+
protocol: MeshProtocol::Http,
2122+
http_plugins: Vec::new(),
2123+
peer_addr: "127.0.0.1:1".to_string(),
2124+
peer_id: "/router".to_string(),
2125+
capability: "api".to_string(),
2126+
},
2127+
OutboundRoute {
2128+
route_id: "duplicate-route".to_string(),
2129+
slot: "api".to_string(),
2130+
capability_kind: None,
2131+
capability_profile: None,
2132+
listen_port: bindings[1].listen.port(),
2133+
listen_addr: Some(bindings[1].listen.ip().to_string()),
2134+
protocol: MeshProtocol::Http,
2135+
http_plugins: Vec::new(),
2136+
peer_addr: "127.0.0.1:1".to_string(),
2137+
peer_id: "/router".to_string(),
2138+
capability: "api".to_string(),
2139+
},
2140+
];
2141+
2142+
let listeners = listeners
2143+
.into_router_prebound(&routes)
2144+
.expect("duplicate export listeners should convert");
2145+
2146+
assert!(
2147+
TcpListener::bind(bindings[0].listen).is_err(),
2148+
"the first duplicate listener should still be reserved after conversion"
2149+
);
2150+
assert!(
2151+
TcpListener::bind(bindings[1].listen).is_err(),
2152+
"the second duplicate listener should still be reserved after conversion"
2153+
);
2154+
2155+
drop(listeners);
2156+
TcpListener::bind(bindings[0].listen)
2157+
.expect("dropping the converted listeners should release the first port");
2158+
TcpListener::bind(bindings[1].listen)
2159+
.expect("dropping the converted listeners should release the second port");
2160+
}
2161+
19212162
#[test]
19222163
fn parse_matching_compose_project_names_filters_to_target_compose_file() {
19232164
let projects = parse_matching_compose_project_names(

0 commit comments

Comments
 (0)