254 changes: 254 additions & 0 deletions examples/data_parallel.py
@@ -0,0 +1,254 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
"""
Usage:
Single node:
python examples/data_parallel.py \
--model="ibm-research/PowerMoE-3b" \
--dp-size=2 \
--tp-size=2

Multi-node:
Node 0 (assume the node has ip of 10.99.48.128):
python examples/data_parallel.py \
--model="ibm-research/PowerMoE-3b" \
--dp-size=2 \
--tp-size=2 \
--node-size=2 \
--node-rank=0 \
--master-addr=10.99.48.128 \
--master-port=13345
Node 1:
python examples/data_parallel.py \
--model="ibm-research/PowerMoE-3b" \
--dp-size=2 \
--tp-size=2 \
--node-size=2 \
--node-rank=1 \
--master-addr=10.99.48.128 \
--master-port=13345
"""

import os
from time import sleep
import torch

from vllm import LLM, SamplingParams
from vllm.utils import get_open_port


def parse_args():
import argparse

parser = argparse.ArgumentParser(description="Data Parallel Inference")
parser.add_argument(
"--model",
type=str,
default="ibm-research/PowerMoE-3b",
help="Model name or path",
)
parser.add_argument(
"--dp-size", type=int, default=2, help="Data parallel size"
)
parser.add_argument(
"--tp-size", type=int, default=2, help="Tensor parallel size"
)
parser.add_argument(
"--node-size", type=int, default=1, help="Total number of nodes"
)
parser.add_argument(
"--node-rank", type=int, default=0, help="Rank of the current node"
)
parser.add_argument(
"--master-addr", type=str, default="", help="Master node IP address"
)
parser.add_argument(
"--master-port", type=int, default=0, help="Master node port"
)
parser.add_argument(
"--enforce-eager",
action="store_true",
help="Enforce eager mode execution.",
)
parser.add_argument(
"--trust-remote-code", action="store_true", help="Trust remote code."
)
parser.add_argument(
"--max-num-seqs",
type=int,
default=64,
help=(
"Maximum number of sequences to be processed in a single iteration."
),
)
parser.add_argument(
"--gpu-memory-utilization",
type=float,
default=0.8,
help=("Fraction of GPU memory vLLM is allowed to allocate (0.0, 1.0]."),
)
parser.add_argument(
"--random-input",
action="store_true",
help="Use random generated input tokens.",
)
return parser.parse_args()


def generate_random_token_ids(repeat=1) -> list[list[int]]:
"""
    For testing different sequence lengths in a data parallel scenario.
"""
candidate_lens = [130, 560]
prompts = []
for num_tokens in candidate_lens:
tokens = torch.randint(
low=0, high=10000, size=(num_tokens,), dtype=torch.int32
)
        for _ in range(repeat):
            prompts.append(tokens.tolist())
return prompts


def main(
model,
dp_size,
local_dp_rank,
global_dp_rank,
dp_master_ip,
dp_master_port,
GPUs_per_dp_rank,
enforce_eager,
trust_remote_code,
max_num_seqs,
gpu_memory_utilization,
):
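    # Tell this engine its data-parallel rank/size and where to reach the DP
    # master process used for cross-rank coordination.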
os.environ["VLLM_DP_RANK"] = str(global_dp_rank)
os.environ["VLLM_DP_RANK_LOCAL"] = str(local_dp_rank)
os.environ["VLLM_DP_SIZE"] = str(dp_size)
os.environ["VLLM_DP_MASTER_IP"] = dp_master_ip
os.environ["VLLM_DP_MASTER_PORT"] = str(dp_master_port)

# CUDA_VISIBLE_DEVICES for each DP rank is set automatically inside the
# engine processes.

# Sample prompts.
prompts = [
"Hello, my name is",
"The president of the United States is",
"The capital of France is",
"The future of AI is",
] * 40

    # Generate prompts of different lengths to demonstrate DP-aware padding.
if args.random_input:
prompts = generate_random_token_ids(40)

    # With DP, each rank should process different prompts. Typically, the DP
    # ranks together cover the full dataset, with each rank handling a
    # different shard.
floor = len(prompts) // dp_size
remainder = len(prompts) % dp_size

# Distribute prompts into even groups.
def start(rank):
return rank * floor + min(rank, remainder)

prompts = prompts[start(global_dp_rank) : start(global_dp_rank + 1)]
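    # e.g. with 160 prompts and dp_size=2 each rank gets 80 prompts; with 10
    # prompts and dp_size=4 the per-rank counts would be [3, 3, 2, 2].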
if len(prompts) == 0:
# if any rank has no prompts to process,
# we need to set a placeholder prompt
prompts = ["Placeholder"]
print(f"DP rank {global_dp_rank} needs to process {len(prompts)} prompts")
# Create a sampling params object.
    # Since we are doing data parallel, each rank can use different sampling
    # params. Here we set a different max_tokens per rank for demonstration.
sampling_params = SamplingParams(
temperature=0.8, top_p=0.95, max_tokens=[16, 20][global_dp_rank % 2]
)

# Create an LLM.
llm = LLM(
model=model,
tensor_parallel_size=GPUs_per_dp_rank,
enforce_eager=enforce_eager,
enable_expert_parallel=True,
trust_remote_code=trust_remote_code,
max_num_seqs=max_num_seqs,
gpu_memory_utilization=gpu_memory_utilization,
)
if not args.random_input:
outputs = llm.generate(prompts, sampling_params)
else:
outputs = llm.generate(None, sampling_params, prompts)
# Print the outputs.
for i, output in enumerate(outputs):
if i >= 5:
# print only 5 outputs
break
prompt = output.prompt
generated_text = output.outputs[0].text
print(
f"DP rank {global_dp_rank}, Prompt: {prompt!r}, "
f"Generated text: {generated_text!r}"
)

# Give engines time to pause their processing loops before exiting.
sleep(1)


if __name__ == "__main__":
args = parse_args()

dp_size = args.dp_size
tp_size = args.tp_size
node_size = args.node_size
node_rank = args.node_rank

if node_size == 1:
dp_master_ip = "127.0.0.1"
dp_master_port = get_open_port()
else:
dp_master_ip = args.master_addr
dp_master_port = args.master_port

assert dp_size % node_size == 0, "dp_size should be divisible by node_size"
dp_per_node = dp_size // node_size

from multiprocessing import Process

procs = []
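    # Illustrative mapping: with dp_size=4 and node_size=2 (dp_per_node=2),
    # node 0 launches global DP ranks 0 and 1 while node 1 launches ranks 2
    # and 3, each running as local ranks 0 and 1 on its own node.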
for local_dp_rank, global_dp_rank in enumerate(
range(node_rank * dp_per_node, (node_rank + 1) * dp_per_node)
):
proc = Process(
target=main,
args=(
args.model,
dp_size,
local_dp_rank,
global_dp_rank,
dp_master_ip,
dp_master_port,
tp_size,
args.enforce_eager,
args.trust_remote_code,
args.max_num_seqs,
args.gpu_memory_utilization,
),
)
proc.start()
procs.append(proc)
exit_code = 0
for proc in procs:
proc.join(timeout=300)
if proc.exitcode is None:
print(
f"Killing process {proc.pid} that didn't stop within 5 minutes."
)
proc.kill()
exit_code = 1
elif proc.exitcode:
exit_code = proc.exitcode

exit(exit_code)
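A quick way to sanity-check the floor/remainder partitioning used in main() above is the following standalone sketch in plain Python (no vLLM required; num_prompts and dp_size are arbitrary illustration values): every prompt is assigned to exactly one rank, and per-rank slice sizes differ by at most one.

num_prompts, dp_size = 10, 4
floor, remainder = divmod(num_prompts, dp_size)

def start(rank: int) -> int:
    # Same formula as in main(): the first `remainder` ranks get one extra prompt.
    return rank * floor + min(rank, remainder)

slices = [range(start(r), start(r + 1)) for r in range(dp_size)]
assert sum(len(s) for s in slices) == num_prompts           # every prompt assigned once
assert max(map(len, slices)) - min(map(len, slices)) <= 1   # near-even split
print([len(s) for s in slices])  # -> [3, 3, 2, 2]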
10 changes: 10 additions & 0 deletions tests/full_tests/ci_tests.sh
@@ -48,3 +48,13 @@ if [ $? -ne 0 ]; then
exit -1
fi
echo "Test with structured outputs passed"

# DP2
echo "Testing data parallel size 2 with vllm-hpu plugin v1"
echo HABANA_VISIBLE_DEVICES=all VLLM_SKIP_WARMUP=true PT_HPU_LAZY_MODE=1 VLLM_USE_V1=1 python -u vllm-gaudi/examples/data_parallel.py --dp-size 2 --tp-size 2
HABANA_VISIBLE_DEVICES=all VLLM_SKIP_WARMUP=true PT_HPU_LAZY_MODE=1 VLLM_USE_V1=1 python -u vllm-gaudi/examples/data_parallel.py --dp-size 2 --tp-size 2
if [ $? -ne 0 ]; then
echo "Error: Test failed for data parallel size 2" >&2
exit -1
fi
echo "Test with data parallel size 2 passed"
73 changes: 73 additions & 0 deletions vllm_gaudi/distributed/device_communicators/hpu_communicator.py
@@ -1,16 +1,36 @@
# SPDX-License-Identifier: Apache-2.0

from typing import Optional
import torch
import torch.distributed as dist
from torch.distributed import ProcessGroup

from vllm.distributed.device_communicators.base_device_communicator \
import DeviceCommunicatorBase
from vllm.forward_context import get_forward_context
from vllm.distributed.parallel_state import GroupCoordinator, get_dp_group

import habana_frameworks.torch as htorch # noqa: F401


class HpuCommunicator(DeviceCommunicatorBase):

def __init__(self,
cpu_group: ProcessGroup,
device: Optional[torch.device] = None,
device_group: Optional[ProcessGroup] = None,
unique_name: str = ""):
super().__init__(cpu_group, device, device_group, unique_name)

self.dp_group: Optional[GroupCoordinator] = None
self.dp_rank = 0
self.dp_world_size = 1
# assume EP is enabled along with DP
if "ep" in unique_name:
self.dp_group = get_dp_group()
self.dp_rank = self.dp_group.rank_in_group
self.dp_world_size = self.dp_group.world_size

def all_reduce(self, input_: torch.Tensor) -> torch.Tensor:
# FIXME(kzawora): this is a workaround for a bug in Habana PT bridge
# occurring when PT_HPU_ENABLE_LAZY_COLLECTIVES=true env var is used
Expand Down Expand Up @@ -41,3 +61,56 @@ def all_gather(self, input_: torch.Tensor, dim: int = -1) -> torch.Tensor:
input_size[dim], ) +
input_size[dim + 1:])
return output_tensor

def dispatch(
self, hidden_states: torch.Tensor,
router_logits: torch.Tensor) -> tuple[torch.Tensor, torch.Tensor]:
assert self.dp_group is not None
assert hidden_states.dim() == 2, "Input hidden states must be 2D"
input_size = hidden_states.size()
# Allocate output tensor.
output_size = list(input_size)
output_size[0] *= self.dp_world_size
hidden_states_across_dp = torch.empty(output_size,
dtype=hidden_states.dtype,
device=hidden_states.device)
torch.distributed.all_gather_into_tensor(
hidden_states_across_dp,
hidden_states,
group=self.dp_group.device_group)
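        # After the all-gather, every DP rank holds the concatenated hidden
        # states of all ranks: (dp_world_size * num_local_tokens, hidden_dim).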

router_logits_size = router_logits.size()
router_logits_output_size = list(router_logits_size)
router_logits_output_size[0] *= self.dp_world_size
router_logits_across_dp = torch.empty(router_logits_output_size,
dtype=router_logits.dtype,
device=router_logits.device)
torch.distributed.all_gather_into_tensor(
router_logits_across_dp,
router_logits,
group=self.dp_group.device_group)
return hidden_states_across_dp, router_logits_across_dp

def combine(self, hidden_states: torch.Tensor) -> torch.Tensor:
if htorch.utils.internal.is_lazy():
htorch.core.mark_step()
assert self.dp_group is not None
assert hidden_states.dim() == 2, "Input hidden states must be 2D"
cu_tokens_across_dp_cpu = get_forward_context(
).dp_metadata.cu_tokens_across_dp_cpu

# assume num tokens is padded across DP ranks
assert cu_tokens_across_dp_cpu[
0] * self.dp_world_size == cu_tokens_across_dp_cpu[-1]
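        # With that padding, every rank contributes the same number of rows;
        # the reduce_scatter below sums those contributions element-wise and
        # returns to each rank only its own slice of tokens.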

local_hidden_states = torch.empty(
(cu_tokens_across_dp_cpu[0], hidden_states.size(-1)),
device=hidden_states.device,
dtype=hidden_states.dtype)

torch.distributed.reduce_scatter_tensor(
local_hidden_states,
hidden_states,
group=self.dp_group.device_group)
hidden_states = local_hidden_states
return hidden_states
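To make the shape contract of dispatch() and combine() concrete, here is a single-process toy sketch (no torch.distributed or HPU involved; the tensor sizes are made-up illustration values): the all-gather grows the token dimension by dp_world_size, and the reduce-scatter sums per-rank contributions and hands each rank back only its own token slice.

import torch

dp_world_size = 2
tokens_per_rank = 4    # assumes padding already equalized token counts per rank
hidden_dim = 8

# dispatch(): all_gather_into_tensor concatenates along dim 0, so every rank
# ends up with (dp_world_size * tokens_per_rank, hidden_dim) hidden states.
per_rank = [torch.randn(tokens_per_rank, hidden_dim) for _ in range(dp_world_size)]
gathered = torch.cat(per_rank, dim=0)
assert gathered.shape == (dp_world_size * tokens_per_rank, hidden_dim)

# combine(): reduce_scatter_tensor sums the contributions from all DP ranks
# and returns each rank its own tokens_per_rank rows.
contributions = [torch.randn_like(gathered) for _ in range(dp_world_size)]
summed = torch.stack(contributions).sum(dim=0)
rank0_slice = summed[:tokens_per_rank]
assert rank0_slice.shape == (tokens_per_rank, hidden_dim)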
1 change: 1 addition & 0 deletions vllm_gaudi/platform.py
@@ -31,6 +31,7 @@ class HpuPlatform(Platform):
supported_quantization: list[str] = [
"compressed-tensors", "fp8", "inc", "awq_hpu", "gptq_hpu"
]
simple_compile_backend = "hpu_backend"

@classmethod
def get_attn_backend_cls(cls, selected_backend: _Backend, head_size: int,