Skip to content

Commit 617296a

Browse files
committed
Log more info
1 parent dae03f9 commit 617296a

File tree

10 files changed

+42
-3
lines changed

10 files changed

+42
-3
lines changed

tensorrt_llm/executor/base_worker.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -638,6 +638,7 @@ def submit(self, request: GenerationRequest) -> GenerationResult:
638638
return result
639639

640640
def shutdown(self):
641+
print(f"========================== Shutting down worker {self.rank}")
641642
if self.doing_shutdown:
642643
return
643644
else:

tensorrt_llm/executor/proxy.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -290,6 +290,9 @@ def pre_shutdown(self):
290290
self.request_queue.put_noblock(None, retry=4)
291291

292292
def shutdown(self):
293+
print(
294+
f"====================== shutdown in GenerationExecutorProxy is called pid: {os.getpid()}"
295+
)
293296
if not self.workers_started:
294297
return
295298

@@ -325,6 +328,10 @@ def shutdown(self):
325328
self.result_queue.close()
326329

327330
self.workers_started = False
331+
print(
332+
f"====================== shutdown in GenerationExecutorProxy 2 is called pid: {os.getpid()}"
333+
)
334+
328335
self.mpi_session.shutdown()
329336

330337
# Process the errors in-case error during shutting down the threads

tensorrt_llm/executor/ray_executor.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -294,7 +294,9 @@ def shutdown(self):
294294
self._shutdown_event.set()
295295

296296
logger_debug(f"Shutting down RayExecutor", color="yellow")
297-
297+
print(
298+
f"====================== shutdown in RayExecutor is called pid: {os.getpid()}"
299+
)
298300
if hasattr(self, 'main_loop') and self.main_loop and hasattr(
299301
self, 'main_loop_task_obj') and self.main_loop_task_obj:
300302
logger_debug("Cancelling main loop task.", color="yellow")

tensorrt_llm/executor/ray_gpu_worker.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,9 @@ def call_worker_method(self, method_name: str, *args, **kwargs):
151151
f"The RayGPUWorker has no method called '{method_name}'.")
152152

153153
def shutdown(self):
154+
print(
155+
f"====================== shutdown in RayWorkerWrapper is called pid: {os.getpid()}"
156+
)
154157
if hasattr(self, 'worker'):
155158
self.worker.shutdown()
156159

@@ -298,6 +301,9 @@ def shutdown(self):
298301
return
299302
else:
300303
self.doing_shutdown = True
304+
print(
305+
f"====================== shutdown in RayWorkerWrapper is called pid: {os.getpid()}"
306+
)
301307

302308
logger.debug(f'Worker {self.rank} shutting down...')
303309

tensorrt_llm/executor/rpc/rpc_server.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,9 @@ def shutdown(self, is_remote_call: bool = False) -> None:
134134
# Set the stop event to True, this will trigger immediate shutdown
135135
self._stop_event.set()
136136

137+
print(
138+
f"==================================== RPCServer shutdown called, is_remote_call={is_remote_call}"
139+
)
137140
# Log pending requests that will be cancelled
138141
logger_debug(
139142
f"[server] RPCServer is shutting down: {self._num_pending_requests} pending requests will be cancelled"

tensorrt_llm/executor/rpc_proxy.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -191,12 +191,12 @@ def shutdown(self):
191191
traceback.print_stack()
192192
import os
193193
print(
194-
f"====================== shutdown in generator is called pid: {os.getpid()}"
194+
f"====================== shutdown in GenerationExecutorRpcProxy is called pid: {os.getpid()}"
195195
)
196196
if self._shutdown_event.is_set():
197197
return
198198
print(
199-
f"====================== shutdown in generator 2 is called pid: {os.getpid()}"
199+
f"====================== shutdown in GenerationExecutorRpcProxy 2 is called pid: {os.getpid()}"
200200
)
201201
self._shutdown_event.set()
202202
logger_debug(f"Shutting down GenerationExecutorRpcProxy",

tensorrt_llm/executor/rpc_worker.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,9 @@ def shutdown(self):
102102
logger_debug(f"[worker] RpcWorker #{mpi_rank()} is shutting down",
103103
color="yellow")
104104
self.shutdown_event.set()
105+
print(
106+
f"====================== shutdown in RpcWorker is called pid: {os.getpid()}"
107+
)
105108
super().shutdown()
106109
logger_debug(f"[worker] RpcWorker #{mpi_rank()} is shutdown",
107110
color="yellow")

tensorrt_llm/executor/utils.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,9 @@ def submit_sync(self, task: Callable, *args, **kwargs) -> List[Any]:
101101
return [future.result() for future in futures]
102102

103103
def shutdown(self):
104+
print(
105+
f"==================================== shutdown ProcessPoolExecutor session"
106+
)
104107
self.mpi_pool.shutdown(wait=True)
105108

106109

tensorrt_llm/executor/worker.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,9 @@ def shutdown(self):
101101
self.await_response_thread.stop()
102102
self.await_response_thread.join()
103103

104+
print(
105+
f"====================== GenerationExecutorWorker engine shutdown is called pid: {os.getpid()}"
106+
)
104107
self.engine.shutdown()
105108
self.engine = None
106109

@@ -320,6 +323,9 @@ def notify_proxy_threads_to_quit():
320323
else:
321324
raise ValueError(f"Unknown request type: {type(req)}")
322325

326+
print(
327+
f"====================== Worker {mpi_rank()} received shutdown signal from proxy process."
328+
)
323329
notify_proxy_threads_to_quit()
324330

325331
except GenerationExecutorWorker.WorkerExit as e:
@@ -328,7 +334,11 @@ def notify_proxy_threads_to_quit():
328334

329335
except Exception as e: # other critical errors
330336
if is_leader:
337+
print(
338+
f"====================== Worker {mpi_rank()} received shutdown signal from proxy process."
339+
)
331340
notify_proxy_threads_to_quit()
332341
logger.error(traceback.format_exc())
333342
# This will be captured by mpi4py and handled by future.done_callback
334343
raise e
344+
logger_debug(f"Worker {mpi_rank()} exiting worker_main...\n", "green")

tensorrt_llm/llmapi/mpi_session.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,7 @@ def submit_sync(self, task: Callable[..., T], *args, **kwargs) -> List[T]:
160160
return [future.result() for future in futures]
161161

162162
def shutdown(self, wait=True):
163+
print(f"==================================== shutdown MPI pool session")
163164
if self.mpi_pool is not None:
164165
self.mpi_pool.shutdown(wait=wait)
165166
self.mpi_pool = None
@@ -237,6 +238,9 @@ def submit_sync(self, task: Callable[..., T], *args, **kwargs) -> List[T]:
237238
def shutdown(self, wait=True):
238239
# Only shutdown the mpi_pool if this instance created it
239240
# For shared global mpi_pool, we don't shut it down
241+
print(
242+
f"==================================== shutdown is called MPI comm session"
243+
)
240244
if self.mpi_pool is not None and self.owns_mpi_pool:
241245
self.mpi_pool.shutdown(wait=wait)
242246
self.mpi_pool = None

0 commit comments

Comments
 (0)