From e5f34d1ca8ba37d9a71b4395a2b6983b7f13c393 Mon Sep 17 00:00:00 2001
From: Mark Saroufim
Date: Mon, 25 Aug 2025 12:54:14 -0700
Subject: [PATCH 1/2] More RPC examples

---
 distributed/rpc/batch/parameter_server.py | 16 +++++++++++++++-
 distributed/rpc/batch/reinforce.py        | 13 ++++++++++---
 2 files changed, 25 insertions(+), 4 deletions(-)

diff --git a/distributed/rpc/batch/parameter_server.py b/distributed/rpc/batch/parameter_server.py
index 06bc301c6e..babe667ff6 100644
--- a/distributed/rpc/batch/parameter_server.py
+++ b/distributed/rpc/batch/parameter_server.py
@@ -1,8 +1,10 @@
 import os
 import threading
 from datetime import datetime
+import warnings
 
 import torch
+import torch.distributed as dist
 import torch.distributed.rpc as rpc
 import torch.multiprocessing as mp
 import torch.nn as nn
@@ -10,6 +12,9 @@
 
 import torchvision
 
+# Suppress deprecated ProcessGroup warning
+warnings.filterwarnings("ignore", message="You are using a Backend.*ProcessGroup")
+
 
 batch_size = 20
 image_w = 64
@@ -114,9 +119,17 @@ def run_ps(trainers):
 def run(rank, world_size):
     os.environ['MASTER_ADDR'] = 'localhost'
     os.environ['MASTER_PORT'] = '29500'
+
+    # Initialize the process group first
+    dist.init_process_group(
+        backend="gloo",
+        rank=rank,
+        world_size=world_size
+    )
+
     options=rpc.TensorPipeRpcBackendOptions(
         num_worker_threads=16,
-        rpc_timeout=0 # infinite timeout
+        rpc_timeout=60 # 60 second timeout instead of infinite
     )
     if rank != 0:
         rpc.init_rpc(
@@ -137,6 +150,7 @@ def run(rank, world_size):
 
     # block until all rpcs finish
     rpc.shutdown()
+    dist.destroy_process_group()
 
 
 if __name__=="__main__":
diff --git a/distributed/rpc/batch/reinforce.py b/distributed/rpc/batch/reinforce.py
index 93661229d5..0a69c8f5d1 100644
--- a/distributed/rpc/batch/reinforce.py
+++ b/distributed/rpc/batch/reinforce.py
@@ -3,6 +3,10 @@
 import os
 import threading
 import time
+import warnings
+
+# Suppress deprecated ProcessGroup warning
+warnings.filterwarnings("ignore", message="You are using a Backend.*ProcessGroup")
 
 import torch
 import torch.distributed.rpc as rpc
@@ -26,6 +30,8 @@
                     help='random seed (default: 543)')
 parser.add_argument('--num-episode', type=int, default=10, metavar='E',
                     help='number of episodes (default: 10)')
+parser.add_argument('--max-world-size', type=int, default=3, metavar='W',
+                    help='maximum world size to test (default: 3)')
 args = parser.parse_args()
 
 torch.manual_seed(args.seed)
@@ -79,7 +85,8 @@ def run_episode(self, agent_rref, n_steps):
             agent_rref (RRef): an RRef referencing the agent object.
             n_steps (int): number of steps in this episode
         """
-        state, ep_reward = self.env.reset(), NUM_STEPS
+        state, _ = self.env.reset()
+        ep_reward = NUM_STEPS
         rewards = torch.zeros(n_steps)
         start_step = 0
         for step in range(n_steps):
@@ -101,7 +108,7 @@ def run_episode(self, agent_rref, n_steps):
                 for i in range(curr_rewards.numel() -1, -1, -1):
                     R = curr_rewards[i] + args.gamma * R
                     curr_rewards[i] = R
-                state = self.env.reset()
+                state, _ = self.env.reset()
                 if start_step == 0:
                     ep_reward = min(ep_reward, step - start_step + 1)
                 start_step = step + 1
@@ -235,7 +242,7 @@ def run_worker(rank, world_size, n_episode, batch, print_log=True):
 
 
 def main():
-    for world_size in range(2, 12):
+    for world_size in range(2, args.max_world_size):
         delays = []
         for batch in [True, False]:
             tik = time.time()

From 47980ab8eed174ccdc00344ceb24cd9631ee1947 Mon Sep 17 00:00:00 2001
From: Mark Saroufim
Date: Mon, 25 Aug 2025 12:55:03 -0700
Subject: [PATCH 2/2] update

---
 distributed/rpc/batch/parameter_server.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/distributed/rpc/batch/parameter_server.py b/distributed/rpc/batch/parameter_server.py
index babe667ff6..798f3f0097 100644
--- a/distributed/rpc/batch/parameter_server.py
+++ b/distributed/rpc/batch/parameter_server.py
@@ -129,7 +129,7 @@ def run(rank, world_size):
 
     options=rpc.TensorPipeRpcBackendOptions(
         num_worker_threads=16,
-        rpc_timeout=60 # 60 second timeout instead of infinite
+        rpc_timeout=60
    )
     if rank != 0:
         rpc.init_rpc(
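
The sketch below is not part of the patch: it distills the init/teardown order these changes establish in parameter_server.py's run() (gloo process group first, RPC on top, teardown in reverse), assuming the example's existing MASTER_ADDR/MASTER_PORT setup and mp.spawn entry point; anything not shown in the diff above is illustrative only.

    import os
    import torch.distributed as dist
    import torch.distributed.rpc as rpc
    import torch.multiprocessing as mp

    def run(rank, world_size):
        os.environ["MASTER_ADDR"] = "localhost"
        os.environ["MASTER_PORT"] = "29500"
        # gloo process group first, then RPC layered on top of it
        dist.init_process_group(backend="gloo", rank=rank, world_size=world_size)
        options = rpc.TensorPipeRpcBackendOptions(num_worker_threads=16, rpc_timeout=60)
        name = "ps" if rank == 0 else f"trainer{rank}"
        rpc.init_rpc(name, rank=rank, world_size=world_size, rpc_backend_options=options)
        # ... parameter-server / trainer work driven over RPC ...
        rpc.shutdown()                # block until all outstanding RPCs finish
        dist.destroy_process_group()  # tear down the gloo group added by the patch

    if __name__ == "__main__":
        world_size = 3  # 1 parameter server + 2 trainers
        mp.spawn(run, args=(world_size,), nprocs=world_size, join=True)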