diff --git a/src/lightning/fabric/utilities/port_manager.py b/src/lightning/fabric/utilities/port_manager.py index cdf19605023d5..54c325dc738ba 100644 --- a/src/lightning/fabric/utilities/port_manager.py +++ b/src/lightning/fabric/utilities/port_manager.py @@ -20,8 +20,10 @@ from collections import deque from collections.abc import Iterator from contextlib import contextmanager +from multiprocessing import Lock from typing import Optional +lock = Lock() log = logging.getLogger(__name__) # Size of the recently released ports queue @@ -31,7 +33,7 @@ class PortManager: - """Thread-safe port manager to prevent EADDRINUSE errors. + """Process-safe port manager to prevent EADDRINUSE errors. This manager maintains a global registry of allocated ports to ensure that multiple concurrent tests don't try to use the same port. While this doesn't completely eliminate the race condition with external processes, it prevents @@ -40,7 +42,6 @@ class PortManager: """ def __init__(self) -> None: - self._lock = threading.Lock() self._allocated_ports: set[int] = set() # Recently released ports are kept in a queue to avoid immediate reuse self._recently_released: deque[int] = deque(maxlen=_RECENTLY_RELEASED_PORTS_MAXLEN) @@ -61,7 +62,7 @@ def allocate_port(self, preferred_port: Optional[int] = None, max_attempts: int RuntimeError: If unable to find a free port after max_attempts """ - with self._lock: + with lock: # If a preferred port is specified and available, use it if ( preferred_port is not None @@ -113,7 +114,7 @@ def release_port(self, port: int) -> None: port: Port number to release """ - with self._lock: + with lock: if port in self._allocated_ports: self._allocated_ports.remove(port) # Add to the back of the queue; oldest will be evicted when queue is full @@ -121,7 +122,7 @@ def release_port(self, port: int) -> None: def release_all(self) -> None: """Release all allocated ports.""" - with self._lock: + with lock: self._allocated_ports.clear() self._recently_released.clear() @@ -138,7 +139,7 @@ def reserve_existing_port(self, port: int) -> bool: if port <= 0 or port > 65535: return False - with self._lock: + with lock: if port in self._allocated_ports: return True diff --git a/tests/tests_fabric/conftest.py b/tests/tests_fabric/conftest.py index a06bb0eacdbb4..df1b18ec5d891 100644 --- a/tests/tests_fabric/conftest.py +++ b/tests/tests_fabric/conftest.py @@ -255,12 +255,6 @@ def pytest_runtest_makereport(item: pytest.Item, call: pytest.CallInfo) -> None: yellow=True, ) - # Clear the port manager's state to get fresh ports - from lightning.fabric.utilities.port_manager import get_port_manager - - manager = get_port_manager() - manager.release_all() - # Clear MASTER_PORT so cluster environment allocates a fresh port on retry import os diff --git a/tests/tests_pytorch/conftest.py b/tests/tests_pytorch/conftest.py index bb48ca8717e45..7721472b4a4db 100644 --- a/tests/tests_pytorch/conftest.py +++ b/tests/tests_pytorch/conftest.py @@ -383,12 +383,6 @@ def pytest_runtest_makereport(item: pytest.Item, call: pytest.CallInfo) -> None: yellow=True, ) - # Clear the port manager's state to get fresh ports - from lightning.fabric.utilities.port_manager import get_port_manager - - manager = get_port_manager() - manager.release_all() - # Clear MASTER_PORT so cluster environment allocates a fresh port on retry import os