Commit 1aaa6fc

add dp aware padding

Signed-off-by: Wuxun Zhang <[email protected]>
1 parent: 8d7d616

2 files changed: 63 additions & 44 deletions

examples/data_parallel.py

Lines changed: 18 additions & 9 deletions
@@ -31,6 +31,7 @@
 
 import os
 from time import sleep
+import torch
 
 from vllm import LLM, SamplingParams
 from vllm.utils import get_open_port
@@ -89,6 +90,20 @@ def parse_args():
     return parser.parse_args()
 
 
+def generate_random_token_ids(repeat=1) -> list[list[int]]:
+    """
+    For testing different sequence lengths in a data parallel scenario
+    """
+    candidate_lens = [230, 560]
+    prompts = []
+    for num_tokens in candidate_lens:
+        tokens = torch.randint(
+            low=0, high=10000, size=(num_tokens,), dtype=torch.int32
+        )
+        [prompts.append(tokens.tolist()) for _ in range(repeat)]
+    return prompts
+
+
 def main(
     model,
     dp_size,
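
A quick usage sketch of the new helper (hypothetical, not part of the commit; it assumes generate_random_token_ids from the hunk above is in scope): with the defaults it yields one 230-token and one 560-token prompt, and repeat duplicates each candidate length.

    # Hypothetical usage of generate_random_token_ids() defined above.
    prompts = generate_random_token_ids()           # two prompts: 230 and 560 tokens
    assert [len(p) for p in prompts] == [230, 560]

    prompts = generate_random_token_ids(repeat=3)   # each candidate length repeated 3 times
    assert [len(p) for p in prompts] == [230] * 3 + [560] * 3
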
@@ -111,13 +126,8 @@ def main(
     # CUDA_VISIBLE_DEVICES for each DP rank is set automatically inside the
     # engine processes.
 
-    # Sample prompts.
-    prompts = [
-        "Hello, my name is",
-        "The president of the United States is",
-        "The capital of France is",
-        "The future of AI is",
-    ] * 100
+    # generate prompts with different lengths to demonstrate DP-aware padding.
+    prompts = generate_random_token_ids()
 
     # with DP, each rank should process different prompts.
     # usually all the DP ranks process a full dataset,
@@ -134,7 +144,6 @@ def start(rank):
         # if any rank has no prompts to process,
         # we need to set a placeholder prompt
         prompts = ["Placeholder"]
-    print(f"DP rank {global_dp_rank} needs to process {len(prompts)} prompts")
 
     # Create a sampling params object.
     # since we are doing data parallel, every rank can have different
@@ -154,7 +163,7 @@ def start(rank):
         max_num_seqs=max_num_seqs,
         gpu_memory_utilization=gpu_memory_utilization,
     )
-    outputs = llm.generate(prompts, sampling_params)
+    outputs = llm.generate(None, sampling_params, prompts)
     # Print the outputs.
     for i, output in enumerate(outputs):
         if i >= 5:
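
Since the example now feeds token IDs rather than strings, the commit passes them through generate's third positional parameter. As a hedged aside (assuming vLLM exports the TokensPrompt input wrapper and using a placeholder model name; this is not what the commit itself does), an equivalent and more explicit spelling would be:

    # Sketch only: an explicit way to pass pre-tokenized prompts.
    # Assumes vLLM exports TokensPrompt; the model name is a placeholder.
    from vllm import LLM, SamplingParams, TokensPrompt

    llm = LLM(model="facebook/opt-125m")
    sampling_params = SamplingParams(temperature=0.8, top_p=0.95, max_tokens=16)

    token_prompts = [
        TokensPrompt(prompt_token_ids=ids) for ids in generate_random_token_ids()
    ]
    outputs = llm.generate(token_prompts, sampling_params)
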

vllm_gaudi/v1/worker/hpu_model_runner.py

Lines changed: 45 additions & 35 deletions
@@ -1150,6 +1150,12 @@ def _form_prefill_batch(self, contents):
 
         target_bs, target_seq, target_blocks = self._get_prompt_bucketing_fn()(
             query_lens, num_context_blocks)
+
+        # dp aware padding
+        target_bs = self.get_dp_padding(target_bs)
+        target_seq = self.get_dp_padding(target_seq)
+        target_blocks = self.get_dp_padding(target_blocks)
+
         token_ids = self._align_and_pad(contents.token_ids,
                                         (target_bs, target_seq),
                                         itertools.repeat(-1))
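
get_dp_padding is shown further down in this diff; the idea is that each DP rank pads its bucketed dimensions up to the maximum chosen by any rank, so all ranks execute identically shaped graphs and stay in lockstep on collectives. A minimal stand-alone sketch of that idea, using a plain torch.distributed all-reduce instead of the runner's DPMetadata plumbing (illustrative only):

    # Conceptual sketch only: align a per-rank bucket dimension to the DP-wide max.
    # Assumes torch.distributed is already initialized with one process per DP rank
    # (e.g. a gloo group so CPU tensors can be reduced).
    import torch
    import torch.distributed as dist

    def pad_to_dp_max(value: int) -> int:
        """Return `value` padded up to the largest value across all DP ranks."""
        t = torch.tensor([value], dtype=torch.int64)
        dist.all_reduce(t, op=dist.ReduceOp.MAX)  # every rank learns the group maximum
        return int(t.item())

    # e.g. rank 0 bucketed to (bs=2, seq=384) and rank 1 to (bs=4, seq=640):
    # after padding, both ranks run with (bs=4, seq=640).
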
@@ -1266,6 +1272,9 @@ def _prepare_decode_inputs(self, num_decodes,
         padded_batch_size = self.bucketing_manager.find_decode_bucket(
             num_decodes, sum(num_blocks))[0]
 
+        # dp aware padding
+        padded_batch_size = self.get_dp_padding(padded_batch_size)
+
         block_tables_list = []
         for i, n in enumerate(num_blocks):
             seq_block_table = block_table_cpu_tensor[i, :n].tolist()
@@ -1365,8 +1374,6 @@ def _prepare_inputs(
         total_num_scheduled_tokens = scheduler_output.total_num_scheduled_tokens
         assert total_num_scheduled_tokens > 0
 
-        # TODO wuxun: consider dp aware padding for bs, block bucket, etc.
-
         num_reqs = num_prefills + num_decodes
 
         # Get the number of scheduled tokens for each request.
@@ -1406,7 +1413,6 @@ def _check_config(self, batch_size, seq_len, num_blocks, attn_metadata,
             "Configuration: (%s, %s, %s, %s) was not warmed-up!", phase,
             batch_size, seq_len, num_blocks)
 
-    # TODO wuxun: dp padding for prefill/decode inputs
     def get_dp_padding(self,
                        num_tokens: int) -> tuple[int, Optional[torch.Tensor]]:
         dp_size = self.vllm_config.parallel_config.data_parallel_size
@@ -1426,11 +1432,11 @@ def get_dp_padding(self,
         num_tokens_across_dp = DPMetadata.num_tokens_across_dp(
             num_tokens, dp_size, dp_rank)
         max_tokens_across_dp_cpu = torch.max(num_tokens_across_dp).item()
-        num_tokens_after_padding = torch.tensor([max_tokens_across_dp_cpu] *
-                                                dp_size,
-                                                device="cpu",
-                                                dtype=torch.int32)
-        return max_tokens_across_dp_cpu - num_tokens, num_tokens_after_padding
+        # num_tokens_after_padding = torch.tensor([max_tokens_across_dp_cpu] *
+        #                                         dp_size,
+        #                                         device="cpu",
+        #                                         dtype=torch.int32).item()
+        return max_tokens_across_dp_cpu
 
     def _execute_model_generic(self,
                                token_ids,
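
With this change the helper returns only the DP-wide maximum token count, which the call sites earlier in the diff use directly as the padded bucket dimension. A worked example with illustrative numbers (dp_size = 2, made-up per-rank counts of 96 and 128 tokens):

    # Illustrative arithmetic only; 96 and 128 are hypothetical per-rank token counts.
    import torch

    num_tokens_across_dp = torch.tensor([96, 128], dtype=torch.int32)
    max_tokens_across_dp_cpu = int(torch.max(num_tokens_across_dp).item())

    # old return value on the rank with 96 tokens: (32, tensor([128, 128], dtype=torch.int32))
    # new return value on every rank: 128
    assert max_tokens_across_dp_cpu == 128
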
@@ -2481,36 +2487,40 @@ def profile_run(self) -> None:
         # it is important to create tensors inside the loop, rather than
         # multiplying the list, to avoid Dynamo from treating them as
         # tensor aliasing.
-        num_layers = self.model_config.get_num_layers(self.parallel_config)
-        kv_caches = [None] * num_layers
-
-        # Run empty prefill forwards - prefill max batch and prefill max seq
-        self.warmup_scenario(batch_size=1,
-                             seq_or_block=self.max_model_len,
-                             is_prompt=True,
-                             kv_caches=kv_caches)
-        max_seq_len = math.ceil(
-            (self.max_num_tokens // self.max_prefill_batch_size) /
-            self.block_size) * self.block_size
-        self.warmup_scenario(batch_size=self.max_prefill_batch_size,
-                             seq_or_block=max_seq_len,
-                             is_prompt=True,
-                             kv_caches=kv_caches)
+        # num_layers = self.model_config.get_num_layers(self.parallel_config)
+        # kv_caches = [None] * num_layers
+
+        max_num_batched_tokens = self.max_num_tokens
+        max_prefill_batch_size = self.max_prefill_batch_size
+        max_seq_len = (max_num_batched_tokens + max_prefill_batch_size -
+                       1) // max_prefill_batch_size
+        if max_seq_len % self.block_size != 0:
+            max_seq_len = ((max_seq_len + self.block_size - 1) //
+                           self.block_size) * self.block_size
+
+        prompt_cfg = (max_prefill_batch_size, max_seq_len, 0)
+        decode_cfg = None
+
+        self._execute_dummy_scenario(prompt_cfg, decode_cfg)
+
+        # # Run empty prefill forwards - prefill max batch and prefill max seq
+        # self.warmup_scenario(batch_size=1,
+        #                      seq_or_block=self.max_model_len,
+        #                      is_prompt=True,
+        #                      kv_caches=kv_caches)
+        # max_seq_len = math.ceil(
+        #     (self.max_num_tokens // self.max_prefill_batch_size) /
+        #     self.block_size) * self.block_size
+        # self.warmup_scenario(batch_size=self.max_prefill_batch_size,
+        #                      seq_or_block=max_seq_len,
+        #                      is_prompt=True,
+        #                      kv_caches=kv_caches)
 
     def _dummy_run(self, max_num_batched_tokens: int) -> None:
-        # TODO wuxun: dummy run implementation
         assert max_num_batched_tokens == 1
-        # self.warmup_scenario(max_num_batched_tokens,
-        #                      1,
-        #                      1,
-        #                      is_prompt=False,
-        #                      kv_caches=None,
-        #                      num_iters=1,
-        #                      is_pt_profiler_run=False,
-        #                      align_worker=True,
-        #                      is_dummy_run=True)
-        prompt_cfg = 1, 1, 0
-        decode_cfg = None
+        prompt_cfg = None
+        decode_cfg = 1, 1
+        # add dummy decode run
        self._execute_dummy_scenario(prompt_cfg, decode_cfg)
         return
 
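In this last hunk, profile_run now ceil-divides the token budget across the maximum prefill batch and rounds the result up to a block multiple before running a single dummy prefill scenario, and _dummy_run issues a decode-shaped (1, 1) configuration instead of the prompt-shaped (1, 1, 0) one, matching the "# add dummy decode run" comment. A worked example of the shape math with hypothetical values (max_num_tokens = 8192, max_prefill_batch_size = 6, block_size = 128):

    # Worked example of the new profile_run shape math; all values are hypothetical.
    max_num_batched_tokens = 8192
    max_prefill_batch_size = 6
    block_size = 128

    # ceil(8192 / 6) = 1366
    max_seq_len = (max_num_batched_tokens + max_prefill_batch_size - 1) // max_prefill_batch_size
    # round up to the next multiple of block_size: 1408
    if max_seq_len % block_size != 0:
        max_seq_len = ((max_seq_len + block_size - 1) // block_size) * block_size

    assert max_seq_len == 1408
    # third field is presumably the number of context blocks, as in the (1, 1, 0) config above
    prompt_cfg = (max_prefill_batch_size, max_seq_len, 0)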