Skip to content

Commit 6684872

Browse files
authored
[misc] Find unused port in distributed init (#475)
1 parent 7f654e3 commit 6684872

File tree

3 files changed

+18
-26
lines changed

3 files changed

+18
-26
lines changed

fastvideo/v1/tests/old_tests/distributed_example.py

Lines changed: 0 additions & 20 deletions
This file was deleted.

fastvideo/v1/worker/gpu_worker.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,14 +29,14 @@
2929
class Worker:
3030

3131
def __init__(self, fastvideo_args: FastVideoArgs, local_rank: int,
32-
rank: int, pipe):
32+
rank: int, pipe, master_port: int):
3333
self.fastvideo_args = fastvideo_args
3434
self.local_rank = local_rank
3535
self.rank = rank
3636
# TODO(will): don't hardcode this
3737
self.distributed_init_method = "env://"
3838
self.pipe = pipe
39-
39+
self.master_port = master_port
4040
self.init_device()
4141

4242
# Init request dispatcher
@@ -76,7 +76,7 @@ def init_device(self) -> None:
7676
f"Unsupported device: {self.fastvideo_args.device_str}")
7777

7878
os.environ["MASTER_ADDR"] = "localhost"
79-
os.environ["MASTER_PORT"] = "29503"
79+
os.environ["MASTER_PORT"] = str(self.master_port)
8080
os.environ["LOCAL_RANK"] = str(self.local_rank)
8181
os.environ["RANK"] = str(self.rank)
8282

@@ -191,7 +191,7 @@ def init_worker_distributed_environment(
191191

192192

193193
def run_worker_process(fastvideo_args: FastVideoArgs, local_rank: int,
194-
rank: int, pipe):
194+
rank: int, pipe, master_port: int):
195195
# Add process-specific prefix to stdout and stderr
196196
process_name = mp.current_process().name
197197
pid = os.getpid()
@@ -206,8 +206,9 @@ def run_worker_process(fastvideo_args: FastVideoArgs, local_rank: int,
206206
logger.info("Worker %d initializing...",
207207
rank,
208208
local_main_process_only=False)
209+
209210
try:
210-
worker = Worker(fastvideo_args, local_rank, rank, pipe)
211+
worker = Worker(fastvideo_args, local_rank, rank, pipe, master_port)
211212
logger.info("Worker %d sending ready", rank)
212213
pipe.send({
213214
"status": "ready",

fastvideo/v1/worker/multiproc_executor.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import multiprocessing as mp
44
import os
55
import signal
6+
import socket
67
import time
78
from multiprocessing.process import BaseProcess
89
from typing import Any, Callable, List, Optional, Union, cast
@@ -28,6 +29,15 @@ def _init_executor(self) -> None:
2829

2930
self.workers: List[BaseProcess] = []
3031
self.worker_pipes = []
32+
self.master_port = None
33+
34+
for port in range(29503, 65535):
35+
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
36+
if s.connect_ex(('localhost', port)) != 0:
37+
self.master_port = port
38+
break
39+
if self.master_port is None:
40+
raise ValueError("No unused port found to use as master port")
3141

3242
# Create pipes and start workers
3343
for rank in range(self.world_size):
@@ -39,7 +49,8 @@ def _init_executor(self) -> None:
3949
kwargs=dict(fastvideo_args=self.fastvideo_args,
4050
local_rank=rank,
4151
rank=rank,
42-
pipe=worker_pipe))
52+
pipe=worker_pipe,
53+
master_port=self.master_port))
4354
worker.start()
4455
self.workers.append(worker)
4556

0 commit comments

Comments
 (0)