@@ -159,6 +159,40 @@ def extract_wheel_file(
159159 logger .debug (
160160 f"[Rank { rank } ] Starting extraction of { wheel_path } to { extract_dir } "
161161 )
162+
163+ # Check for stale/corrupt extraction and clean up if needed
164+ if lock_file .exists ():
165+ try :
166+ lock_age = time .time () - lock_file .stat ().st_mtime
167+ if lock_age > 300 : # 5 minutes
168+ logger .warning (
169+ f"[Rank { rank } ] Detected stale lock file (age: { lock_age :.1f} s). "
170+ f"Previous extraction likely crashed. Cleaning up and retrying..."
171+ )
172+ # Clean up stale extraction
173+ import shutil
174+
175+ if extract_dir .exists ():
176+ shutil .rmtree (extract_dir , ignore_errors = True )
177+ logger .info (
178+ f"[Rank { rank } ] Cleaned up stale extraction directory: { extract_dir } "
179+ )
180+ elif not plugin_lib_path .exists ():
181+ logger .warning (
182+ f"[Rank { rank } ] Lock file exists but plugin missing. "
183+ f"Previous extraction incomplete. Cleaning up and retrying..."
184+ )
185+ # Clean up incomplete extraction
186+ import shutil
187+
188+ if extract_dir .exists ():
189+ shutil .rmtree (extract_dir , ignore_errors = True )
190+ logger .info (
191+ f"[Rank { rank } ] Cleaned up incomplete extraction directory: { extract_dir } "
192+ )
193+ except Exception as e :
194+ logger .warning (f"[Rank { rank } ] Error checking stale lock: { e } " )
195+
162196 # If another job already finished earlier, skip immediately
163197 if plugin_lib_path .exists ():
164198 logger .debug (
@@ -184,14 +218,14 @@ def extract_wheel_file(
184218 # Only one process should be able to create the lock file with O_EXCL
185219 logger .debug (f"[Rank { rank } ] Attempting to acquire lock: { lock_file } " )
186220 acquire_start_time = time .time ()
221+ # Re-check in case extractor finished while we waited
222+ if plugin_lib_path .exists ():
223+ logger .debug (
224+ f"[Rank { rank } ] Plugin appeared at { plugin_lib_path } during acquire, skipping extraction"
225+ )
226+ return
187227 while True :
188228 try :
189- # Re-check in case extractor finished while we waited
190- if plugin_lib_path .exists ():
191- logger .debug (
192- f"[Rank { rank } ] Plugin appeared at { plugin_lib_path } during acquire, skipping extraction"
193- )
194- return
195229 lock_fd = os .open (str (lock_file ), os .O_CREAT | os .O_EXCL | os .O_RDWR )
196230 logger .debug (f"[Rank { rank } ] Successfully acquired lock" )
197231 # write lock owner metadata for race condition time logging
@@ -218,6 +252,16 @@ def extract_wheel_file(
218252 logger .debug (
219253 f"[Rank { rank } ] Plugin already present at { plugin_lib_path } after acquire, skipping extraction"
220254 )
255+ # Clean up lock and return, since lock already acquired
256+ try :
257+ if lock_fd is not None :
258+ os .close (lock_fd )
259+ except Exception as e :
260+ logger .debug (f"[Rank { rank } ] Failed to close lock fd: { e } " )
261+ try :
262+ lock_file .unlink (missing_ok = True )
263+ except Exception as e :
264+ logger .debug (f"[Rank { rank } ] Failed to unlink lock file: { e } " )
221265 return
222266 # With lock held, perform extraction
223267 logger .debug (
@@ -421,9 +465,33 @@ def load_tensorrt_llm_for_nccl() -> bool:
421465 Attempts to load the TensorRT-LLM plugin and initialize it.
422466 Either the env variable TRTLLM_PLUGINS_PATH can specify the path
423467 Or the user can specify USE_TRTLLM_PLUGINS as either of (1, true, yes, on) to download the TRT-LLM distribution and load it
468+
469+ Environment Variables:
470+ TRTLLM_PLUGINS_PATH: Path to pre-installed TensorRT-LLM plugin library
471+ USE_TRTLLM_PLUGINS: Set to 1/true/yes/on to auto-download plugin
472+ TRTLLM_FORCE_CLEANUP: Set to 1/true/yes/on to force cleanup of cached files on startup (useful after mpirun bus errors)
473+
424474 Returns:
425475 bool: True if the plugin was successfully loaded and initialized, False otherwise.
426476 """
477+ if os .environ .get ("TRTLLM_FORCE_CLEANUP" , "0" ).lower () in (
478+ "1" ,
479+ "true" ,
480+ "yes" ,
481+ "on" ,
482+ ):
483+ import shutil
484+
485+ cache_root = _cache_root ()
486+ if cache_root .exists ():
487+ logger .warning (
488+ f"TRTLLM_FORCE_CLEANUP=1 detected. Cleaning up cache: { cache_root } "
489+ )
490+ shutil .rmtree (cache_root , ignore_errors = True )
491+ logger .info (
492+ "Cache cleaned up. Proceeding with fresh download/extraction..."
493+ )
494+
427495 if not is_platform_supported_for_trtllm ():
428496 return False
429497 plugin_lib_path = os .environ .get ("TRTLLM_PLUGINS_PATH" )
0 commit comments