Commit a15a50f

[CPU] Enable shared-memory based pipeline parallel for CPU backend (#21289)
Signed-off-by: jiang1.li <[email protected]>
1 parent 6dda13c commit a15a50f

8 files changed: +165 −59 lines

.buildkite/scripts/hardware_ci/run-cpu-test.sh

Lines changed: 9 additions & 9 deletions
@@ -6,6 +6,7 @@ set -ex
 
 # allow to bind to different cores
 CORE_RANGE=${CORE_RANGE:-48-95}
+# used for TP/PP E2E test
 OMP_CORE_RANGE=${OMP_CORE_RANGE:-48-95}
 NUMA_NODE=${NUMA_NODE:-1}
 
@@ -24,8 +25,8 @@ numactl -C "$CORE_RANGE" -N "$NUMA_NODE" docker build --tag cpu-test-"$NUMA_NODE
 numactl -C "$CORE_RANGE" -N "$NUMA_NODE" docker build --build-arg VLLM_CPU_DISABLE_AVX512="true" --tag cpu-test-"$NUMA_NODE"-avx2 --target vllm-test -f docker/Dockerfile.cpu .
 
 # Run the image, setting --shm-size=4g for tensor parallel.
-docker run -itd --cpuset-cpus="$CORE_RANGE" --cpuset-mems="$NUMA_NODE" --entrypoint /bin/bash -v ~/.cache/huggingface:/root/.cache/huggingface --privileged=true -e HF_TOKEN --env VLLM_CPU_KVCACHE_SPACE=4 --env VLLM_CPU_CI_ENV=1 --shm-size=4g --name cpu-test-"$NUMA_NODE" cpu-test-"$NUMA_NODE"
-docker run -itd --cpuset-cpus="$CORE_RANGE" --cpuset-mems="$NUMA_NODE" --entrypoint /bin/bash -v ~/.cache/huggingface:/root/.cache/huggingface --privileged=true -e HF_TOKEN --env VLLM_CPU_KVCACHE_SPACE=4 --env VLLM_CPU_CI_ENV=1 --shm-size=4g --name cpu-test-"$NUMA_NODE"-avx2 cpu-test-"$NUMA_NODE"-avx2
+docker run -itd --cpuset-cpus="$CORE_RANGE" --cpuset-mems="$NUMA_NODE" --entrypoint /bin/bash -v ~/.cache/huggingface:/root/.cache/huggingface --privileged=true -e HF_TOKEN --env VLLM_CPU_KVCACHE_SPACE=4 --env VLLM_CPU_CI_ENV=1 -e E2E_OMP_THREADS="$OMP_CORE_RANGE" --shm-size=4g --name cpu-test-"$NUMA_NODE" cpu-test-"$NUMA_NODE"
+docker run -itd --cpuset-cpus="$CORE_RANGE" --cpuset-mems="$NUMA_NODE" --entrypoint /bin/bash -v ~/.cache/huggingface:/root/.cache/huggingface --privileged=true -e HF_TOKEN --env VLLM_CPU_KVCACHE_SPACE=4 --env VLLM_CPU_CI_ENV=1 -e E2E_OMP_THREADS="$OMP_CORE_RANGE" --shm-size=4g --name cpu-test-"$NUMA_NODE"-avx2 cpu-test-"$NUMA_NODE"-avx2
 
 function cpu_tests() {
   set -e
@@ -78,17 +79,16 @@ function cpu_tests() {
   # tests/quantization/test_ipex_quant.py"
 
   # online serving
-  docker exec cpu-test-"$NUMA_NODE" bash -c "
+  docker exec cpu-test-"$NUMA_NODE" bash -c '
     set -e
-    python3 -m vllm.entrypoints.openai.api_server --model facebook/opt-125m --dtype half &
-    timeout 600 bash -c 'until curl localhost:8000/v1/models; do sleep 1; done' || exit 1
-    VLLM_CPU_CI_ENV=0 python3 benchmarks/benchmark_serving.py \
+    VLLM_CPU_OMP_THREADS_BIND=$E2E_OMP_THREADS VLLM_CPU_SGL_KERNEL=1 vllm serve meta-llama/Llama-3.2-3B-Instruct -tp=2 -pp=2 &
+    timeout 600 bash -c "until curl localhost:8000/v1/models; do sleep 1; done" || exit 1
+    python3 benchmarks/benchmark_serving.py \
       --backend vllm \
       --dataset-name random \
-      --model facebook/opt-125m \
+      --model meta-llama/Llama-3.2-3B-Instruct \
       --num-prompts 20 \
-      --endpoint /v1/completions \
-      --tokenizer facebook/opt-125m"
+      --endpoint /v1/completions'
 
   # Run multi-lora tests
   docker exec cpu-test-"$NUMA_NODE" bash -c "

csrc/cpu/shm.cpp

Lines changed: 48 additions & 21 deletions
@@ -7,7 +7,7 @@
 
 namespace {
 #define MAX_SHM_RANK_NUM 8
-#define PER_THREAD_SHM_BUFFER_BYTES (2 * 1024 * 1024)
+#define PER_THREAD_SHM_BUFFER_BYTES (4 * 1024 * 1024)
 static_assert(PER_THREAD_SHM_BUFFER_BYTES % 2 == 0);
 #define PER_THREAD_SHM_BUFFER_OFFSET (PER_THREAD_SHM_BUFFER_BYTES >> 1)
 #define MIN_THREAD_PROCESS_SIZE (256)
@@ -34,9 +34,10 @@ struct KernelVecType<c10::Half> {
 };
 
 struct ThreadSHMContext {
-  volatile char _curr_thread_stamp;
-  volatile char _ready_thread_stamp;
-  char _padding1[6];
+  volatile char _curr_thread_stamp[2];
+  volatile char _ready_thread_stamp[2];
+  int local_stamp_buffer_idx;
+  int remote_stamp_buffer_idx;
   int thread_id;
   int thread_num;
   int rank;
@@ -45,23 +46,28 @@ struct ThreadSHMContext {
   int swizzled_ranks[MAX_SHM_RANK_NUM];
   void* thread_shm_ptrs[MAX_SHM_RANK_NUM];
   ThreadSHMContext* shm_contexts[MAX_SHM_RANK_NUM];
-  size_t _thread_buffer_mask;
-  char _padding2[56];
+  size_t _thread_buffer_mask[2];
+  char _padding2[40];
 
   ThreadSHMContext(const int thread_id, const int thread_num, const int rank,
                    const int group_size, void* thread_shm_ptr)
-      : _curr_thread_stamp(1),
-        _ready_thread_stamp(0),
+      : local_stamp_buffer_idx(0),
+        remote_stamp_buffer_idx(0),
        thread_id(thread_id),
        thread_num(thread_num),
        rank(rank),
        group_size(group_size),
-        _spinning_count(0),
-        _thread_buffer_mask(0) {
+        _spinning_count(0) {
    static_assert(sizeof(ThreadSHMContext) % 64 == 0);
    TORCH_CHECK(group_size <= MAX_SHM_RANK_NUM);
    TORCH_CHECK((size_t)this % 64 == 0);
    TORCH_CHECK((size_t)thread_shm_ptr % 64 == 0);
+    _curr_thread_stamp[0] = 1;
+    _curr_thread_stamp[1] = 1;
+    _ready_thread_stamp[0] = 0;
+    _ready_thread_stamp[1] = 0;
+    _thread_buffer_mask[0] = 0;
+    _thread_buffer_mask[1] = 0;
    for (int i = 0; i < MAX_SHM_RANK_NUM; ++i) {
      shm_contexts[i] = nullptr;
      thread_shm_ptrs[i] = nullptr;
@@ -70,6 +76,11 @@ struct ThreadSHMContext {
    set_context(rank, this, thread_shm_ptr);
  }
 
+  void set_stamp_buffer_idx(int local, int remote) {
+    local_stamp_buffer_idx = local;
+    remote_stamp_buffer_idx = remote;
+  }
+
  void set_context(int rank, ThreadSHMContext* ptr, void* thread_shm_ptr) {
    TORCH_CHECK(rank < MAX_SHM_RANK_NUM);
    TORCH_CHECK(ptr);
@@ -84,23 +95,27 @@ struct ThreadSHMContext {
  T* get_thread_shm_ptr(int rank) {
    return reinterpret_cast<T*>(
        reinterpret_cast<int8_t*>(thread_shm_ptrs[rank]) +
-        (PER_THREAD_SHM_BUFFER_OFFSET & _thread_buffer_mask));
+        (PER_THREAD_SHM_BUFFER_OFFSET &
+         _thread_buffer_mask[local_stamp_buffer_idx]));
  }
 
-  void next_buffer() { _thread_buffer_mask ^= 0xFFFFFFFFFFFFFFFF; }
+  void next_buffer() {
+    _thread_buffer_mask[local_stamp_buffer_idx] ^= 0xFFFFFFFFFFFFFFFF;
+  }
 
-  char get_curr_stamp() const { return _curr_thread_stamp; }
+  char get_curr_stamp(int idx) const { return _curr_thread_stamp[idx]; }
 
-  char get_ready_stamp() const { return _ready_thread_stamp; }
+  char get_ready_stamp(int idx) const { return _ready_thread_stamp[idx]; }
 
  void next_stamp() {
    _mm_mfence();
-    _curr_thread_stamp += 1;
+    _curr_thread_stamp[local_stamp_buffer_idx] += 1;
  }
 
  void commit_ready_stamp() {
    _mm_mfence();
-    _ready_thread_stamp = _curr_thread_stamp;
+    _ready_thread_stamp[local_stamp_buffer_idx] =
+        _curr_thread_stamp[local_stamp_buffer_idx];
  }
 
  int get_swizzled_rank(int idx) { return swizzled_ranks[idx]; }
@@ -117,10 +132,11 @@ struct ThreadSHMContext {
  void wait_for_one(int rank, Cond&& cond) {
    ThreadSHMContext* rank_ctx = shm_contexts[rank];
    for (;;) {
-      char local_curr_stamp = get_curr_stamp();
-      char local_ready_stamp = get_ready_stamp();
-      char rank_curr_stamp = rank_ctx->get_curr_stamp();
-      char rank_ready_stamp = rank_ctx->get_ready_stamp();
+      char local_curr_stamp = get_curr_stamp(local_stamp_buffer_idx);
+      char local_ready_stamp = get_ready_stamp(local_stamp_buffer_idx);
+      char rank_curr_stamp = rank_ctx->get_curr_stamp(remote_stamp_buffer_idx);
+      char rank_ready_stamp =
+          rank_ctx->get_ready_stamp(remote_stamp_buffer_idx);
      if (cond(local_curr_stamp, local_ready_stamp, rank_curr_stamp,
               rank_ready_stamp)) {
        break;
@@ -361,6 +377,15 @@ void shm_cc_loop(ThreadSHMContext* ctx, int64_t elem_num, F&& inner_func) {
    }
  }
 }
+
+void reset_threads_stamp_buffer_idx(ThreadSHMContext* ctx, int local,
+                                    int remote) {
+  int thread_num = ctx->thread_num;
+  for (int i = 0; i < thread_num; ++i) {
+    ThreadSHMContext* thread_ctx = ctx + i;
+    thread_ctx->set_stamp_buffer_idx(local, remote);
+  }
+}
 };  // namespace shm_cc_ops
 
 namespace shm_cc_ops {
@@ -632,6 +657,7 @@ void shm_send_tensor_list_impl(ThreadSHMContext* ctx, int64_t dst,
  TensorListMeta* metadata = new (metadata_tensor.data_ptr()) TensorListMeta();
  metadata->bind_tensor_list(tensor_list_with_metadata);
 
+  shm_cc_ops::reset_threads_stamp_buffer_idx(ctx, 0, 1);
  shm_cc_ops::shm_cc_loop<int8_t>(
      ctx, metadata->total_bytes,
      [&](ThreadSHMContext* thread_ctx, int64_t data_offset,
@@ -659,6 +685,7 @@ std::vector<torch::Tensor> shm_recv_tensor_list_impl(ThreadSHMContext* ctx,
  torch::Tensor metadata_tensor =
      torch::empty({sizeof(TensorListMeta)}, options);
 
+  shm_cc_ops::reset_threads_stamp_buffer_idx(ctx, 1, 0);
  ctx->wait_for_one(src, ThreadSHMContext::check_stamp_ready);
  shm_cc_ops::memcpy(metadata_tensor.data_ptr(),
                     ctx->get_thread_shm_ptr<void>(src),
@@ -677,7 +704,7 @@ std::vector<torch::Tensor> shm_recv_tensor_list_impl(ThreadSHMContext* ctx,
      ctx, metadata.total_bytes,
      [&](ThreadSHMContext* thread_ctx, int64_t data_offset,
          int64_t data_elem_num, bool fast_mode) {
-        ctx->wait_for_one(src, ThreadSHMContext::check_stamp_ready);
+        thread_ctx->wait_for_one(src, ThreadSHMContext::check_stamp_ready);
        int64_t curr_shm_offset = 0;
        while (curr_shm_offset < data_elem_num) {
          MemPiece frag = metadata.get_data(data_offset + curr_shm_offset);
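
The core of the shm.cpp change is that each ThreadSHMContext now carries two independent stamp/buffer slots instead of one, selected by local_stamp_buffer_idx for the rank's own writes and remote_stamp_buffer_idx when polling a peer, so a pipeline-parallel send and a receive in flight at the same time no longer share a single stamp. A minimal Python sketch of that bookkeeping, under stated assumptions, follows: StampSlots and its methods are invented names for illustration only, and the real code synchronizes OS processes through shared memory with _mm_mfence and spin-waiting.

# Illustrative sketch of the double stamp-buffer bookkeeping (hypothetical
# StampSlots class; the real implementation is the C++ ThreadSHMContext above).
class StampSlots:
    def __init__(self) -> None:
        # Slot 0 is used by the send path, slot 1 by the receive path.
        self.curr_stamp = [1, 1]
        self.ready_stamp = [0, 0]
        self.buffer_mask = [0, 0]
        self.local_idx = 0    # slot this rank advances
        self.remote_idx = 0   # slot it polls on the peer

    def set_stamp_buffer_idx(self, local: int, remote: int) -> None:
        self.local_idx = local
        self.remote_idx = remote

    def commit_ready_stamp(self) -> None:
        # Publish the half-buffer written by this rank to the peer.
        self.ready_stamp[self.local_idx] = self.curr_stamp[self.local_idx]

    def next_buffer(self) -> None:
        # Flip between the two halves of the per-thread shared buffer.
        self.buffer_mask[self.local_idx] ^= 0xFFFFFFFFFFFFFFFF

# A sender calls set_stamp_buffer_idx(0, 1) and a receiver set_stamp_buffer_idx(1, 0),
# mirroring reset_threads_stamp_buffer_idx in the send/recv paths above.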

docs/getting_started/installation/cpu.md

Lines changed: 14 additions & 0 deletions
@@ -166,6 +166,20 @@ Note, it is recommended to manually reserve 1 CPU for vLLM front-end process whe
 
 - This value is 4GB by default. Larger space can support more concurrent requests, longer context length. However, users should take care of memory capacity of each NUMA node. The memory usage of each TP rank is the sum of `weight shard size` and `VLLM_CPU_KVCACHE_SPACE`, if it exceeds the capacity of a single NUMA node, the TP worker will be killed with `exitcode 9` due to out-of-memory.
 
+### How to do performance tuning for vLLM CPU?
+
+- First of all, please make sure the thread binding and KV cache space are properly set and take effect. You can check the thread binding by running a vLLM benchmark and observing CPU core usage via `htop`.
+
+- Inference batch size is an important parameter for performance. A larger batch usually provides higher throughput, while a smaller batch provides lower latency. Tuning the max batch size, starting from the default value, to balance throughput and latency is an effective way to improve vLLM CPU performance on a specific platform. There are two related parameters in vLLM:
+  - `--max-num-batched-tokens` defines the maximum number of tokens in a single batch and mainly impacts first-token performance. The default value is:
+    - Offline Inference: `4096 * world_size`
+    - Online Serving: `2048 * world_size`
+  - `--max-num-seqs` defines the maximum number of sequences in a single batch and mainly impacts output-token performance. The default value is:
+    - Offline Inference: `256 * world_size`
+    - Online Serving: `128 * world_size`
+
+- vLLM CPU supports tensor parallel (TP) and pipeline parallel (PP) to leverage multiple CPU sockets and memory nodes. For more details on tuning TP and PP, please refer to [Optimization and Tuning](../../configuration/optimization.md). For vLLM CPU, it is recommended to use TP and PP together if there are enough CPU sockets and memory nodes.
+
 ### Which quantization configs does vLLM CPU support?
 
 - vLLM CPU supports quantizations:
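
As a concrete illustration of the tuning knobs described in the added docs section, the sketch below starts an offline CPU instance with explicit TP/PP sizes and batch limits. This is an assumption-laden example, not part of the commit: the model, the parallel sizes, and the two batch limits are placeholders to adjust per platform, and it presumes a CPU build of vLLM with enough memory across the NUMA nodes.

# Hypothetical offline tuning example for vLLM CPU (all values are placeholders).
from vllm import LLM, SamplingParams

llm = LLM(
    model="meta-llama/Llama-3.2-3B-Instruct",  # model used by the CI test above
    tensor_parallel_size=2,         # e.g. one TP rank per CPU socket / NUMA node
    pipeline_parallel_size=2,       # two pipeline stages, so world_size = 4
    max_num_batched_tokens=16384,   # 4096 * world_size, the offline default
    max_num_seqs=1024,              # 256 * world_size, the offline default
)

outputs = llm.generate(["The capital of France is"],
                       SamplingParams(max_tokens=32))
print(outputs[0].outputs[0].text)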

vllm/distributed/device_communicators/cpu_communicator.py

Lines changed: 58 additions & 2 deletions
@@ -2,11 +2,12 @@
 # SPDX-FileCopyrightText: Copyright contributors to the vLLM project
 
 import os
-from typing import Optional
+from typing import Any, Optional, Union
 
 import torch
 from torch.distributed import ProcessGroup
 
+from vllm.distributed.utils import pickle
 from vllm.platforms import current_platform
 from vllm.platforms.interface import CpuArchEnum
 
@@ -26,7 +27,8 @@ def __init__(self,
        if (current_platform.get_cpu_architecture()
                == CpuArchEnum.X86) and hasattr(
                    torch.ops._C,
-                    "init_shm_manager") and unique_name.startswith("tp"):
+                    "init_shm_manager") and (unique_name.startswith("tp")
+                                             or unique_name.startswith("pp")):
            self.dist_module = _CPUSHMDistributed(self)
 
    def all_reduce(self, input_):
@@ -94,6 +96,19 @@ def all_gather(self, input_: torch.Tensor, dim: int = -1) -> torch.Tensor:
                                       input_size[dim + 1:])
        return output_tensor
 
+    def send_tensor_dict(
+        self,
+        tensor_dict: dict[str, Union[torch.Tensor, Any]],
+        dst: int,
+    ) -> None:
+        return self.dist_module.send_tensor_dict(tensor_dict, dst)
+
+    def recv_tensor_dict(
+        self,
+        src: int,
+    ) -> dict[str, Union[torch.Tensor, Any]]:
+        return self.dist_module.recv_tensor_dict(src)
+
 
 class _CPUSHMDistributed:
 
@@ -143,3 +158,44 @@ def all_gather_into_tensor(self,
                               input: torch.Tensor,
                               group: Optional[ProcessGroup] = None) -> None:
        torch.ops._C.shm_all_gather(self.handle, input, output)
+
+    def send_tensor_dict(
+        self,
+        tensor_dict: dict[str, Union[torch.Tensor, Any]],
+        dst: int,
+    ) -> None:
+        key_list = list(tensor_dict.keys())
+        value_list = list(tensor_dict.values())
+        size_list = []
+        for v in value_list:
+            if not isinstance(v, torch.Tensor):
+                raise RuntimeError(
+                    "CpuCommunicator only supports sending tensors.")
+            size_list.append(v.size())
+        key_size_tensor = torch.frombuffer(pickle.dumps([key_list, size_list]),
+                                           dtype=torch.uint8)
+        value_list.append(key_size_tensor)
+
+        torch.ops._C.shm_send_tensor_list(self.handle, value_list, dst)
+
+        return None
+
+    def recv_tensor_dict(
+        self,
+        src: int,
+    ) -> dict[str, Union[torch.Tensor, Any]]:
+        tensor_list = torch.ops._C.shm_recv_tensor_list(self.handle, src)
+
+        value_list: list[torch.Tensor] = tensor_list[:-1]
+        key_size_tensor = tensor_list[-1]
+
+        key_size = pickle.loads(key_size_tensor.numpy().tobytes())
+        key_list = key_size[0]
+        size_list = key_size[1]
+        assert len(key_list) == len(size_list)
+        assert len(key_list) == len(value_list)
+
+        tensor_dict: dict[str, torch.Tensor] = {}
+        for key, size, t in zip(key_list, size_list, value_list):
+            tensor_dict[key] = t.view(size)
+        return tensor_dict
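
The new send_tensor_dict/recv_tensor_dict pair flattens a tensor dict into a plain list: the values stay tensors, while the keys and shapes are pickled into one extra uint8 tensor appended at the end, and the receiver strips that tensor off and restores each shape with view. The round trip can be sketched in plain Python as below; pack_tensor_dict and unpack_tensor_dict are illustrative helper names, stdlib pickle stands in for the vllm.distributed.utils import, and the actual transfer goes through the torch.ops._C.shm_send_tensor_list / shm_recv_tensor_list custom ops rather than a local list.

# Sketch of the metadata encoding used by send_tensor_dict/recv_tensor_dict.
import pickle
import torch

def pack_tensor_dict(tensor_dict: dict[str, torch.Tensor]) -> list[torch.Tensor]:
    keys = list(tensor_dict.keys())
    values = list(tensor_dict.values())
    sizes = [v.size() for v in values]
    # Keys and shapes ride along as one extra uint8 tensor at the end.
    meta = torch.frombuffer(bytearray(pickle.dumps([keys, sizes])),
                            dtype=torch.uint8)
    return values + [meta]

def unpack_tensor_dict(tensor_list: list[torch.Tensor]) -> dict[str, torch.Tensor]:
    values, meta = tensor_list[:-1], tensor_list[-1]
    keys, sizes = pickle.loads(meta.numpy().tobytes())
    # Restore each tensor's original shape from the pickled size list.
    return {k: v.view(s) for k, v, s in zip(keys, values, sizes)}

# Round trip: unpack_tensor_dict(pack_tensor_dict(d)) reproduces d's keys,
# shapes, and tensor contents.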

vllm/distributed/parallel_state.py

Lines changed: 12 additions & 0 deletions
@@ -272,6 +272,9 @@ def __init__(
        self.use_custom_op_call = (current_platform.is_cuda_alike()
                                   or current_platform.is_tpu())
 
+        self.use_cpu_custom_send_recv = (current_platform.is_cpu() and hasattr(
+            torch.ops._C, "init_shm_manager"))
+
    @property
    def first_rank(self):
        """Return the global rank of the first process in the group"""
@@ -663,6 +666,11 @@ def send_tensor_dict(
            dst = (self.rank_in_group + 1) % self.world_size
        assert dst < self.world_size, f"Invalid dst rank ({dst})"
 
+        if self.use_cpu_custom_send_recv:
+            self.device_communicator.send_tensor_dict(  # type: ignore
+                tensor_dict, dst)
+            return None
+
        metadata_list: list[tuple[Any, Any]] = []
        assert isinstance(
            tensor_dict,
@@ -718,6 +726,10 @@ def recv_tensor_dict(
            src = (self.rank_in_group - 1) % self.world_size
        assert src < self.world_size, f"Invalid src rank ({src})"
 
+        if self.use_cpu_custom_send_recv:
+            return self.device_communicator.recv_tensor_dict(  # type: ignore
+                src)
+
        recv_metadata_list = self.recv_object(src=src)
        tensor_dict: dict[str, Any] = {}
        for key, value in recv_metadata_list:
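
With use_cpu_custom_send_recv set, the pipeline-parallel hand-off between stages bypasses the generic metadata broadcast path and goes straight to the shared-memory communicator. The sketch below shows the calling pattern from a worker's point of view; the tensor names and shapes are made up, and the snippet only runs inside a vLLM worker process whose distributed groups have already been initialized.

# Sketch of a pipeline-stage hand-off (hypothetical tensor names and shapes).
import torch
from vllm.distributed.parallel_state import get_pp_group

pp_group = get_pp_group()

if not pp_group.is_last_rank:
    # On a CPU build with the SHM manager op, this now routes through
    # _CPUSHMDistributed.send_tensor_dict instead of the metadata broadcast.
    pp_group.send_tensor_dict({
        "hidden_states": torch.randn(8, 4096),
        "residual": torch.randn(8, 4096),
    })

if not pp_group.is_first_rank:
    tensors = pp_group.recv_tensor_dict()
    hidden_states = tensors["hidden_states"]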

vllm/engine/arg_utils.py

Lines changed: 5 additions & 4 deletions
@@ -1639,13 +1639,14 @@ def _set_default_args_v1(self, usage_context: UsageContext,
 
        # cpu specific default values.
        if current_platform.is_cpu():
+            world_size = self.pipeline_parallel_size * self.tensor_parallel_size
            default_max_num_batched_tokens = {
-                UsageContext.LLM_CLASS: 4096,
-                UsageContext.OPENAI_API_SERVER: 2048,
+                UsageContext.LLM_CLASS: 4096 * world_size,
+                UsageContext.OPENAI_API_SERVER: 2048 * world_size,
            }
            default_max_num_seqs = {
-                UsageContext.LLM_CLASS: 128,
-                UsageContext.OPENAI_API_SERVER: 32,
+                UsageContext.LLM_CLASS: 256 * world_size,
+                UsageContext.OPENAI_API_SERVER: 128 * world_size,
            }
 
        use_context_value = usage_context.value if usage_context else None
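
The CPU defaults now scale with world_size so the aggregate batch capacity grows with the number of ranks (TP x PP) instead of staying fixed per deployment. A quick check of the resulting values for a hypothetical -tp=2 -pp=2 launch, mirroring the arithmetic in _set_default_args_v1 above:

# Worked example of the new CPU defaults for world_size = 4 (tp=2, pp=2).
tensor_parallel_size = 2
pipeline_parallel_size = 2
world_size = tensor_parallel_size * pipeline_parallel_size

defaults = {
    "offline max_num_batched_tokens": 4096 * world_size,  # 16384
    "online  max_num_batched_tokens": 2048 * world_size,  # 8192
    "offline max_num_seqs": 256 * world_size,             # 1024
    "online  max_num_seqs": 128 * world_size,             # 512
}
for name, value in defaults.items():
    print(f"{name}: {value}")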

vllm/envs.py

Lines changed: 4 additions & 3 deletions
@@ -42,7 +42,7 @@
    VLLM_USE_FLASHINFER_SAMPLER: Optional[bool] = None
    VLLM_FLASHINFER_FORCE_TENSOR_CORES: bool = False
    VLLM_PP_LAYER_PARTITION: Optional[str] = None
-    VLLM_CPU_KVCACHE_SPACE: int = 0
+    VLLM_CPU_KVCACHE_SPACE: Optional[int] = 0
    VLLM_CPU_OMP_THREADS_BIND: str = ""
    VLLM_CPU_NUM_OF_RESERVED_CPU: Optional[int] = None
    VLLM_CPU_MOE_PREPACK: bool = True
@@ -430,9 +430,10 @@ def get_vllm_port() -> Optional[int]:
    lambda: os.getenv("VLLM_PP_LAYER_PARTITION", None),
 
    # (CPU backend only) CPU key-value cache space.
-    # default is 4 GiB
+    # default is None and will be set as 4 GB
    "VLLM_CPU_KVCACHE_SPACE":
-    lambda: int(os.getenv("VLLM_CPU_KVCACHE_SPACE", "0")),
+    lambda: int(os.getenv("VLLM_CPU_KVCACHE_SPACE", "0"))
+    if "VLLM_CPU_KVCACHE_SPACE" in os.environ else None,
 
    # (CPU backend only) CPU core ids bound by OpenMP threads, e.g., "0-31",
    # "0,1,2", "0-31,33". CPU cores of different ranks are separated by '|'.
