 from vllm.distributed import (ensure_model_parallel_initialized,
                               init_distributed_environment,
                               set_custom_all_reduce)
-from vllm.distributed.kv_transfer import ensure_kv_transfer_initialized
+from vllm.distributed.kv_transfer import (ensure_kv_transfer_initialized,
+                                          has_kv_transfer_group)
 from vllm.distributed.parallel_state import get_pp_group, get_tp_group
 from vllm.logger import init_logger
 from vllm.lora.request import LoRARequest
@@ -342,19 +343,20 @@ def execute_model(
             assert isinstance(output, IntermediateTensors)
             get_pp_group().send_tensor_dict(output.tensors,
                                             all_gather_group=get_tp_group())
+            if not has_kv_transfer_group():
+                return None
 
             # In case of PP with kv transfer, we need to pass through the
             # finished_sending and finished_recving buffers.
-            empty_output = EMPTY_MODEL_RUNNER_OUTPUT
+            new_output = EMPTY_MODEL_RUNNER_OUTPUT
             if output.finished_sending or output.finished_recving:
-                empty_output = copy.copy(empty_output)
-                empty_output.finished_sending = output.finished_sending
-                empty_output.finished_recving = output.finished_recving
-            output = empty_output
+                new_output = copy.copy(new_output)
+                new_output.finished_sending = output.finished_sending
+                new_output.finished_recving = output.finished_recving
+            output = new_output
 
         assert isinstance(output, ModelRunnerOutput)
-        # return output only from the driver worker
-        return output if self.is_driver_worker else None
+        return output
 
     def profile(self, is_start: bool = True):
         if self.profiler is None:
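For readers skimming the diff, here is a minimal, self-contained sketch of the new control flow on a non-last pipeline-parallel rank. `ModelRunnerOutput`, `EMPTY_MODEL_RUNNER_OUTPUT`, and `forward_kv_transfer_status` below are toy stand-ins for illustration, not the real vLLM types; only the branching mirrors the diff, assuming `has_kv_transfer_group()` reports whether a KV transfer connector is configured.

```python
import copy
from dataclasses import dataclass
from typing import Optional, Set

# Toy stand-in: the real vLLM ModelRunnerOutput carries many more fields.
@dataclass
class ModelRunnerOutput:
    finished_sending: Optional[Set[str]] = None
    finished_recving: Optional[Set[str]] = None

# Shared "empty" sentinel, analogous to EMPTY_MODEL_RUNNER_OUTPUT.
EMPTY_MODEL_RUNNER_OUTPUT = ModelRunnerOutput()

def forward_kv_transfer_status(
        output: ModelRunnerOutput,
        kv_transfer_enabled: bool) -> Optional[ModelRunnerOutput]:
    """Hypothetical helper mirroring the non-last-rank path after
    send_tensor_dict(): return nothing when KV transfer is disabled,
    otherwise forward an otherwise-empty output that only carries the
    finished_sending / finished_recving sets."""
    if not kv_transfer_enabled:
        return None
    new_output = EMPTY_MODEL_RUNNER_OUTPUT
    if output.finished_sending or output.finished_recving:
        # Copy before mutating so the shared empty sentinel stays empty.
        new_output = copy.copy(new_output)
        new_output.finished_sending = output.finished_sending
        new_output.finished_recving = output.finished_recving
    return new_output

# With KV transfer: the finished-sending set is passed through.
print(forward_kv_transfer_status(
    ModelRunnerOutput(finished_sending={"req-1"}), kv_transfer_enabled=True))
# Without KV transfer: the rank returns None, as before this change.
print(forward_kv_transfer_status(
    ModelRunnerOutput(finished_sending={"req-1"}), kv_transfer_enabled=False))
```

The early `return None` keeps the pre-change behavior for deployments without a KV connector, while the copy-on-write of the empty output avoids mutating the shared sentinel when finished-transfer sets need to be forwarded.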