Skip to content

Commit bfeaa63

Browse files
committed
Add port manager diagnostics and improve retry handling

- Add logging when port queue utilization >80% to detect exhaustion
- Enhance RuntimeError with detailed diagnostics (queue utilization %)
- Add DistNetworkError type check in pytorch conftest for subprocess failures
- Add test coverage for high queue utilization warning
- Helps diagnose EADDRINUSE issues in CI distributed tests
1 parent 92d5a9e commit bfeaa63

File tree

4 files changed

+58
-5
lines changed

4 files changed

+58
-5
lines changed

src/lightning/fabric/utilities/port_manager.py

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,16 +14,19 @@
1414
"""Port allocation manager to prevent race conditions in distributed training."""
1515

1616
import atexit
17+
import logging
1718
import socket
1819
import threading
1920
from collections import deque
2021
from collections.abc import Iterator
2122
from contextlib import contextmanager
2223
from typing import Optional
2324

25+
log = logging.getLogger(__name__)
26+
2427
# Size of the recently released ports queue
2528
# This prevents immediate reuse of ports that were just released
26-
# Increased to 1024 to reduce the chance of cycling back to TIME_WAIT ports
29+
# Set to 1024 to balance memory usage vs TIME_WAIT protection
2730
_RECENTLY_RELEASED_PORTS_MAXLEN = 1024
2831

2932

@@ -78,12 +81,29 @@ def allocate_port(self, preferred_port: Optional[int] = None, max_attempts: int
7881
# This prevents race conditions within our process
7982
if port not in self._allocated_ports and port not in self._recently_released:
8083
self._allocated_ports.add(port)
84+
85+
# Log diagnostics if queue utilization is high
86+
queue_count = len(self._recently_released)
87+
if queue_count > _RECENTLY_RELEASED_PORTS_MAXLEN * 0.8: # >80% full
88+
log.warning(
89+
f"Port queue utilization high: {queue_count}/{_RECENTLY_RELEASED_PORTS_MAXLEN} "
90+
f"({queue_count / _RECENTLY_RELEASED_PORTS_MAXLEN * 100:.1f}% full). "
91+
f"Allocated port {port}. Active allocations: {len(self._allocated_ports)}"
92+
)
93+
8194
return port
8295

96+
# Provide detailed diagnostics to understand allocation failures
97+
allocated_count = len(self._allocated_ports)
98+
queue_count = len(self._recently_released)
99+
queue_capacity = _RECENTLY_RELEASED_PORTS_MAXLEN
100+
queue_utilization = (queue_count / queue_capacity * 100) if queue_capacity > 0 else 0
101+
83102
raise RuntimeError(
84103
f"Failed to allocate a free port after {max_attempts} attempts. "
85-
f"Currently allocated: {len(self._allocated_ports)}, "
86-
f"recently released: {len(self._recently_released)}"
104+
f"Diagnostics: allocated={allocated_count}, "
105+
f"recently_released={queue_count}/{queue_capacity} ({queue_utilization:.1f}% full). "
106+
f"If queue is near capacity, consider increasing _RECENTLY_RELEASED_PORTS_MAXLEN."
87107
)
88108

89109
def release_port(self, port: int) -> None:

tests/tests_fabric/conftest.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -230,8 +230,14 @@ def pytest_runtest_makereport(item: pytest.Item, call: pytest.CallInfo) -> None:
230230
"""
231231
if call.excinfo is not None and call.when == "call":
232232
exception_msg = str(call.excinfo.value)
233+
exception_type = str(type(call.excinfo.value).__name__)
233234
# Check if this is an EADDRINUSE error from distributed training
234-
if "EADDRINUSE" in exception_msg or "address already in use" in exception_msg.lower():
235+
# Catch both direct EADDRINUSE errors and DistNetworkError which wraps them
236+
if (
237+
"EADDRINUSE" in exception_msg
238+
or "address already in use" in exception_msg.lower()
239+
or "DistNetworkError" in exception_type
240+
):
235241
# Get the retry count from the test node
236242
retry_count = getattr(item, "_port_retry_count", 0)
237243
max_retries = 3

tests/tests_fabric/utilities/test_port_manager.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -753,3 +753,24 @@ def test_port_manager_reserve_clears_recently_released():
753753
assert port in manager._allocated_ports
754754

755755
manager.release_port(port)
756+
757+
758+
def test_port_manager_high_queue_utilization_warning(caplog):
    """An allocation should emit a WARNING once the recently-released queue exceeds 80% capacity."""
    import logging

    manager = PortManager()

    # Cycle 821 ports through allocate/release so the recently-released
    # queue sits just above the 80% threshold (821/1024 ~ 80.2%).
    for _ in range(821):
        recycled = manager.allocate_port()
        manager.release_port(recycled)

    # With the queue above threshold, the next allocation must log the warning.
    with caplog.at_level(logging.WARNING):
        probe = manager.allocate_port()
    manager.release_port(probe)

    messages = [record.message for record in caplog.records]
    assert any("Port queue utilization high" in msg for msg in messages)
    # The logged percentage should be in the 80.x range.
    assert any("80." in msg for msg in messages)

tests/tests_pytorch/conftest.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -358,8 +358,14 @@ def pytest_runtest_makereport(item: pytest.Item, call: pytest.CallInfo) -> None:
358358
"""
359359
if call.excinfo is not None and call.when == "call":
360360
exception_msg = str(call.excinfo.value)
361+
exception_type = str(type(call.excinfo.value).__name__)
361362
# Check if this is an EADDRINUSE error from distributed training
362-
if "EADDRINUSE" in exception_msg or "address already in use" in exception_msg.lower():
363+
# Catch both direct EADDRINUSE errors and DistNetworkError which wraps them
364+
if (
365+
"EADDRINUSE" in exception_msg
366+
or "address already in use" in exception_msg.lower()
367+
or "DistNetworkError" in exception_type
368+
):
363369
# Get the retry count from the test node
364370
retry_count = getattr(item, "_port_retry_count", 0)
365371
max_retries = 3

0 commit comments

Comments (0)