@@ -79,8 +79,35 @@ def launch_distributed(
         f"{project_name.replace(' ', '_')}_run_{wandb_group_name}.jsonl",
     )
 
-    procs = []
+    # Attention: Ray uses a complex scheduling method that considers various factors, including load balancing.
+    # When requesting resources, it is not guaranteed that the resources come from a node with a lower node id.
+    # This goes against the design principle of our implementation, so we need to manually force the scheduling,
+    # allocating the producers to nodes with lower node ids and the consumers to the resources from nodes with higher
+    # node ids. See the reference here: https://docs.ray.io/en/latest/ray-core/scheduling/index.html#nodeaffinityschedulingstrategy
+    nodes = ray.nodes()
+    node_info = {
+        node["NodeID"]: {
+            "num_gpus": node["Resources"].get("GPU", 0),  # Default to 0 if no GPUs are available
+            "address": node["NodeManagerAddress"],
+        }
+        for node in nodes
+    }
+    gpu_to_node_id = []
+    gpu_to_ip_address = []
+    for node_id in node_info:
+        for idx in range(int(node_info[node_id]["num_gpus"])):
+            gpu_to_node_id.append(node_id)
+            gpu_to_ip_address.append(node_info[node_id]["address"])
+    print(node_info)
+
+    producer_procs = []
     for i in range(num_producers):
+        node_id = gpu_to_node_id[0]
+        producer_ip_address = gpu_to_ip_address[0]
+        for _ in range(num_proc_per_producer):
+            gpu_to_node_id.pop(0)
+            gpu_to_ip_address.pop(0)
+        print(f"Schedule Producer P[{i}], which requires {num_proc_per_producer} GPUs, on node {producer_ip_address}")
         producer = SimpleProducer.options(num_gpus=num_proc_per_producer).remote(
             producer_idx=i,
             num_producers=num_producers,
@@ -106,20 +133,29 @@ def launch_distributed(
             log_rollout_interval=log_rollout_interval,
             rollout_log_file=rollout_log_file,
         )
-        procs.append(producer)
+        producer_procs.append(producer)
+    ray.get([p.setup.remote() for p in producer_procs])
     generate_config_consumer = copy.deepcopy(generate_config)
     generate_config_consumer.update(
         dict(
             backend=inference_backend,
         )
     )
+    consumer_master_ip_address = gpu_to_ip_address[0]
+    print(f"Use {consumer_master_ip_address} as the master address for torch DDP.")
+    consumer_procs = []
     for i in range(num_consumer_procs):
+        node_id = gpu_to_node_id[0]
+        consumer_ip_address = gpu_to_ip_address[0]
+        gpu_to_node_id.pop(0)
+        gpu_to_ip_address.pop(0)
+        print(f"Schedule Consumer T[{i}], which requires 1 GPU, on node {consumer_ip_address}")
         consumer = core_consumer.options(num_gpus=1).remote(
             num_producers=num_producers,
             num_episodes=num_episodes,
             rank=i,
             world_size=num_consumer_procs,
-            master_addr=master_addr,
+            master_addr=consumer_master_ip_address,
             master_port=master_port,
             num_update_per_episode=num_update_per_episode,
             num_recv_per_update=num_recv_per_update,
@@ -136,6 +172,6 @@ def launch_distributed(
             run_name=run_name,
             wandb_group_name=wandb_group_name,
         )
-        procs.append(consumer)
-    ray.get([p.setup.remote() for p in procs])
-    ray.get([p.loop.remote() for p in procs])
+        consumer_procs.append(consumer)
+    ray.get([p.setup.remote() for p in consumer_procs])
+    ray.get([p.loop.remote() for p in (producer_procs + consumer_procs)])
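
Note: the comment added in the first hunk cites Ray's NodeAffinitySchedulingStrategy, while the visible hunks only build a GPU-to-node inventory and print the intended placement. Below is a minimal, self-contained sketch (not part of this commit, assuming Ray 2.x) of how that strategy pins an actor to a chosen node; the `PinnedWorker` actor is hypothetical, whereas `ray.nodes()`, `ray.get_runtime_context().get_node_id()`, and `NodeAffinitySchedulingStrategy` are existing Ray APIs.

```python
# Minimal sketch of pinning a Ray actor to a specific node with
# NodeAffinitySchedulingStrategy, as referenced in the comment above.
import ray
from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy

ray.init()

@ray.remote(num_gpus=1)
class PinnedWorker:  # hypothetical actor, stands in for a producer/consumer
    def node(self) -> str:
        # Report which node this actor actually landed on.
        return ray.get_runtime_context().get_node_id()

# Pick a live node that advertises at least one GPU, analogous to popping an
# entry from the gpu_to_node_id list built in the diff above.
gpu_nodes = [n for n in ray.nodes() if n["Alive"] and n["Resources"].get("GPU", 0) > 0]
target_node_id = gpu_nodes[0]["NodeID"]

# soft=False makes placement fail instead of silently falling back to another node.
worker = PinnedWorker.options(
    scheduling_strategy=NodeAffinitySchedulingStrategy(node_id=target_node_id, soft=False)
).remote()
assert ray.get(worker.node.remote()) == target_node_id
```

The same `scheduling_strategy=` argument can be combined with `num_gpus=` in `.options(...)`, which is one way the producer and consumer actors above could be forced onto the nodes whose ids were popped from `gpu_to_node_id`, rather than relying on Ray's default load-balancing placement.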