|
15 | 15 | import os |
16 | 16 | import socket |
17 | 17 |
|
| 18 | +from filelock import FileLock |
18 | 19 | from typing_extensions import override |
19 | 20 |
|
20 | 21 | from lightning.fabric.plugins.environments.cluster_environment import ClusterEnvironment |
21 | 22 | from lightning.fabric.utilities.rank_zero import rank_zero_only |
22 | 23 |
|
| 24 | +BASE_PORT = 10000 |
| 25 | +MAX_PORT = 65000 |
| 26 | +STEP = 20 |
| 27 | +LOCK_FILE = "lightning_ports.lock" |
| 28 | + |
23 | 29 |
|
24 | 30 | class LightningEnvironment(ClusterEnvironment): |
25 | 31 | """The default environment used by Lightning for a single node or free cluster (not managed). |
@@ -105,15 +111,57 @@ def teardown(self) -> None: |
105 | 111 | del os.environ["WORLD_SIZE"] |
106 | 112 |
|
107 | 113 |
|
108 | | -def find_free_network_port() -> int: |
| 114 | +def is_port_available(port: int) -> bool: |
| 115 | + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: |
| 116 | + s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) |
| 117 | + return s.connect_ex(("localhost", port)) != 0 |
| 118 | + |
| 119 | + |
| 120 | +def find_free_network_port(base: int = BASE_PORT, step: int = STEP) -> int: |
109 | 121 | """Finds a free port on localhost. |
110 | 122 |
|
111 | 123 | It is useful in single-node training when we don't want to connect to a real main node but have to set the |
112 | | - `MASTER_PORT` environment variable. |
| 124 | + MASTER_PORT environment variable. |
113 | 125 |
|
114 | 126 | """ |
115 | | - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
116 | | - s.bind(("", 0)) |
117 | | - port = s.getsockname()[1] |
118 | | - s.close() |
119 | | - return port |
| 127 | + PL_FORCE_DETERMINISTIC_PORTS = os.environ.get("PL_FORCE_DETERMINISTIC_PORTS", "0") |
| 128 | + |
| 129 | + if PL_FORCE_DETERMINISTIC_PORTS == "0": |
| 130 | + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
| 131 | + s.bind(("", 0)) |
| 132 | + port = s.getsockname()[1] |
| 133 | + s.close() |
| 134 | + return port |
| 135 | + |
| 136 | + # use the last assigned port + step strategy with a file lock to avoid race conditions |
| 137 | + lock_path = os.path.join(os.getcwd(), LOCK_FILE) |
| 138 | + os.makedirs(os.path.dirname(lock_path), exist_ok=True) |
| 139 | + |
| 140 | + with FileLock(lock_path + ".lock"): |
| 141 | + # read used ports |
| 142 | + if os.path.exists(lock_path): |
| 143 | + with open(lock_path) as f: |
| 144 | + used = [int(x.strip()) for x in f if x.strip()] |
| 145 | + else: |
| 146 | + used = [] |
| 147 | + |
| 148 | + candidate = base if not used else used[-1] + step |
| 149 | + if candidate > MAX_PORT: |
| 150 | + candidate = base |
| 151 | + |
| 152 | + tries = 0 |
| 153 | + max_tries = (MAX_PORT - base) // step |
| 154 | + while (not is_port_available(candidate) or candidate in used) and tries < max_tries: |
| 155 | + candidate += step |
| 156 | + if candidate > MAX_PORT: |
| 157 | + candidate = base |
| 158 | + tries += 1 |
| 159 | + |
| 160 | + if tries >= max_tries: |
| 161 | + raise RuntimeError("No free port found in range") |
| 162 | + |
| 163 | + # write the new port to the file |
| 164 | + with open(lock_path, "a") as f: |
| 165 | + f.write(f"{candidate}\n") |
| 166 | + |
| 167 | + return candidate |
0 commit comments