Skip to content

Commit ea3739e

Browse files
authored
Fix: fuse message not aligned on different processes (NVIDIA#3067)
Signed-off-by: Kaiyu Xie <[email protected]>
1 parent d70ff79 commit ea3739e

File tree

2 files changed: 6 additions and 3 deletions.

2 files changed

+6
-3
lines changed

tensorrt_llm/executor/postproc_worker.py

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -158,7 +158,9 @@ async def _mainloop(self):
158  158
159  159    async def handle_single_input(inp: PostprocWorker.Input,
160  160    batch: List[PostprocWorker.Output]):
161       - assert isinstance(inp, PostprocWorker.Input)
     161  + assert isinstance(
     162  + inp, PostprocWorker.Input
     163  + ), f"Expect PostprocWorker.Input, got {type(inp)}."
162  164    client_id = inp.rsp.client_id
163  165    is_final = inp.rsp.result.is_final if isinstance(
164  166    inp.rsp, tllm.Response) else True

tensorrt_llm/executor/worker.py

Lines changed: 3 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -552,7 +552,7 @@ def worker_main(
552  552    # processes, each one is a PAIR zmq socket
553  553    result_queues = [
554  554    FusedIpcQueue(is_server=True,
555       - fuse_message=True,
     555  + fuse_message=not BATCH_RESP_IN_AWAIT,
556  556    name=f"postprocess_{i}_feedin_queue")
557  557    for i in range(postproc_worker_config.num_postprocess_workers)
558  558    ]
@@ -803,7 +803,8 @@ def handle_for_ipc_batched(self, responses: List[tllm.Response]) -> None:
803  803
804  804    if postproc_batches:
805  805    for wid, batch in enumerate(postproc_batches):
806       - self.worker.postproc_queues[wid].put(batch)
     806  + if batch:
     807  + self.worker.postproc_queues[wid].put(batch)
807  808
808  809    if rsp_batch:
809  810    self.worker.result_queue.put(rsp_batch)

0 commit comments

Comments
 (0)