@@ -173,6 +173,30 @@ def _init_placement_groups(self, strategy: str):
173173 if self ._node_placement_groups is not None :
174174 return self ._node_placement_groups
175175
176+ # Check available resources in the Ray cluster
177+ cluster_resources = ray .cluster_resources ()
178+ total_available_gpus = int (cluster_resources .get ("GPU" , 0 ))
179+ total_available_cpus = int (cluster_resources .get ("CPU" , 0 ))
180+
181+ # Calculate required resources
182+ total_requested_gpus = (
183+ sum (self ._bundle_ct_per_node_list ) if self .use_gpus else 0
184+ )
185+ total_requested_cpus = (
186+ sum (self ._bundle_ct_per_node_list ) * self .max_colocated_worker_groups
187+ )
188+
189+ # Validate resources
190+ if self .use_gpus and total_requested_gpus > total_available_gpus :
191+ raise ValueError (
192+ f"Not enough GPUs available. Requested { total_requested_gpus } GPUs, but only { total_available_gpus } are available in the cluster."
193+ )
194+
195+ if total_requested_cpus > total_available_cpus :
196+ raise ValueError (
197+ f"Not enough CPUs available. Requested { total_requested_cpus } CPUs, but only { total_available_cpus } are available in the cluster."
198+ )
199+
176200 num_cpus_per_bundle = self .max_colocated_worker_groups
177201 # num_gpus_per_bundle == 1 indicates that there is 1 GPU per process
178202 num_gpus_per_bundle = 1 if self .use_gpus else 0
@@ -192,7 +216,24 @@ def _init_placement_groups(self, strategy: str):
192216 for i , bundles in enumerate (resources )
193217 ]
194218
195- ray .get ([pg .ready () for pg in self ._node_placement_groups ])
219+ # Add timeout to prevent hanging indefinitely
220+ try :
221+ ray .get (
222+ [pg .ready () for pg in self ._node_placement_groups ], timeout = 180
223+ ) # 3-minute timeout
224+ except (TimeoutError , ray .exceptions .GetTimeoutError ):
225+ # Clean up any created placement groups
226+ for pg in self ._node_placement_groups :
227+ try :
228+ remove_placement_group (pg )
229+ except Exception :
230+ pass
231+ self ._node_placement_groups = None
232+ raise TimeoutError (
233+ "Timed out waiting for placement groups to be ready. The cluster may not have enough resources "
234+ "to satisfy the requested configuration, or the resources may be busy with other tasks."
235+ )
236+
196237 return self ._node_placement_groups
197238
198239 def get_placement_groups (self ):
0 commit comments