Skip to content

Commit c27f39a

Browse files
committed
adding logging lock file for debug purpose with file descriptor, avoiding race conditions
1 parent 4118355 commit c27f39a

File tree

1 file changed

+127
-24
lines changed

1 file changed

+127
-24
lines changed

py/torch_tensorrt/_utils.py

Lines changed: 127 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
import tensorrt as trt
1414
import torch
1515

16-
logger = logging.getLogger(__name__)
16+
logger = logging.getLogger("torch_tensorrt")
1717

1818
_WHL_CPYTHON_VERSION = "cp310"
1919
_TENSORRT_LLM_VERSION_ = "0.17.0.post1"
@@ -144,7 +144,9 @@ def _extracted_dir_trtllm(platform_system: str, platform_machine: str) -> Path:
144144
)
145145

146146

147-
def extract_wheel_file(wheel_path: Path, extract_dir: Path) -> None:
147+
def extract_wheel_file(
148+
wheel_path: Path, extract_dir: Path, plugin_lib_path: Path
149+
) -> None:
148150
"""
149151
Safely extract a wheel file to a directory with a lock to prevent concurrent extraction.
150152
"""
@@ -157,19 +159,84 @@ def extract_wheel_file(wheel_path: Path, extract_dir: Path) -> None:
157159
logger.debug(
158160
f"[Rank {rank}] Starting extraction of {wheel_path} to {extract_dir}"
159161
)
162+
# If another job already finished earlier, skip immediately
163+
if plugin_lib_path.exists():
164+
logger.debug(
165+
f"[Rank {rank}] Plugin already present at {plugin_lib_path}, skipping extraction"
166+
)
167+
return
168+
169+
logger.debug(f"[Rank {rank}] Checking wheel file exists: {wheel_path.exists()}")
170+
160171
try:
161172
import zipfile
173+
174+
logger.debug(f"[Rank {rank}] Successfully imported zipfile")
162175
except ImportError as e:
163176
raise ImportError(
164177
"zipfile module is required but not found. Please install zipfile"
165178
)
166-
# Create lock file to signal extraction in progress
167-
extract_dir.mkdir(parents=True, exist_ok=False)
168-
lock_file.touch(exist_ok=False)
179+
180+
# Create extraction directory first (needed for lock file)
181+
extract_dir.mkdir(parents=True, exist_ok=True)
182+
183+
# Acquire lock atomically, portable across different platforms x86/arm/windows
184+
# Only one process should be able to create the lock file with O_EXCL
185+
logger.debug(f"[Rank {rank}] Attempting to acquire lock: {lock_file}")
186+
acquire_start_time = time.time()
187+
while True:
188+
try:
189+
# Re-check in case extractor finished while we waited
190+
if plugin_lib_path.exists():
191+
logger.debug(
192+
f"[Rank {rank}] Plugin appeared at {plugin_lib_path} during acquire, skipping extraction"
193+
)
194+
return
195+
lock_fd = os.open(str(lock_file), os.O_CREAT | os.O_EXCL | os.O_RDWR)
196+
logger.debug(f"[Rank {rank}] Successfully acquired lock")
197+
# write lock owner metadata for race condition time logging
198+
try:
199+
lock_info = f"pid={os.getpid()} host={platform.node()} rank={rank} start={time.time()}\n"
200+
os.write(lock_fd, lock_info.encode("utf-8"))
201+
os.fsync(lock_fd)
202+
except Exception:
203+
# It's fine if we fail to write metadata
204+
pass
205+
break
206+
except FileExistsError:
207+
if time.time() - acquire_start_time > 300:
208+
if not plugin_lib_path.exists():
209+
logger.warning(
210+
f"[Rank {rank}] Timed out waiting for extraction lock at {lock_file} (>300s) and plugin not present at {plugin_lib_path}"
211+
)
212+
raise TimeoutError(
213+
f"[Rank {rank}] Timed out acquiring extraction lock at {lock_file}"
214+
)
215+
time.sleep(0.5)
216+
217+
if plugin_lib_path.exists():
218+
logger.debug(
219+
f"[Rank {rank}] Plugin already present at {plugin_lib_path} after acquire, skipping extraction"
220+
)
221+
return
222+
# With lock held, perform extraction
223+
logger.debug(
224+
f"[Rank {rank}] Lock acquired, starting extraction from {wheel_path}"
225+
)
169226
try:
170227
with zipfile.ZipFile(wheel_path) as zip_ref:
171228
zip_ref.extractall(extract_dir)
172229
logger.debug(f"[Rank {rank}] Extraction complete: {extract_dir}")
230+
231+
# Delete wheel file after successful extraction (only Rank 0)
232+
try:
233+
wheel_path.unlink(missing_ok=True)
234+
logger.debug(f"[Rank {rank}] Deleted wheel file: {wheel_path}")
235+
except Exception as e:
236+
logger.warning(
237+
f"[Rank {rank}] Could not delete wheel file {wheel_path}: {e}"
238+
)
239+
173240
except FileNotFoundError as e:
174241
logger.error(f"[Rank {rank}] Wheel file not found at {wheel_path}: {e}")
175242
raise RuntimeError(
@@ -186,16 +253,63 @@ def extract_wheel_file(wheel_path: Path, extract_dir: Path) -> None:
186253
"Unexpected error during extraction of TensorRT-LLM wheel"
187254
) from e
188255
finally:
189-
# Remove lock file to signal completion
190-
lock_file.unlink(missing_ok=True)
256+
# Release lock, close file descriptor and remove lock file to signal completion
257+
try:
258+
os.close(lock_fd)
259+
except Exception as e:
260+
logger.debug(
261+
f"[Rank {rank}] Failed to close lock fd for {lock_file}: {e}",
262+
exc_info=e,
263+
)
264+
try:
265+
lock_file.unlink(missing_ok=True)
266+
except Exception as e:
267+
logger.debug(
268+
f"[Rank {rank}] Failed to unlink lock file {lock_file}: {e}",
269+
exc_info=e,
270+
)
191271

192272
else:
193-
# Other ranks wait for extraction to complete
194-
while lock_file.exists():
195-
logger.debug(
196-
f"[Rank {rank}] Waiting for extraction to finish at {extract_dir}..."
197-
)
273+
# Other ranks wait for extraction to complete.
274+
# only check lock file - don't check plugin existence during extraction
275+
# because plugin file may be partially written before extraction completes
276+
observed_lock = False
277+
wait_start_time = time.time()
278+
while True:
279+
if lock_file.exists():
280+
observed_lock = True
281+
logger.debug(
282+
f"[Rank {rank}] Waiting for extraction to finish at {extract_dir}..."
283+
)
284+
else:
285+
if observed_lock:
286+
# Lock was present and now gone -> extraction finished
287+
logger.debug(
288+
f"[Rank {rank}] Lock file removed, extraction complete"
289+
)
290+
break
291+
else:
292+
# Lock file never appeared - check if plugin already exists from previous run
293+
if plugin_lib_path.exists():
294+
logger.debug(
295+
f"[Rank {rank}] Plugin already exists from previous run, no extraction needed"
296+
)
297+
break
298+
# Lock not seen yet, keep waiting for Rank 0 to start
299+
logger.debug(
300+
f"[Rank {rank}] Waiting for extraction to start (no lock file yet)..."
301+
)
302+
198303
time.sleep(0.5)
304+
if time.time() - wait_start_time > 600:
305+
# 10 minute safeguard to avoid indefinite waits
306+
logger.warning(
307+
f"[Rank {rank}] Timed out (>600s) waiting for extraction to finish at {extract_dir}; "
308+
f"lock_present={lock_file.exists()} plugin_present={plugin_lib_path.exists()}"
309+
)
310+
raise TimeoutError(
311+
f"[Rank {rank}] Timed out waiting for extraction to finish at {extract_dir}"
312+
)
199313

200314

201315
def download_and_get_plugin_lib_path() -> Optional[str]:
@@ -246,18 +360,7 @@ def download_and_get_plugin_lib_path() -> Optional[str]:
246360
except OSError as e:
247361
logger.error(f"Local file write error: {e}")
248362

249-
extract_wheel_file(wheel_path, extract_dir)
250-
251-
try:
252-
wheel_path.unlink(missing_ok=True)
253-
logger.debug(f"Deleted wheel file: {wheel_path}")
254-
except Exception as e:
255-
logger.warning(f"Could not delete wheel file {wheel_path}: {e}")
256-
if not plugin_lib_path.exists():
257-
logger.error(
258-
f"Plugin library not found at expected location: {plugin_lib_path}"
259-
)
260-
return None
363+
extract_wheel_file(wheel_path, extract_dir, plugin_lib_path)
261364

262365
return str(plugin_lib_path)
263366

0 commit comments

Comments
 (0)