Skip to content

Commit f5ebc53

Browse files
committed
fix: update port manager to use multiprocessing lock for process safety
1 parent dd7b2f3 commit f5ebc53

File tree

3 files changed

+7
-18
lines changed

3 files changed

+7
-18
lines changed

src/lightning/fabric/utilities/port_manager.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,10 @@
2020
from collections import deque
2121
from collections.abc import Iterator
2222
from contextlib import contextmanager
23+
from multiprocessing import Lock
2324
from typing import Optional
2425

26+
lock = Lock()
2527
log = logging.getLogger(__name__)
2628

2729
# Size of the recently released ports queue
@@ -31,7 +33,7 @@
3133

3234

3335
class PortManager:
34-
"""Thread-safe port manager to prevent EADDRINUSE errors.
36+
"""Process-safe port manager to prevent EADDRINUSE errors.
3537
3638
This manager maintains a global registry of allocated ports to ensure that multiple concurrent tests don't try to
3739
use the same port. While this doesn't completely eliminate the race condition with external processes, it prevents
@@ -40,7 +42,6 @@ class PortManager:
4042
"""
4143

4244
def __init__(self) -> None:
43-
self._lock = threading.Lock()
4445
self._allocated_ports: set[int] = set()
4546
# Recently released ports are kept in a queue to avoid immediate reuse
4647
self._recently_released: deque[int] = deque(maxlen=_RECENTLY_RELEASED_PORTS_MAXLEN)
@@ -61,7 +62,7 @@ def allocate_port(self, preferred_port: Optional[int] = None, max_attempts: int
6162
RuntimeError: If unable to find a free port after max_attempts
6263
6364
"""
64-
with self._lock:
65+
with lock:
6566
# If a preferred port is specified and available, use it
6667
if (
6768
preferred_port is not None
@@ -113,15 +114,15 @@ def release_port(self, port: int) -> None:
113114
port: Port number to release
114115
115116
"""
116-
with self._lock:
117+
with lock:
117118
if port in self._allocated_ports:
118119
self._allocated_ports.remove(port)
119120
# Add to the back of the queue; oldest will be evicted when queue is full
120121
self._recently_released.append(port)
121122

122123
def release_all(self) -> None:
123124
"""Release all allocated ports."""
124-
with self._lock:
125+
with lock:
125126
self._allocated_ports.clear()
126127
self._recently_released.clear()
127128

@@ -138,7 +139,7 @@ def reserve_existing_port(self, port: int) -> bool:
138139
if port <= 0 or port > 65535:
139140
return False
140141

141-
with self._lock:
142+
with lock:
142143
if port in self._allocated_ports:
143144
return True
144145

tests/tests_fabric/conftest.py

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -255,12 +255,6 @@ def pytest_runtest_makereport(item: pytest.Item, call: pytest.CallInfo) -> None:
255255
yellow=True,
256256
)
257257

258-
# Clear the port manager's state to get fresh ports
259-
from lightning.fabric.utilities.port_manager import get_port_manager
260-
261-
manager = get_port_manager()
262-
manager.release_all()
263-
264258
# Clear MASTER_PORT so cluster environment allocates a fresh port on retry
265259
import os
266260

tests/tests_pytorch/conftest.py

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -383,12 +383,6 @@ def pytest_runtest_makereport(item: pytest.Item, call: pytest.CallInfo) -> None:
383383
yellow=True,
384384
)
385385

386-
# Clear the port manager's state to get fresh ports
387-
from lightning.fabric.utilities.port_manager import get_port_manager
388-
389-
manager = get_port_manager()
390-
manager.release_all()
391-
392386
# Clear MASTER_PORT so cluster environment allocates a fresh port on retry
393387
import os
394388

0 commit comments

Comments
 (0)