[Disagg][Perf] Use NPU event sync instead of blocking tolist to avoid unintentional copy ops blocking across different NPU streams, improving disagg TTIT/TTFT #3209
base: main
Conversation
Signed-off-by: jesse <[email protected]>
👋 Hi! Thank you for contributing to the vLLM Ascend project. The following points will speed up your PR merge:
If CI fails, you can run the linting and testing checks locally according to Contributing and Testing.
Code Review
This pull request introduces a performance optimization for NPU devices by replacing a blocking tolist() call with a non-blocking copy and event synchronization. This avoids device-wide stalls and improves performance for disaggregated setups. The changes are sound, but I've identified a critical bug in the buffer allocation that could lead to runtime errors under certain configurations. I've also suggested an improvement to the new unit test to make it more robust and maintainable.
self.sampled_token_ids_pinned_cpu = torch.empty(
    (self.max_model_len, 1),
    dtype=torch.int64,
    device="cpu",
    pin_memory=True)
The sampled_token_ids_pinned_cpu buffer is sized using self.max_model_len, but it is used to hold sampled_token_ids, which can contain up to self.max_num_reqs requests. If max_num_reqs is configured to be larger than max_model_len, this will cause a runtime error due to a size mismatch during the copy_ operation in _to_list. The buffer should instead be sized using self.max_num_reqs, which correctly represents the maximum number of requests in a batch. (A runnable sketch of the mismatch follows the suggested change below.)
Suggested change:

-        self.sampled_token_ids_pinned_cpu = torch.empty(
-            (self.max_model_len, 1),
-            dtype=torch.int64,
-            device="cpu",
-            pin_memory=True)
+        self.sampled_token_ids_pinned_cpu = torch.empty(
+            (self.max_num_reqs, 1),
+            dtype=torch.int64,
+            device="cpu",
+            pin_memory=True)
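For concreteness, here is a minimal, runnable sketch of the failure mode described above. The sizes are hypothetical and pinning is omitted so it runs on any host; in the real code both values come from the runner's configuration:

import torch

max_model_len = 4   # buffer (wrongly) sized by this dimension
max_num_reqs = 8    # a batch can be larger than the buffer

pinned = torch.empty((max_model_len, 1), dtype=torch.int64)
sampled = torch.zeros((max_num_reqs, 1), dtype=torch.int64)

try:
    # Slicing a 4-row buffer to 8 rows still yields only 4 rows, so
    # copy_ sees shapes (4, 1) vs (8, 1) and raises a RuntimeError.
    pinned[:max_num_reqs].copy_(sampled, non_blocking=True)
except RuntimeError as err:
    print(f"size mismatch: {err}")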
def test_init_creates_transfer_event_and_pinned_memory(mock_torch,
                                                       mock_torch_npu):
    """Test that initialization creates transfer event and pinned CPU memory."""
    # This is a simplified test focusing only on the new attributes.
    # We mock the entire __init__ process and only test the specific lines we added.

    # Mock torch.empty to return a mock tensor
    mock_pinned_tensor = MagicMock()
    mock_torch.empty.return_value = mock_pinned_tensor

    # Mock torch_npu.npu.Event - the nested mock structure must be set up
    mock_event = MagicMock()
    mock_torch_npu.npu.Event.return_value = mock_event

    # Create a runner instance using __new__ to bypass __init__
    runner = NPUModelRunner.__new__(NPUModelRunner)

    # Manually set the attributes we need for our test
    runner.max_model_len = 2048

    # Test the specific lines from the commit
    runner.transfer_event = mock_torch_npu.npu.Event()
    runner.sampled_token_ids_pinned_cpu = mock_torch.empty(
        (runner.max_model_len, 1),
        dtype=torch.int64,
        device="cpu",
        pin_memory=True)

    # Verify max_model_len is set
    assert runner.max_model_len == 2048

    # Verify transfer_event is created
    assert runner.transfer_event == mock_event
    mock_torch_npu.npu.Event.assert_called_once()

    # Verify pinned CPU memory is created with correct parameters
    assert runner.sampled_token_ids_pinned_cpu == mock_pinned_tensor
    mock_torch.empty.assert_called_with((2048, 1),
                                        dtype=torch.int64,
                                        device="cpu",
                                        pin_memory=True)
This test is fragile because it bypasses __init__ and duplicates the implementation logic for creating transfer_event and sampled_token_ids_pinned_cpu within the test body. This makes the test hard to maintain: changes in __init__ might not be reflected here, leading to the test passing while the actual code is broken, or vice versa.

A better approach is to test the behavior of __init__ by calling it and asserting the results, while mocking its complex dependencies. Alternatively, the logic for initializing these new attributes could be extracted into a separate helper method within NPUModelRunner, which can then be called and tested directly. This avoids code duplication and makes the test more robust.
For example, you could refactor NPUModelRunner like this:
class NPUModelRunner:
    def __init__(self, ...):
        # ... existing init code ...
        self._init_transfer_resources()

    def _init_transfer_resources(self):
        self.transfer_event = torch_npu.npu.Event()
        self.sampled_token_ids_pinned_cpu = torch.empty(
            (self.max_num_reqs, 1),
            dtype=torch.int64,
            device="cpu",
            pin_memory=True)
And the test would become:
@patch('vllm_ascend.worker.model_runner_v1.torch_npu')
@patch('vllm_ascend.worker.model_runner_v1.torch')
def test_init_transfer_resources(mock_torch, mock_torch_npu):
    # ... mock setup ...
    runner = NPUModelRunner.__new__(NPUModelRunner)
    runner.max_num_reqs = 64
    runner._init_transfer_resources()

    mock_torch_npu.npu.Event.assert_called_once()
    mock_torch.empty.assert_called_with((64, 1), ...)
    # ... other assertions ...
This approach tests the logic without duplicating it.
This change doesn't work with CANN 8.3; we're working on it.
This PR is based on top of vllm-project/vllm#22760
What this PR does / why we need it?
When we copy the sampled valid token ids from device to host, we avoid using tolist, which would trigger a device-wide stream synchronization if the source tensor is on the device. We change it to a non-blocking copy followed by an explicit event synchronization (an NPU event on Ascend).
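A hedged sketch of the resulting pattern (the method and attribute names follow the diff discussed in the review thread above; surrounding runner code is elided and the exact implementation may differ):

import torch
import torch_npu  # Ascend PyTorch plugin

class NPUModelRunner:
    # In __init__ (see the review thread above):
    #   self.transfer_event = torch_npu.npu.Event()
    #   self.sampled_token_ids_pinned_cpu = torch.empty(
    #       (self.max_num_reqs, 1), dtype=torch.int64,
    #       device="cpu", pin_memory=True)

    def _to_list(self, sampled_token_ids: torch.Tensor) -> list[list[int]]:
        # Slice the preallocated pinned host buffer to the batch size and
        # start an asynchronous device-to-host copy on the current stream.
        pinned = self.sampled_token_ids_pinned_cpu[:sampled_token_ids.shape[0]]
        pinned.copy_(sampled_token_ids, non_blocking=True)
        # Record an event after the copy and wait only on that event,
        # rather than the device-wide sync a blocking .tolist() on a
        # device tensor would trigger.
        self.transfer_event.record()
        self.transfer_event.synchronize()
        # The pinned host tensor is now valid and cheap to read.
        return pinned.tolist()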
Does this PR introduce any user-facing change?
How was this patch tested?
Bring up a vLLM server and compare TTFT before and after the change.
Before: (screenshot omitted)
After: (screenshot omitted)
As shown in the figures, TTFT decreased.