v1: Offloading connector #22595
Conversation
👋 Hi! Thank you for contributing to the vLLM project. 💬 Join our developer Slack at https://slack.vllm.ai to discuss your PR in #pr-reviews, coordinate on features in #feat- channels, or join special interest groups in #sig- channels. Just a reminder: PRs do not trigger a full CI run by default; only a limited subset of checks runs automatically. Once the PR is approved and ready to go, your PR reviewer(s) can run CI to test the changes comprehensively before merging.
Code Review
This PR introduces a new offloading connector. The implementation is extensive and adds a lot of new components. My review found several critical issues that need to be addressed. These include a race condition in the tests, a critical assertion that would crash workers on transfer failures, a resource leak due to unjoined threads, and an incorrect list slicing that would lead to errors. These issues affect both the correctness of the new feature and the reliability of its tests.
Force-pushed 4b24d03 to 4fca175.
Mark, will take a look and review after this PR gets stable.
Force-pushed 4fca175 to 8d7a0d7.
Force-pushed 866a51c to 4872976.
This commit adds a new offloading component, composed of:
1. A scheduler-side OffloadingManager (abstract), which kicks off KV data transfers and keeps track of offloaded data.
2. A worker-side OffloadingQueueManager, which asynchronously manages KV transfers.
Signed-off-by: Or Ozeri <[email protected]>
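For orientation, here is a minimal sketch of how that scheduler/worker split could look. The class and method names below are illustrative assumptions, not the exact interfaces added by this commit:

```python
from abc import ABC, abstractmethod

# Illustrative sketch only: names and signatures are assumptions,
# not the actual API introduced in this PR series.
class OffloadingManager(ABC):
    """Scheduler-side: decides what to offload and tracks offloaded blocks."""

    @abstractmethod
    def lookup(self, block_hashes: list[int]) -> int:
        """Return how many of the given blocks are already offloaded."""

    @abstractmethod
    def prepare_store(self, block_hashes: list[int]):
        """Choose which blocks to store and where, kicking off a transfer."""


class OffloadingQueueManager:
    """Worker-side: executes KV transfers asynchronously and reports completions."""

    def transfer_async(self, job_id: int, transfer_spec) -> None:
        raise NotImplementedError

    def get_finished(self) -> list[tuple[int, bool]]:
        """Return (job_id, success) pairs for transfers that have completed."""
        raise NotImplementedError
```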
This pull request has merge conflicts that must be resolved before it can be merged.
Force-pushed 4872976 to 9d2e0b9.
This commit moves the request block hashes from the KVCacheManager to the Request object itself. In particular, this allows connectors to access the request block hashes. Signed-off-by: Or Ozeri <[email protected]>
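A hypothetical illustration of what this enables for connectors; the attribute and helper names below are assumed for the example, not the PR's exact API:

```python
# With block hashes living on the Request object, a scheduler-side connector
# can consult them directly instead of going through the KVCacheManager.
# `request.block_hashes` is an assumed attribute name.
def num_offloaded_tokens(request, offloading_manager, block_size: int) -> int:
    num_hits = offloading_manager.lookup(request.block_hashes)
    return num_hits * block_size
```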
This commit adds a new scheduler-side connector API to collect KV cache events. Additionally, we add a medium field to KV events to allow distinguishing KV events on different media (e.g. blocks stored on CPU, disk, or GPU, the default). Signed-off-by: Or Ozeri <[email protected]>
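For instance, an offloading connector could emit a block-stored event tagged with the medium the block now lives on. The event class below is a stand-in for illustration, not necessarily vLLM's exact event type:

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class BlockStoredEvent:
    """Stand-in KV cache event carrying the new `medium` field."""
    block_hashes: list[int]
    medium: Optional[str] = None  # e.g. "CPU" or "disk"; None keeps the GPU default


event = BlockStoredEvent(block_hashes=[1234, 5678], medium="CPU")
```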
This commit introduces a new OffloadingConnector for offloading blocks of KV data via a generic interface. Signed-off-by: Or Ozeri <[email protected]>
Force-pushed 9d2e0b9 to 11e1629.
@orozery Hey, thanks for the amazing work! Is there a centralized branch for us to run some benchmarks? We are excited to test it and would like to push for landing this connector-based CPU offloading solution if it has good performance 🚀.
Try this branch: https://github.com/orozery/vllm/tree/cpu-offloading-afa5b7
@orozery Thanks for the great effort! Some high-level comments:
- The current implementation is a bit over-complicated. We should simplify the `transfer_fn` and the `LoadStoreSpec` abstraction in order to get better performance and better maintainability.
- There are a few potential performance improvements that we can do (immediately or as potential follow-ups), as sketched below:
  (a) Launch the d2h/h2d copy kernels in a separate CUDA stream.
  (b) Use CUDA events to implement the async loading so that we don't need to launch extra Python threads in the worker process.
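A minimal sketch of point (a), assuming standard PyTorch stream/event APIs and a pinned CPU destination buffer (buffer layout and naming are assumptions, not the PR's code):

```python
import torch

# Run device->host copies on a dedicated stream so offloading does not block
# the inference (default) stream.
d2h_stream = torch.cuda.Stream()


def offload_block(gpu_block: torch.Tensor, cpu_block: torch.Tensor) -> torch.cuda.Event:
    assert cpu_block.is_pinned(), "non_blocking d2h copies need pinned host memory"
    # Ensure the copy waits for whatever produced this block on the main stream.
    d2h_stream.wait_stream(torch.cuda.current_stream())
    with torch.cuda.stream(d2h_stream):
        cpu_block.copy_(gpu_block, non_blocking=True)
        done = torch.cuda.Event()
        done.record(d2h_stream)
    # The returned event can later be polled to detect completion (point (b)).
    return done
```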
@dataclass
class PrepareStoreOutput:
    block_hashes_to_store: list[int]
    store_specs: list[LoadStoreSpec]
Having the `store_specs` be a list of Python objects will be pretty heavy.
From other related PRs, I see that we are going to transmit this list between processes and threads, plus doing some for loops over it in the worker process. This can incur a huge amount of Python-level overhead.
A proposal is to use torch.Tensor for now, since the `BlockIDLoadStoreSpec` is just wrapping around integers.
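A sketch of that proposal, keeping only the integer block IDs in a tensor (this assumes the specs carry nothing beyond a block number, as the reviewer suggests):

```python
import torch


def specs_to_tensor(specs: list) -> torch.Tensor:
    """Collapse a list of BlockIDLoadStoreSpec-like objects into one int64 tensor.

    A single tensor is cheap to serialize across processes and avoids
    per-object Python overhead when iterating on the worker side.
    """
    return torch.tensor([spec.block_id for spec in specs], dtype=torch.int64)
```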
class BlockIDLoadStoreSpec(LoadStoreSpec, ABC):
    """
    Spec for loading/storing a KV block from a given block number.
    """

    def __init__(self, block_id: int):
        self.block_id = block_id

    def __repr__(self) -> str:
        return str(self.block_id)


class GPULoadStoreSpec(BlockIDLoadStoreSpec):
    """
    Spec for loading/storing a KV block to GPU memory.
    """

    @staticmethod
    def medium() -> str:
        return "GPU"


class CPULoadStoreSpec(BlockIDLoadStoreSpec):
    """
    Spec for loading/storing a KV block to CPU memory.
    """

    @staticmethod
    def medium() -> str:
        return "CPU"
The abstraction of such LoadStoreSpec seems to be over-complicated. Why should we have this? Would there be simpler alternatives? (i.e., just use two lists or two tensors for cpu->gpu block ids and gpu->cpu block ids)
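For instance, the connector metadata could carry just one (src, dst) block-ID pair tensor per direction, instead of per-block spec objects. A short sketch of that alternative (values are illustrative, not the PR's actual schema):

```python
import torch

# GPU -> CPU offloads: each row is (gpu_block_id, cpu_block_id).
gpu_to_cpu = torch.tensor([[3, 0], [7, 1]], dtype=torch.int64)   # offload GPU blocks 3 and 7
# CPU -> GPU loads: each row is (cpu_block_id, gpu_block_id).
cpu_to_gpu = torch.tensor([[1, 12]], dtype=torch.int64)          # bring CPU block 1 into GPU block 12
```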
@abstractmethod
def get_transfer_functions(
    self, kv_caches: dict[str, torch.Tensor]
) -> Iterator[tuple[type[LoadStoreSpec], type[LoadStoreSpec],
                    TransferFunction, int]]:
    """
    Get transfer functions along with their respective src and dst types.

    Args:
        kv_caches: A dictionary of layer_name -> gpu_kv_cache tensor.

    Yields:
        Tuples of (src_type, dst_type, transfer_function, num_threads).
    """
    pass
Not sure about the purpose of having such an abstraction for CPU offloading; the logic is a bit hard to follow here.
Can we directly call `swap_blocks` in the connector? That would be simpler and easier to understand.
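For illustration, a pure-PyTorch stand-in for a swap_blocks-style copy, assuming the block dimension is the first axis of each cache tensor (real vLLM attention backends expose an optimized kernel for this, so this is only a sketch of the shape of the call):

```python
import torch


def swap_out(gpu_cache: torch.Tensor, cpu_cache: torch.Tensor,
             block_mapping: torch.Tensor) -> None:
    """Copy GPU blocks into CPU blocks.

    block_mapping: int64 tensor of shape [num_pairs, 2], rows are (gpu_block, cpu_block).
    """
    gpu_ids, cpu_ids = block_mapping[:, 0], block_mapping[:, 1]
    cpu_cache[cpu_ids] = gpu_cache[gpu_ids].to(cpu_cache.device)
```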
for thread_idx in range(num_threads):
    t = threading.Thread(target=self.run,
                         args=(thread_idx, ),
                         name=f"{transfer_type}-worker-{thread_idx}")
    t.start()
    self._worker_threads.append(t)
Having another thread in the worker process may incur extra overhead.
At a high level, we might want to use CUDA events to achieve async so that we don't need to create new threads.
IIUC, this could be a longer discussion, and we can gradually push the implementation in.
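A sketch of the event-based alternative: record a CUDA event per transfer and poll it from the existing worker loop instead of joining threads. This is illustrative only; class and method names are assumptions:

```python
import torch


class AsyncTransferTracker:
    """Track in-flight d2h/h2d copies by CUDA event instead of a worker thread."""

    def __init__(self) -> None:
        self._inflight: dict[int, torch.cuda.Event] = {}

    def submit(self, job_id: int, event: torch.cuda.Event) -> None:
        self._inflight[job_id] = event

    def poll_finished(self) -> list[int]:
        # Event.query() is non-blocking and returns True once the recorded
        # work (e.g. a copy launched on a side stream) has completed.
        done = [jid for jid, ev in self._inflight.items() if ev.query()]
        for jid in done:
            del self._inflight[jid]
        return done
```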
    job_id)

try:
    success = self.transfer_fn(transfer_spec)
From what I saw in other PRs, the transfer_fn and the internal `swap_blocks` are launched in the same CUDA stream as the LLM inference.
This will make CPU offloading a blocking operation, resulting in a negative performance impact, especially when there is no KV cache reuse.
For performance's sake, we should make sure the d2h and h2d copies are launched on different CUDA streams.
This PR adds an offloading connector that delegates to a generic API introduced in #19848.
The actual implementation of this API is selected via a factory, which is currently empty (one possible shape for such a factory is sketched at the end of this description).
A small follow-up PR will register a CPU implementation based on #20075 (scheduler-side implementation) and #21448 (worker-side implementation).
Part of RFC #19854.
Depends on PRs #19728, #19848, #19737.
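To make the "currently empty factory" concrete, it could follow a simple name-to-constructor registry along these lines. This is an illustrative sketch only; the real factory and the CPU backend registration live in the follow-up PRs:

```python
from typing import Callable

# Hypothetical registry mapping a backend name to an OffloadingManager factory.
_OFFLOADING_BACKENDS: dict[str, Callable[..., object]] = {}


def register_offloading_backend(name: str, factory: Callable[..., object]) -> None:
    _OFFLOADING_BACKENDS[name] = factory


def create_offloading_manager(name: str, **kwargs) -> object:
    if name not in _OFFLOADING_BACKENDS:
        raise ValueError(f"unknown offloading backend: {name}")
    return _OFFLOADING_BACKENDS[name](**kwargs)

# A follow-up PR could then do, for example:
# register_offloading_backend("cpu", CPUOffloadingManager)
```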