Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions src/lightning/fabric/utilities/port_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
from collections import deque
from collections.abc import Iterator
from contextlib import contextmanager
from multiprocessing import Lock
from typing import Optional

lock = Lock()
log = logging.getLogger(__name__)

# Size of the recently released ports queue
Expand All @@ -31,7 +33,7 @@


class PortManager:
"""Thread-safe port manager to prevent EADDRINUSE errors.
"""Process-safe port manager to prevent EADDRINUSE errors.

This manager maintains a global registry of allocated ports to ensure that multiple concurrent tests don't try to
use the same port. While this doesn't completely eliminate the race condition with external processes, it prevents
Expand All @@ -40,7 +42,6 @@ class PortManager:
"""

def __init__(self) -> None:
self._lock = threading.Lock()
self._allocated_ports: set[int] = set()
# Recently released ports are kept in a queue to avoid immediate reuse
self._recently_released: deque[int] = deque(maxlen=_RECENTLY_RELEASED_PORTS_MAXLEN)
Expand All @@ -61,7 +62,7 @@ def allocate_port(self, preferred_port: Optional[int] = None, max_attempts: int
RuntimeError: If unable to find a free port after max_attempts

"""
with self._lock:
with lock:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it need to be for all and so could we have it as a wrapper?

# If a preferred port is specified and available, use it
if (
preferred_port is not None
Expand Down Expand Up @@ -113,15 +114,15 @@ def release_port(self, port: int) -> None:
port: Port number to release

"""
with self._lock:
with lock:
if port in self._allocated_ports:
self._allocated_ports.remove(port)
# Add to the back of the queue; oldest will be evicted when queue is full
self._recently_released.append(port)

def release_all(self) -> None:
"""Release all allocated ports."""
with self._lock:
with lock:
self._allocated_ports.clear()
self._recently_released.clear()

Expand All @@ -138,7 +139,7 @@ def reserve_existing_port(self, port: int) -> bool:
if port <= 0 or port > 65535:
return False

with self._lock:
with lock:
if port in self._allocated_ports:
return True

Expand Down
6 changes: 0 additions & 6 deletions tests/tests_fabric/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,12 +255,6 @@ def pytest_runtest_makereport(item: pytest.Item, call: pytest.CallInfo) -> None:
yellow=True,
)

# Clear the port manager's state to get fresh ports
from lightning.fabric.utilities.port_manager import get_port_manager

manager = get_port_manager()
manager.release_all()

# Clear MASTER_PORT so cluster environment allocates a fresh port on retry
import os

Expand Down
6 changes: 0 additions & 6 deletions tests/tests_pytorch/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -383,12 +383,6 @@ def pytest_runtest_makereport(item: pytest.Item, call: pytest.CallInfo) -> None:
yellow=True,
)

# Clear the port manager's state to get fresh ports
from lightning.fabric.utilities.port_manager import get_port_manager

manager = get_port_manager()
manager.release_all()

# Clear MASTER_PORT so cluster environment allocates a fresh port on retry
import os

Expand Down
Loading