Commit c59ca52 (parent 9ed29aa)

integration test

Signed-off-by: nicklucche <[email protected]>

2 files changed (+67, -25 lines)
tests/v1/kv_connector/nixl_integration/run_accuracy_test.sh

Lines changed: 44 additions & 6 deletions
@@ -44,6 +44,39 @@ get_model_args() {
   echo "$extra_args"
 }
 
+set_cli_args() {
+  PREFILLER_TP_SIZE=1
+  DECODER_TP_SIZE=1
+  # Iterate through the rest of the arguments
+  while [[ $# -gt 0 ]]; do
+    echo $#
+    case "$1" in
+      --prefiller-tp-size)
+        if [[ -n "$2" ]]; then
+          PREFILLER_TP_SIZE="$2"
+          shift 2 # Consume the flag and its value ($2)
+        else
+          echo "Error: --prefiller-tp-size requires a value." >&2
+          exit 1
+        fi
+        ;;
+      --decoder-tp-size)
+        if [[ -n "$2" ]]; then
+          DECODER_TP_SIZE="$2"
+          shift 2
+        else
+          echo "Error: --decoder-tp-size requires a value." >&2
+          exit 1
+        fi
+        ;;
+      *)
+        # Handle any arguments not recognized
+        shift # Ignore unknown argument
+        ;;
+    esac
+  done
+}
+
 
 # Function to run tests for a specific model
 run_tests_for_model() {
@@ -54,6 +87,7 @@ run_tests_for_model() {
 
   # Get model-specific arguments
   local model_args=$(get_model_args "$model_name")
+  set_cli_args "$@"
 
   # Arrays to store all hosts and ports
   PREFILL_HOSTS=()
@@ -65,19 +99,21 @@ run_tests_for_model() {
   for i in $(seq 0 $((NUM_PREFILL_INSTANCES-1))); do
     # Calculate GPU ID - we'll distribute across available GPUs
     GPU_ID=$((i % $(nvidia-smi --query-gpu=name --format=csv,noheader | wc -l)))
+
     # Calculate port number (base port + instance number)
     PORT=$((8100 + i))
-    # Calculate side channel port
-    SIDE_CHANNEL_PORT=$((5559 + i))
+    # Calculate side channel port. Avoid clash with TP workers.
+    SIDE_CHANNEL_PORT=$((5559 + i * $PREFILLER_TP_SIZE))
 
     echo "Starting prefill instance $i on GPU $GPU_ID, port $PORT"
 
     # Build the command with or without model-specific args
-    BASE_CMD="CUDA_VISIBLE_DEVICES=$GPU_ID VLLM_NIXL_SIDE_CHANNEL_PORT=$SIDE_CHANNEL_PORT vllm serve $model_name \
+    BASE_CMD="VLLM_WORKER_MULTIPROC_METHOD=spawn VLLM_ENABLE_V1_MULTIPROCESSING=0 CUDA_VISIBLE_DEVICES=$GPU_ID VLLM_NIXL_SIDE_CHANNEL_PORT=$SIDE_CHANNEL_PORT vllm serve $model_name \
       --port $PORT \
       --enforce-eager \
       --disable-log-requests \
       --gpu-memory-utilization 0.2 \
+      --tensor-parallel-size $PREFILLER_TP_SIZE \
       --kv-transfer-config '{\"kv_connector\":\"NixlConnector\",\"kv_role\":\"kv_both\"}'"
 
     if [ -n "$model_args" ]; then
@@ -97,19 +133,21 @@ run_tests_for_model() {
   for i in $(seq 0 $((NUM_DECODE_INSTANCES-1))); do
     # Calculate GPU ID - we'll distribute across available GPUs, starting from after prefill GPUs
     GPU_ID=$(((i + NUM_PREFILL_INSTANCES) % $(nvidia-smi --query-gpu=name --format=csv,noheader | wc -l)))
+
     # Calculate port number (base port + instance number)
     PORT=$((8200 + i))
     # Calculate side channel port
-    SIDE_CHANNEL_PORT=$((5659 + i))
+    SIDE_CHANNEL_PORT=$((5659 + i * $PREFILLER_TP_SIZE))
 
     echo "Starting decode instance $i on GPU $GPU_ID, port $PORT"
 
     # Build the command with or without model-specific args
-    BASE_CMD="CUDA_VISIBLE_DEVICES=$GPU_ID VLLM_NIXL_SIDE_CHANNEL_PORT=$SIDE_CHANNEL_PORT vllm serve $model_name \
+    BASE_CMD="VLLM_WORKER_MULTIPROC_METHOD=spawn VLLM_ENABLE_V1_MULTIPROCESSING=0 CUDA_VISIBLE_DEVICES=$GPU_ID VLLM_NIXL_SIDE_CHANNEL_PORT=$SIDE_CHANNEL_PORT vllm serve $model_name \
       --port $PORT \
      --enforce-eager \
      --disable-log-requests \
      --gpu-memory-utilization 0.2 \
+      --tensor-parallel-size $DECODER_TP_SIZE \
       --kv-transfer-config '{\"kv_connector\":\"NixlConnector\",\"kv_role\":\"kv_both\"}'"
 
     if [ -n "$model_args" ]; then
@@ -165,7 +203,7 @@ run_tests_for_model() {
 
 # Run tests for each model
 for model in "${MODELS[@]}"; do
-  run_tests_for_model "$model"
+  run_tests_for_model "$model" "$@"
 done
 
 echo "All tests completed!"

tests/v1/kv_connector/nixl_integration/toy_proxy_server.py

Lines changed: 23 additions & 19 deletions
@@ -151,18 +151,8 @@ async def send_request_to_service(client_info: dict, endpoint: str,
     Send a request to a service using a client from the pool.
     """
     req_data = req_data.copy()
-    req_data['kv_transfer_params'] = {
-        "do_remote_decode": True,
-        "do_remote_prefill": False,
-        "remote_engine_id": None,
-        "remote_block_ids": None,
-        "remote_host": None,
-        "remote_port": None
-    }
+    req_data['do_remote_decode'] = True
     req_data["stream"] = False
-    req_data["max_tokens"] = 1
-    if "stream_options" in req_data:
-        del req_data["stream_options"]
     headers = {
         "Authorization": f"Bearer {os.environ.get('OPENAI_API_KEY')}",
         "X-Request-Id": request_id
@@ -177,14 +167,22 @@ async def send_request_to_service(client_info: dict, endpoint: str,
 
 
 async def stream_service_response(client_info: dict, endpoint: str,
-                                  req_data: dict, request_id: str):
+                                  req_data: dict, remote_block_ids: list[int],
+                                  remote_engine_id: str, remote_host: str,
+                                  remote_port: int, request_id: str):
     """
     Asynchronously stream response from a service using a client from the pool.
     """
     headers = {
         "Authorization": f"Bearer {os.environ.get('OPENAI_API_KEY')}",
         "X-Request-Id": request_id
     }
+    req_data = req_data.copy()
+    req_data['do_remote_prefill'] = True
+    req_data["remote_block_ids"] = remote_block_ids
+    req_data['remote_engine_id'] = remote_engine_id
+    req_data["remote_host"] = remote_host
+    req_data["remote_port"] = remote_port
 
     async with client_info['client'].stream("POST",
                                             endpoint,
@@ -211,9 +209,10 @@ async def handle_completions(request: Request):
 
         # Extract the needed fields
         response_json = response.json()
-        kv_transfer_params = response_json.get('kv_transfer_params', {})
-        if kv_transfer_params:
-            req_data["kv_transfer_params"] = kv_transfer_params
+        remote_block_ids = response_json.get('remote_block_ids', [])
+        remote_engine_id = response_json.get('remote_engine_id', '')
+        remote_host = response_json.get('remote_host', '')
+        remote_port = response_json.get('remote_port', 0)
 
         # Get the next decode client in round-robin fashion
         decode_client_info = get_next_client(request.app, 'decode')
@@ -222,10 +221,15 @@ async def handle_completions(request: Request):
 
         # Stream response from decode service
         async def generate_stream():
-            async for chunk in stream_service_response(decode_client_info,
-                                                       "/completions",
-                                                       req_data,
-                                                       request_id=request_id):
+            async for chunk in stream_service_response(
+                    decode_client_info,
+                    "/completions",
+                    req_data,
+                    remote_block_ids=remote_block_ids,
+                    remote_engine_id=remote_engine_id,
+                    remote_host=remote_host,
+                    remote_port=remote_port,
+                    request_id=request_id):
                 yield chunk
 
         return StreamingResponse(generate_stream(),
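
Net effect on the proxy: the KV-transfer metadata that previously rode in a nested kv_transfer_params dict now travels as top-level request fields, and the prefill response's remote_block_ids, remote_engine_id, remote_host, and remote_port are forwarded explicitly to the decode call. An illustrative sketch of the two hops as raw HTTP requests, assuming the script's default ports (8100 prefill, 8200 decode), an OpenAI-style /v1/completions route, and placeholder model/prompt/metadata values; in practice the proxy injects these fields itself in send_request_to_service and stream_service_response:

  # 1) Prefill: flag the request for remote decode (top-level field).
  curl -s http://localhost:8100/v1/completions \
    -H "Content-Type: application/json" \
    -d '{"model": "...", "prompt": "...", "do_remote_decode": true, "stream": false}'
  # The response JSON carries remote_block_ids, remote_engine_id,
  # remote_host, and remote_port at the top level.

  # 2) Decode: forward that metadata, again as top-level fields
  #    (all values below are placeholders).
  curl -s http://localhost:8200/v1/completions \
    -H "Content-Type: application/json" \
    -d '{"model": "...", "prompt": "...", "do_remote_prefill": true,
         "remote_block_ids": [0, 1], "remote_engine_id": "engine-0",
         "remote_host": "localhost", "remote_port": 5659}'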
