
Commit 62485cc

Creating standardized APIs for in-framework and HF deployment (#302)
Signed-off-by: Pranav Prashant Thombre <pthombre@nvidia.com>
1 parent 31b71c0 commit 62485cc

16 files changed: +1151 -922 lines

nemo_deploy/deploy_ray.py

Lines changed: 355 additions & 23 deletions
Large diffs are not rendered by default.

nemo_deploy/nlp/hf_deployable_ray.py

Lines changed: 18 additions & 29 deletions
@@ -15,7 +15,7 @@
 
 import logging
 import time
-from typing import Any, Dict
+from typing import Any, Dict, Optional
 
 import numpy as np
 import torch
@@ -62,8 +62,8 @@ def __init__(
         task: str = "text-generation",
         trust_remote_code: bool = True,
         model_id: str = "nemo-model",
-        device_map: str = "auto",
-        max_memory: str = None,
+        device_map: Optional[str] = None,
+        max_memory: Optional[str] = None,
     ):
         """Initialize the HuggingFace model deployment.
 
@@ -81,7 +81,7 @@ def __init__(
         """
         try:
             max_memory_dict = None
-            self._setup_unique_distributed_parameters(device_map)
+            self._setup_unique_distributed_parameters()
             if device_map == "balanced":
                 if not max_memory:
                     raise ValueError("max_memory must be provided when device_map is 'balanced'")
@@ -102,29 +102,25 @@ def __init__(
             LOGGER.error(f"Error initializing HuggingFaceLLMServe replica: {str(e)}")
             raise
 
-    def _setup_unique_distributed_parameters(self, device_map):
+    def _setup_unique_distributed_parameters(self):
         """Configure unique distributed communication parameters for each model replica.
 
         This function sets up unique MASTER_PORT environment variables for each Ray Serve
         replica to ensure they can initialize their own torch.distributed process groups
-        without port conflicts. Only runs for 'balanced' or 'auto' device maps.
-
-        Args:
-            device_map (str): The device mapping strategy ('auto', 'balanced', etc.)
+        without port conflicts.
         """
-        if device_map == "balanced" or device_map == "auto":
-            import os
+        import os
 
-            import torch.distributed as dist
+        import torch.distributed as dist
 
-            # Check if torch.distributed is already initialized
-            if not dist.is_initialized():
-                # Get a unique port based on current process ID to avoid conflicts
+        # Check if torch.distributed is already initialized
+        if not dist.is_initialized():
+            # Get a unique port based on current process ID to avoid conflicts
 
-                unique_port = find_available_port(29500, "127.0.0.1")
-                # Set environment variables for torch.distributed
-                os.environ["MASTER_ADDR"] = "127.0.0.1"
-                os.environ["MASTER_PORT"] = str(unique_port)
+            unique_port = find_available_port(29500, "127.0.0.1")
+            # Set environment variables for torch.distributed
+            os.environ["MASTER_ADDR"] = "127.0.0.1"
+            os.environ["MASTER_PORT"] = str(unique_port)
 
     @app.post("/v1/completions/")
     async def completions(self, request: Dict[Any, Any]):
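find_available_port is imported from elsewhere in the package; its implementation is not part of this diff. As a hedged sketch of what such a helper commonly does (an assumption, not the repository's actual code), it scans upward from a starting port until a socket can be bound, which is what lets every Ray Serve replica claim its own MASTER_PORT:

# Sketch only: not the repository's find_available_port implementation.
import socket


def find_available_port(start_port: int, host: str) -> int:
    """Return the first port >= start_port that can be bound on host."""
    port = start_port
    while True:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            try:
                sock.bind((host, port))
                return port
            except OSError:
                port += 1


print(find_available_port(29500, "127.0.0.1"))  # e.g. 29500 if it is free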
@@ -267,16 +263,9 @@ async def chat_completions(self, request: Dict[Any, Any]):
             prompt = "\n".join([f"{msg.get('role', 'user')}: {msg.get('content', '')}" for msg in messages])
             prompt += "\nassistant:"
 
-            # Create a modified request with the prompt
-            chat_request = request.copy()
-            chat_request["prompt"] = prompt
-
-            # Extract parameters from the request dictionary
-            messages = request.get("messages", [])
-
-            # Prepare inference parameters
+            # Prepare inference parameters using the formatted prompt
             inference_inputs = {
-                "prompts": [messages],  # Wrap messages in a list so apply_chat_template gets the full conversation
+                "prompts": [prompt],  # Use formatted prompt string instead of raw messages
                 "max_length": request.get("max_tokens", 256),
                 "temperature": request.get("temperature", 1.0),
                 "top_k": request.get("top_k", 0),
@@ -330,7 +319,7 @@ async def chat_completions(self, request: Dict[Any, Any]):
                 ),
                 "finish_reason": (
                     "length"
-                    if generated_texts and len(generated_texts[0]) >= inference_inputs["max_length"]
+                    if generated_texts and len(generated_texts[0]) >= request.get("max_tokens", 256)
                     else "stop"
                 ),
             }
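Once a replica is running, these handlers are plain JSON-over-HTTP endpoints. A minimal client call might look like the sketch below; the host, port, and the exact chat route are assumptions (only the /v1/completions/ route is visible in this diff), while the field names (messages, max_tokens, temperature, top_k) come from the handler above.

# Assumed URL; adjust host, port, and route to match your deployment.
import requests

payload = {
    "messages": [{"role": "user", "content": "Hello!"}],
    "max_tokens": 64,
    "temperature": 1.0,
    "top_k": 0,
}
resp = requests.post("http://0.0.0.0:1024/v1/chat/completions/", json=payload, timeout=120)
print(resp.json())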

nemo_deploy/nlp/megatronllm_deployable_ray.py

Lines changed: 20 additions & 12 deletions
@@ -16,7 +16,7 @@
 import logging
 import os
 import time
-from typing import Any, Dict
+from typing import Any, Dict, Optional
 
 import numpy as np
 import ray
@@ -53,6 +53,8 @@ def __init__(
         enable_cuda_graphs: bool = False,
         enable_flash_decode: bool = False,
         legacy_ckpt: bool = False,
+        max_batch_size: int = 32,
+        random_seed: Optional[int] = None,
     ):
         # Use replica-specific environment variables to avoid conflicts
         os.environ["MASTER_PORT"] = master_port
@@ -82,6 +84,8 @@ def __init__(
             enable_cuda_graphs=enable_cuda_graphs,
             enable_flash_decode=enable_flash_decode,
             legacy_ckpt=legacy_ckpt,
+            max_batch_size=max_batch_size,
+            random_seed=random_seed,
         )
         if rank != 0:
             self.model.generate_other_ranks()
@@ -111,7 +115,6 @@ def __init__(
         self,
         nemo_checkpoint_filepath: str,
         num_gpus: int = 1,
-        num_nodes: int = 1,
         tensor_model_parallel_size: int = 1,
         pipeline_model_parallel_size: int = 1,
         context_parallel_size: int = 1,
@@ -120,13 +123,14 @@ def __init__(
         enable_cuda_graphs: bool = False,
         enable_flash_decode: bool = False,
         legacy_ckpt: bool = False,
+        max_batch_size: int = 32,
+        random_seed: Optional[int] = None,
     ):
         """Initialize the distributed Megatron LLM model deployment.
 
         Args:
             nemo_checkpoint_filepath (str): Path to the .nemo checkpoint file.
-            num_gpus (int): Number of GPUs to use per replica.
-            num_nodes (int): Number of nodes to use for deployment.
+            num_gpus (int): Number of GPUs to use for the deployment
             tensor_model_parallel_size (int): Size of tensor model parallelism.
             pipeline_model_parallel_size (int): Size of pipeline model parallelism.
             context_parallel_size (int): Size of context parallelism.
@@ -136,16 +140,16 @@ def __init__(
             max_batch_size (int): Maximum batch size for request batching.
             batch_wait_timeout_s (float): Maximum time to wait for batching requests.
             legacy_ckpt (bool): Whether to use legacy checkpoint format. Defaults to False.
+            random_seed (int): Random seed for model initialization.
         """
         try:
             self.model_id = model_id
-            world_size = num_gpus * num_nodes
 
             # Validate parallelism configuration
             total_parallel_size = tensor_model_parallel_size * pipeline_model_parallel_size * context_parallel_size
-            if total_parallel_size != world_size:
+            if total_parallel_size != num_gpus:
                 raise ValueError(
-                    f"Total parallelism size ({total_parallel_size}) must equal total GPUs per replica ({world_size})"
+                    f"Total parallelism size ({total_parallel_size}) must equal total GPUs per replica ({num_gpus})"
                 )
 
             # Generate a unique replica ID based on the actor handle
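The validation above is plain arithmetic: with num_nodes gone, one replica's world size is exactly num_gpus, so the product of the parallelism sizes has to cover it. A quick check with illustrative values:

# Example values only; the check mirrors the hunk above.
num_gpus = 8
tensor_model_parallel_size = 4
pipeline_model_parallel_size = 2
context_parallel_size = 1

total_parallel_size = (
    tensor_model_parallel_size * pipeline_model_parallel_size * context_parallel_size
)
if total_parallel_size != num_gpus:
    raise ValueError(
        f"Total parallelism size ({total_parallel_size}) must equal total GPUs per replica ({num_gpus})"
    )
print("parallelism layout is valid")  # 4 * 2 * 1 == 8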
@@ -165,7 +169,7 @@ def __init__(
             rank_0_worker = ModelWorker.remote(
                 nemo_checkpoint_filepath=nemo_checkpoint_filepath,
                 rank=0,
-                world_size=world_size,
+                world_size=num_gpus,
                 tensor_model_parallel_size=tensor_model_parallel_size,
                 pipeline_model_parallel_size=pipeline_model_parallel_size,
                 context_parallel_size=context_parallel_size,
@@ -175,6 +179,8 @@ def __init__(
                 enable_cuda_graphs=enable_cuda_graphs,
                 enable_flash_decode=enable_flash_decode,
                 legacy_ckpt=legacy_ckpt,
+                max_batch_size=max_batch_size,
+                random_seed=random_seed,
             )
             worker_futures.append(rank_0_worker)
 
@@ -184,11 +190,11 @@ def __init__(
             time.sleep(1)  # Give rank 0 time to start the distributed backend
 
             # Create remaining workers in parallel
-            for rank in range(1, world_size):
+            for rank in range(1, num_gpus):
                 worker = ModelWorker.remote(
                     nemo_checkpoint_filepath=nemo_checkpoint_filepath,
                     rank=rank,
-                    world_size=world_size,
+                    world_size=num_gpus,
                     tensor_model_parallel_size=tensor_model_parallel_size,
                     pipeline_model_parallel_size=pipeline_model_parallel_size,
                     context_parallel_size=context_parallel_size,
@@ -197,17 +203,19 @@ def __init__(
                     replica_id=replica_id,
                     enable_cuda_graphs=enable_cuda_graphs,
                     enable_flash_decode=enable_flash_decode,
+                    max_batch_size=max_batch_size,
+                    random_seed=random_seed,
                 )
                 worker_futures.append(worker)
 
             # Wait for all workers to be created and store them
             self.workers = worker_futures
-            LOGGER.info(f"Replica {replica_id} - All {world_size} workers created successfully")
+            LOGGER.info(f"Replica {replica_id} - All {num_gpus} workers created successfully")
 
             # Primary worker for coordinating inference
             self.primary_worker = self.workers[0]
 
-            LOGGER.info(f"Replica {replica_id} - Initialized {world_size} model workers across {num_nodes} nodes")
+            LOGGER.info(f"Replica {replica_id} - Initialized {num_gpus} model workers")
 
         except Exception as e:
             LOGGER.error(f"Error initializing distributed model deployment: {str(e)}")

scripts/deploy/nlp/deploy_ray_hf.py

Lines changed: 26 additions & 64 deletions
@@ -14,21 +14,12 @@
 
 import argparse
 import logging
-import multiprocessing
-import signal
-import sys
 
 from nemo_deploy.deploy_ray import DeployRay
-from nemo_deploy.nlp.hf_deployable_ray import HFRayDeployable
 
 LOGGER = logging.getLogger("NeMo")
 
 
-def get_available_cpus():
-    """Get the total number of available CPUs in the system."""
-    return multiprocessing.cpu_count()
-
-
 def parse_args():
     """Parse command line arguments."""
     parser = argparse.ArgumentParser(description="Deploy a HuggingFace model using Ray")
@@ -52,7 +43,7 @@ def parse_args():
     parser.add_argument(
         "--device_map",
         type=str,
-        default="auto",
+        default=None,
         help="Device mapping strategy for model placement",
     )
     parser.add_argument(
@@ -77,7 +68,7 @@ def parse_args():
         "--port",
         type=int,
         default=1024,
-        help="Port number to use for the Ray Serve server",
+        help="Port number to use for the Ray Serve server. If None, an available port will be found automatically.",
     )
     parser.add_argument(
         "--num_cpus",
@@ -114,83 +105,54 @@ def parse_args():
         default=8,
         help="Number of CPUs per model replica",
     )
+    parser.add_argument(
+        "--max_ongoing_requests",
+        type=int,
+        default=10,
+        help="Maximum number of ongoing requests per replica",
+    )
     parser.add_argument(
         "--cuda_visible_devices",
         type=str,
-        default="0,1",
+        default="0",
         help="Comma-separated list of CUDA visible devices",
     )
     return parser.parse_args()
 
 
-def signal_handler(signum, frame, deployer):
-    """Handle interrupt signals."""
-    LOGGER.info("Received interrupt signal. Shutting down gracefully...")
-    deployer.stop()
-    sys.exit(0)
-
-
 def main():
+    """Main function to deploy HuggingFace model using the updated DeployRay API."""
     args = parse_args()
 
-    # If num_cpus is not specified, use all available CPUs
-    if args.num_cpus is None:
-        args.num_cpus = get_available_cpus()
-        LOGGER.error(f"Using all available CPUs: {args.num_cpus}")
-
-    # Initialize Ray deployment
+    # Initialize Ray deployment with host, port, and runtime environment
    ray_deployer = DeployRay(
        num_cpus=args.num_cpus,
        num_gpus=args.num_gpus,
        include_dashboard=args.include_dashboard,
+        host=args.host,
+        port=args.port,
        runtime_env={
            "env_vars": {
                "CUDA_VISIBLE_DEVICES": args.cuda_visible_devices,
            }
        },
    )
 
-    # Set up signal handlers
-    signal.signal(signal.SIGINT, lambda signum, frame: signal_handler(signum, frame, ray_deployer))
-    signal.signal(
-        signal.SIGTERM,
-        lambda signum, frame: signal_handler(signum, frame, ray_deployer),
+    # Deploy the HuggingFace model using the new API
+    # This method handles the complete deployment lifecycle internally
+    ray_deployer.deploy_huggingface_model(
+        hf_model_id_path=args.model_path,
+        task=args.task,
+        trust_remote_code=args.trust_remote_code,
+        device_map=args.device_map,
+        max_memory=args.max_memory,
+        model_id=args.model_id,
+        num_replicas=args.num_replicas,
+        num_cpus_per_replica=args.num_cpus_per_replica,
+        num_gpus_per_replica=args.num_gpus_per_replica,
+        max_ongoing_requests=args.max_ongoing_requests,
    )
 
-    try:
-        # Start Ray Serve
-        ray_deployer.start(host=args.host, port=args.port)
-
-        # Create the HuggingFace model deployment
-        app = HFRayDeployable.options(
-            num_replicas=args.num_replicas,
-            ray_actor_options={
-                "num_gpus": args.num_gpus_per_replica,
-                "num_cpus": args.num_cpus_per_replica,
-            },
-        ).bind(
-            hf_model_id_path=args.model_path,
-            task=args.task,
-            trust_remote_code=args.trust_remote_code,
-            model_id=args.model_id,
-            device_map=args.device_map,
-            max_memory=args.max_memory,
-        )
-
-        # Deploy the model
-        ray_deployer.run(app, args.model_id)
-
-        LOGGER.info(f"Model deployed successfully at {args.host}:{args.port}")
-        LOGGER.info("Press Ctrl+C to stop the deployment")
-
-        # Keep the script running
-        while True:
-            signal.pause()
-    except Exception as e:
-        LOGGER.error(f"Error during deployment: {str(e)}")
-        ray_deployer.stop()
-        sys.exit(1)
-
 
 if __name__ == "__main__":
     main()
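Taken together, the script now reduces to a DeployRay constructor plus one deploy_huggingface_model call. A condensed, programmatic equivalent of main() above, with illustrative hard-coded arguments (the model path is a placeholder, and the host and dashboard defaults are assumptions since their argparse defaults fall outside the rendered hunks):

# Condensed sketch of main() above with example argument values.
from nemo_deploy.deploy_ray import DeployRay

ray_deployer = DeployRay(
    num_cpus=8,
    num_gpus=1,
    include_dashboard=False,     # assumed default
    host="0.0.0.0",              # assumed default
    port=1024,
    runtime_env={"env_vars": {"CUDA_VISIBLE_DEVICES": "0"}},
)

# Handles Serve startup, replica creation, and the deployment lifecycle internally.
ray_deployer.deploy_huggingface_model(
    hf_model_id_path="/path/to/hf_model_or_hub_id",  # placeholder
    task="text-generation",
    trust_remote_code=True,
    device_map=None,
    max_memory=None,
    model_id="nemo-model",
    num_replicas=1,
    num_cpus_per_replica=8,
    num_gpus_per_replica=1,
    max_ongoing_requests=10,
)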
