2222
2323import ray
2424from loguru import logger
25- from runner .utils import run_shm_size_check
25+ from runner .utils import get_shm_usage
2626
2727from nemo_curator .core .client import RayClient
2828from nemo_curator .core .utils import check_ray_responsive
3131ray_client_start_poll_interval_s = 0.5
3232
3333
# Fixed grace period (seconds) granted to Ray after a cluster stop.
_RAY_CLEANUP_WAIT_S = 10


def _wait_for_ray_cleanup() -> None:
    """Pause after a cluster stop, then report /dev/shm usage.

    Sleeps for ``_RAY_CLEANUP_WAIT_S`` seconds so that Ray child processes
    have time to exit and release their /dev/shm segments. Afterwards the
    ``"summary"`` entry returned by ``get_shm_usage()`` is logged, but only
    when it is non-empty.
    """
    logger.info(f"Waiting {_RAY_CLEANUP_WAIT_S} s for Ray to clean up child processes and release /dev/shm...")
    time.sleep(_RAY_CLEANUP_WAIT_S)

    shm_summary = get_shm_usage()["summary"]
    if not shm_summary:
        # Nothing to report.
        return
    logger.info(f"SHM usage after cleanup wait: {shm_summary} ")
45+
46+
3447def setup_ray_cluster_and_env ( # noqa: PLR0913
3548 num_cpus : int ,
3649 num_gpus : int ,
@@ -52,9 +65,14 @@ def setup_ray_cluster_and_env( # noqa: PLR0913
5265 if ray_address_env :
5366 logger .warning (f"RAY_ADDRESS already set in environment: { ray_address_env } " )
5467
68+ shm = get_shm_usage ()
69+ if shm ["summary" ]:
70+ logger .info (f"SHM usage before Ray cluster setup: { shm ['summary' ]} " )
71+
5572 responsive = False
5673 retries = 0
5774 max_retries = 5
75+ client = None
5876 while not responsive and retries < max_retries :
5977 logger .info (f"Starting Ray cluster (attempt { retries + 1 } of { max_retries } )..." )
6078
@@ -73,14 +91,23 @@ def setup_ray_cluster_and_env( # noqa: PLR0913
7391 ray_stdouterr_capture_file = ray_stdouterr_capture_file ,
7492 object_store_memory = object_store_size ,
7593 )
76- client .start ()
7794
78- _ensure_ray_client_process_started (client , ray_client_start_timeout_s , ray_client_start_poll_interval_s )
79- responsive = check_ray_responsive ()
80- run_shm_size_check (human_readable = True )
95+ try :
96+ client .start ()
97+ _ensure_ray_client_process_started (client , ray_client_start_timeout_s , ray_client_start_poll_interval_s )
98+ responsive = True
99+ except Exception :
100+ logger .exception (f"Ray cluster start failed on attempt { retries + 1 } " )
101+ responsive = False
102+
81103 if not responsive :
82- logger .info ("Ray cluster did not become responsive in time, stopping client and retrying..." )
83- client .stop ()
104+ logger .info ("Ray cluster did not become responsive, cleaning up before retry..." )
105+ try :
106+ client .stop ()
107+ except Exception :
108+ logger .exception ("Failed to stop client during retry cleanup" )
109+ os .environ .pop ("RAY_ADDRESS" , None )
110+ _wait_for_ray_cleanup ()
84111 retries += 1
85112
86113 if not responsive :
@@ -105,6 +132,10 @@ def teardown_ray_cluster_and_env(
105132 ray_client .stop ()
106133 except Exception :
107134 logger .exception ("Failed to stop Ray client" )
135+
136+ # Wait for Ray child processes to exit and /dev/shm to release
137+ _wait_for_ray_cleanup ()
138+
108139 # Copy debugging artifacts and clean up temp directory
109140 try :
110141 _copy_ray_debug_artifacts (ray_temp_path , ray_cluster_path )
@@ -114,12 +145,18 @@ def teardown_ray_cluster_and_env(
114145
115146
def get_ray_cluster_data() -> dict[str, Any]:
    """Return the Ray cluster's resource data, or ``{}`` if it is unreachable.

    The cluster is probed with ``check_ray_responsive()`` before any
    connection is attempted. When the cluster is not responsive (e.g. it
    crashed due to OOM), an empty dict is returned instead of connecting:
    calling ``ray.init()`` against a dead cluster fatally terminates the
    process via Ray's C++ core worker.

    Returns:
        The mapping produced by ``ray.cluster_resources()``, or an empty
        dict when the cluster did not respond.
    """
    if check_ray_responsive():
        # The context manager closes the connection when the block exits.
        with ray.init(ignore_reinit_error=True):
            # Brief pause: resource reports from Ray might lag right after connecting.
            time.sleep(0.2)
            cluster_resources = ray.cluster_resources()
        return cluster_resources

    logger.warning("Ray cluster is not responsive, skipping cluster data collection")
    return {}
123160
124161
125162def _ensure_ray_client_process_started (client : RayClient , timeout_s : int , poll_interval_s : float ) -> None :
0 commit comments