
Commit fd4ea8e

Use NCCL instead of ray for control-plane communication to remove serialization overhead (#2221)
1 parent: 1066cbd

34 files changed, +519 −257 lines
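
The gist of the change: per-step control-plane data (scheduler metadata, block swap/copy instructions) is no longer shipped to every worker through Ray remote calls, which serialize the arguments on every step; instead the driver worker broadcasts it over the existing torch.distributed group. Below is a minimal sketch of that pattern, not the commit's actual code — the function name is illustrative, and a process group with the driver as rank 0 is assumed to be initialized already.

import torch.distributed as dist

def broadcast_control_plane(metadata=None, is_driver: bool = False):
    """Driver (rank 0) sends `metadata`; every other rank receives it."""
    obj = [metadata if is_driver else None]
    # broadcast_object_list pickles the payload once and moves it over the
    # collective backend (e.g. NCCL), instead of per-call Ray serialization.
    dist.broadcast_object_list(obj, src=0)
    return obj[0]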

docs/source/models/adding_model.rst

Lines changed: 3 additions & 4 deletions
@@ -58,11 +58,10 @@ Next, you need to rewrite the :code:`forward` methods of your model by following
 +    positions: torch.Tensor,
 +    kv_caches: List[KVCache],
 +    input_metadata: InputMetadata,
-+    cache_events: Optional[List[torch.cuda.Event]],
-+) -> SamplerOutput:
++) -> Optional[SamplerOutput]:
 
-3. Update the code by considering that :code:`input_ids` and :code:`positions` are now flattened tensors.
-4. Replace the attention operation with either :code:`PagedAttention`, :code:`PagedAttentionWithRoPE`, or :code:`PagedAttentionWithALiBi` depending on the model's architecture.
+1. Update the code by considering that :code:`input_ids` and :code:`positions` are now flattened tensors.
+2. Replace the attention operation with either :code:`PagedAttention`, :code:`PagedAttentionWithRoPE`, or :code:`PagedAttentionWithALiBi` depending on the model's architecture.
 
 .. note::
     Currently, vLLM supports the basic multi-head attention mechanism and its variant with rotary positional embeddings.
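
For context, the documented signature change means a ported model's forward no longer takes cache_events and may now return None, since after this change only the driver worker produces sampler output. A rough sketch of the new shape (the class name is hypothetical, and vLLM's KVCache, InputMetadata, and SamplerOutput types are stubbed out here):

from typing import Any, List, Optional

import torch

class MyNewModelForCausalLM(torch.nn.Module):  # hypothetical model being ported
    def forward(
        self,
        input_ids: torch.Tensor,     # flattened token ids
        positions: torch.Tensor,     # flattened positions
        kv_caches: List[Any],        # List[KVCache] in vLLM
        input_metadata: Any,         # InputMetadata in vLLM
    ) -> Optional[Any]:              # Optional[SamplerOutput] in vLLM
        # cache_events is gone from the signature, and the return type is
        # Optional because non-driver workers return None instead of
        # sampling results.
        raise NotImplementedError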

requirements-rocm.txt

Lines changed: 0 additions & 2 deletions
@@ -3,8 +3,6 @@ typing-extensions>=4.8.0
 starlette
 psutil
 ray >= 2.5.1
-pandas # Required for Ray data.
-pyarrow # Required for Ray data.
 sentencepiece # Required for LLaMA tokenizer.
 numpy
 tokenizers>=0.15.0

requirements.txt

Lines changed: 0 additions & 2 deletions
@@ -1,8 +1,6 @@
 ninja # For faster builds.
 psutil
 ray >= 2.5.1
-pandas # Required for Ray data.
-pyarrow # Required for Ray data.
 sentencepiece # Required for LLaMA tokenizer.
 numpy
 torch == 2.1.2

tests/async_engine/test_api_server.py

Lines changed: 9 additions & 4 deletions
@@ -8,18 +8,22 @@
 import requests
 
 
-def _query_server(prompt: str) -> dict:
+def _query_server(prompt: str, max_tokens: int = 5) -> dict:
     response = requests.post("http://localhost:8000/generate",
                              json={
                                  "prompt": prompt,
-                                 "max_tokens": 100,
+                                 "max_tokens": max_tokens,
                                  "temperature": 0,
                                  "ignore_eos": True
                              })
     response.raise_for_status()
     return response.json()
 
 
+def _query_server_long(prompt: str) -> dict:
+    return _query_server(prompt, max_tokens=500)
+
+
 @pytest.fixture
 def api_server():
     script_path = Path(__file__).parent.joinpath(
@@ -68,10 +72,11 @@ def test_api_server(api_server):
         for result in pool.map(_query_server, prompts):
             assert result
 
+    with Pool(32) as pool:
         # Cancel requests
         prompts = ["canceled requests"] * 100
-        pool.map_async(_query_server, prompts)
-        time.sleep(0.001)
+        pool.map_async(_query_server_long, prompts)
+        time.sleep(0.01)
         pool.terminate()
         pool.join()
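
The reworked cancellation test uses a fresh pool, longer (500-token) generations, and a slightly longer sleep so that requests are still in flight when the pool is killed, which actually exercises request abortion on the server. A standalone sketch of the same client pattern, assuming an API server is already listening on localhost:8000:

import time
from multiprocessing import Pool

import requests

def query(prompt: str, max_tokens: int = 500) -> dict:
    resp = requests.post("http://localhost:8000/generate",
                         json={
                             "prompt": prompt,
                             "max_tokens": max_tokens,
                             "temperature": 0,
                             "ignore_eos": True,
                         })
    resp.raise_for_status()
    return resp.json()

if __name__ == "__main__":
    with Pool(32) as pool:
        # Fire off long requests, then abandon them while they are running.
        pool.map_async(query, ["canceled request"] * 100)
        time.sleep(0.01)
        pool.terminate()  # the server is left to abort the orphaned requests
        pool.join()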

tests/kernels/test_cache.py

Lines changed: 12 additions & 12 deletions
@@ -49,12 +49,13 @@ def test_copy_blocks(
     src_blocks = random.sample(range(num_blocks), num_mappings)
     remainig_blocks = list(set(range(num_blocks)) - set(src_blocks))
     dst_blocks = random.sample(remainig_blocks, 2 * num_mappings)
-    block_mapping = {}
+    copy_src = []
+    copy_dst = []
     for i in range(num_mappings):
-        src = src_blocks[i]
-        dst1 = dst_blocks[2 * i]
-        dst2 = dst_blocks[2 * i + 1]
-        block_mapping[src] = [dst1, dst2]
+        copy_src.append(src_blocks[i])
+        copy_dst.append(dst_blocks[2 * i])
+        copy_src.append(src_blocks[i])
+        copy_dst.append(dst_blocks[2 * i + 1])
 
     # Create the KV caches.
     key_caches, value_caches = kv_cache_factory(num_blocks, block_size,
@@ -66,15 +67,14 @@ def test_copy_blocks(
     cloned_value_caches = [value_cache.clone() for value_cache in value_caches]
 
     # Call the copy blocks kernel.
-    cache_ops.copy_blocks(key_caches, value_caches, block_mapping)
+    cache_ops.copy_blocks(key_caches, value_caches, copy_src, copy_dst)
 
     # Run the reference implementation.
-    for src, dsts in block_mapping.items():
-        for dst in dsts:
-            for cloned_key_cache in cloned_key_caches:
-                cloned_key_cache[dst].copy_(cloned_key_cache[src])
-            for cloned_value_cache in cloned_value_caches:
-                cloned_value_cache[dst].copy_(cloned_value_cache[src])
+    for src, dst in zip(copy_src, copy_dst):
+        for cloned_key_cache in cloned_key_caches:
+            cloned_key_cache[dst].copy_(cloned_key_cache[src])
+        for cloned_value_cache in cloned_value_caches:
+            cloned_value_cache[dst].copy_(cloned_value_cache[src])
 
     # Compare the results.
     for key_cache, cloned_key_cache in zip(key_caches, cloned_key_caches):
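
The test now builds two parallel lists instead of a src -> [dst, ...] dict, repeating a source block once per destination. A small pure-Python helper showing the equivalence (illustrative only, not part of the commit):

from typing import Dict, List, Tuple

def flatten_block_mapping(
        block_mapping: Dict[int, List[int]]) -> Tuple[List[int], List[int]]:
    """Convert {src: [dst1, dst2, ...]} into parallel (copy_src, copy_dst) lists."""
    copy_src: List[int] = []
    copy_dst: List[int] = []
    for src, dsts in block_mapping.items():
        for dst in dsts:
            copy_src.append(src)
            copy_dst.append(dst)
    return copy_src, copy_dst

# e.g. {3: [7, 9], 5: [8]} -> ([3, 3, 5], [7, 9, 8])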

tests/worker/test_model_runner.py

Lines changed: 3 additions & 2 deletions
@@ -33,8 +33,9 @@ def test_prepare_prompt():
         expected_selected_token_indices.append(selected_token_start_idx +
                                                prompt_len - 1)
         selected_token_start_idx += max_seq_len
-    input_tokens, input_positions, _ = model_runner._prepare_prompt(
-        seq_group_metadata_list)
+    input_tokens, input_positions, _, return_prompt_lens = (
+        model_runner._prepare_prompt(seq_group_metadata_list))
+    assert return_prompt_lens == prompt_lens
     sampling_metadata = model_runner._prepare_sample(seq_group_metadata_list,
                                                      prompt_lens)
     assert input_tokens.shape == (batch_size, max_seq_len)

vllm/engine/async_llm_engine.py

Lines changed: 33 additions & 28 deletions
@@ -185,45 +185,51 @@ async def step_async(self) -> List[RequestOutput]:
         """
         seq_group_metadata_list, scheduler_outputs = self.scheduler.schedule()
 
-        # Execute the model.
-        output = (await self._run_workers_async(
-            "execute_model",
-            seq_group_metadata_list=seq_group_metadata_list,
-            blocks_to_swap_in=scheduler_outputs.blocks_to_swap_in,
-            blocks_to_swap_out=scheduler_outputs.blocks_to_swap_out,
-            blocks_to_copy=scheduler_outputs.blocks_to_copy,
-        )) if not scheduler_outputs.is_empty() else []
+        if not scheduler_outputs.is_empty():
+            # Execute the model.
+            all_outputs = await self._run_workers_async(
+                "execute_model",
+                driver_kwargs={
+                    "seq_group_metadata_list": seq_group_metadata_list,
+                    "blocks_to_swap_in": scheduler_outputs.blocks_to_swap_in,
+                    "blocks_to_swap_out": scheduler_outputs.blocks_to_swap_out,
+                    "blocks_to_copy": scheduler_outputs.blocks_to_copy,
+                })
+
+            # Only the driver worker returns the sampling results.
+            output = all_outputs[0]
+        else:
+            output = []
 
         return self._process_model_outputs(output, scheduler_outputs)
 
     async def _run_workers_async(
         self,
         method: str,
         *args,
-        get_all_outputs: bool = False,
+        driver_args: Optional[List[Any]] = None,
+        driver_kwargs: Optional[Dict[str, Any]] = None,
         **kwargs,
     ) -> Any:
         """Runs the given method on all workers."""
         coros = []
-        for worker in self.workers:
-            if self.parallel_config.worker_use_ray:
-                coros.append(
-                    worker.execute_method.remote(method, *args, **kwargs))
-            else:
-                executor = getattr(worker, method)
-                coros.append(asyncio.get_event_loop().run_in_executor(
-                    None, partial(executor, *args, **kwargs)))
 
-        all_outputs = await asyncio.gather(*coros)
+        if driver_args is None:
+            driver_args = args
+        if driver_kwargs is None:
+            driver_kwargs = kwargs
 
-        if get_all_outputs:
-            return all_outputs
+        # Run the driver worker asynchronously.
+        driver_executor = getattr(self.driver_worker, method)
+        coros.append(asyncio.get_event_loop().run_in_executor(
+            None, partial(driver_executor, *driver_args, **driver_kwargs)))
 
-        # Make sure all workers have the same results.
-        output = all_outputs[0]
-        for other_output in all_outputs[1:]:
-            assert output == other_output
-        return output
+        # Run the ray workers asynchronously.
+        for worker in self.workers:
+            coros.append(worker.execute_method.remote(method, *args, **kwargs))
+
+        all_outputs = await asyncio.gather(*coros)
+        return all_outputs
 
 
 class AsyncLLMEngine:
@@ -488,13 +494,12 @@ def from_engine_args(cls,
         engine_configs = engine_args.create_engine_configs()
         parallel_config = engine_configs[2]
         # Initialize the cluster.
-        distributed_init_method, placement_group = initialize_cluster(
-            parallel_config, engine_args.engine_use_ray)
+        placement_group = initialize_cluster(parallel_config,
+                                             engine_args.engine_use_ray)
         # Create the async LLM engine.
         engine = cls(parallel_config.worker_use_ray,
                      engine_args.engine_use_ray,
                      *engine_configs,
-                     distributed_init_method,
                      placement_group,
                      log_requests=not engine_args.disable_log_requests,
                      log_stats=not engine_args.disable_log_stats,
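
The new _run_workers_async splits the call into two paths: the driver worker is invoked in-process (in a thread executor) with the full per-step arguments, while the Ray workers are invoked without that payload, since they now receive the control-plane data through the broadcast issued inside the method itself, and only the driver's entry in the gathered outputs carries sampling results. A stripped-down sketch of that pattern outside vLLM's classes (function and parameter names are illustrative):

import asyncio
from functools import partial

async def run_driver_and_workers(driver_worker, ray_workers, method: str,
                                 *args, driver_kwargs=None, **kwargs):
    driver_kwargs = kwargs if driver_kwargs is None else driver_kwargs
    coros = []

    # Driver: call the bound method directly, off the event-loop thread.
    driver_fn = getattr(driver_worker, method)
    coros.append(asyncio.get_event_loop().run_in_executor(
        None, partial(driver_fn, *args, **driver_kwargs)))

    # Ray workers: the heavy metadata is not passed here; it reaches them
    # via the driver's broadcast inside the method.
    for worker in ray_workers:
        coros.append(worker.execute_method.remote(method, *args, **kwargs))

    all_outputs = await asyncio.gather(*coros)
    # By convention, index 0 (the driver) holds the usable result.
    return all_outputs[0]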
