diff --git a/model-engine/model_engine_server/api/v2/chat_completion.py b/model-engine/model_engine_server/api/v2/chat_completion.py
index 614f159d0..6140c4517 100644
--- a/model-engine/model_engine_server/api/v2/chat_completion.py
+++ b/model-engine/model_engine_server/api/v2/chat_completion.py
@@ -263,7 +263,7 @@ async def chat_completion(
         )
     else:
         logger.info(
-            f"POST /v2/chat/completion ({('stream' if request.stream else 'sync')}) with {request} to endpoint {model_endpoint_name} for {auth}"
+            f"POST /v2/chat/completion ({('stream' if request.stream else 'sync')}) to endpoint {model_endpoint_name} for {auth}"
         )

     if request.stream:
diff --git a/model-engine/model_engine_server/api/v2/completion.py b/model-engine/model_engine_server/api/v2/completion.py
index ed529fe3b..aaf599188 100644
--- a/model-engine/model_engine_server/api/v2/completion.py
+++ b/model-engine/model_engine_server/api/v2/completion.py
@@ -262,7 +262,7 @@ async def completion(
         )
     else:
         logger.info(
-            f"POST /v2/completion ({('stream' if request.stream else 'sync')}) with {request} to endpoint {model_endpoint_name} for {auth}"
+            f"POST /v2/completion ({('stream' if request.stream else 'sync')}) to endpoint {model_endpoint_name} for {auth}"
         )

     if request.stream:
diff --git a/model-engine/model_engine_server/domain/use_cases/llm_model_endpoint_use_cases.py b/model-engine/model_engine_server/domain/use_cases/llm_model_endpoint_use_cases.py
index 1fb8dbed9..4d3e312aa 100644
--- a/model-engine/model_engine_server/domain/use_cases/llm_model_endpoint_use_cases.py
+++ b/model-engine/model_engine_server/domain/use_cases/llm_model_endpoint_use_cases.py
@@ -377,6 +377,88 @@ def check_docker_image_exists_for_image_tag(
                 tag=framework_image_tag,
             )

+    async def create_sglang_multinode_bundle(
+        self,
+        user: User,
+        model_name: str,
+        framework_image_tag: str,
+        endpoint_unique_name: str,
+        num_shards: int,
+        nodes_per_worker: int,
+        quantize: Optional[Quantization],
+        checkpoint_path: Optional[str],
+        chat_template_override: Optional[str],
+        additional_args: Optional[SGLangEndpointAdditionalArgs] = None,
+    ):
+        leader_command = [
+            "python3",
+            "/root/sglang-startup-script.py",
+            "--model",
+            "deepseek-ai/DeepSeek-R1",
+            "--nnodes",
+            "2",
+            "--node-rank",
+            "0",
+            "--worker-port",
+            "5005",
+            "--leader-port",
+            "5002",
+        ]
+
+        worker_command = [
+            "python3",
+            "/root/sglang-startup-script.py",
+            "--model",
+            "deepseek-ai/DeepSeek-R1",
+            "--nnodes",
+            "2",
+            "--node-rank",
+            "1",
+            "--worker-port",
+            "5005",
+            "--leader-port",
+            "5002",
+        ]
+
+        # NOTE: the most important env var, SGLANG_HOST_IP, is already set in the sglang startup script
+
+        common_sglang_envs = {  # these are for debugging
+            "NCCL_SOCKET_IFNAME": "eth0",
+            "GLOO_SOCKET_IFNAME": "eth0",
+        }
+
+        # This is the same as the vLLM multinode bundle
+        create_model_bundle_v2_request = CreateModelBundleV2Request(
+            name=endpoint_unique_name,
+            schema_location="TBA",
+            flavor=StreamingEnhancedRunnableImageFlavor(
+                flavor=ModelBundleFlavorType.STREAMING_ENHANCED_RUNNABLE_IMAGE,
+                repository=hmi_config.sglang_repository,
+                tag=framework_image_tag,
+                command=leader_command,
+                streaming_command=leader_command,
+                protocol="http",
+                readiness_initial_delay_seconds=10,
+                healthcheck_route="/health",
+                predict_route="/predict",
+                streaming_predict_route="/stream",
+                extra_routes=[OPENAI_CHAT_COMPLETION_PATH, OPENAI_COMPLETION_PATH],
+                env=common_sglang_envs,
+                worker_command=worker_command,
+                worker_env=common_sglang_envs,
+            ),
+            metadata={},
+        )
+
+        return (
+            await self.create_model_bundle_use_case.execute(
+                user,
+                create_model_bundle_v2_request,
+                do_auth_check=False,
+            )
+        ).model_bundle_id
+
+
     async def execute(
         self,
         user: User,
@@ -400,7 +482,7 @@ async def execute(
         self.check_docker_image_exists_for_image_tag(
             framework_image_tag, INFERENCE_FRAMEWORK_REPOSITORY[framework]
         )
-        if multinode and framework != LLMInferenceFramework.VLLM:
+        if multinode and framework not in [LLMInferenceFramework.VLLM, LLMInferenceFramework.SGLANG]:
             raise ObjectHasInvalidValueException(
                 f"Multinode is not supported for framework {framework}."
             )
@@ -481,16 +563,30 @@ async def execute(
                     if additional_args
                     else None
                 )
-                bundle_id = await self.create_sglang_bundle(
-                    user,
-                    model_name,
-                    framework_image_tag,
-                    endpoint_name,
-                    num_shards,
-                    checkpoint_path,
-                    chat_template_override,
-                    additional_args=additional_sglang_args,
-                )
+                if multinode:
+                    bundle_id = await self.create_sglang_multinode_bundle(
+                        user,
+                        model_name,
+                        framework_image_tag,
+                        endpoint_name,
+                        num_shards,
+                        nodes_per_worker,
+                        quantize,
+                        checkpoint_path,
+                        chat_template_override,
+                        additional_args=additional_sglang_args,
+                    )
+                else:
+                    bundle_id = await self.create_sglang_bundle(
+                        user,
+                        model_name,
+                        framework_image_tag,
+                        endpoint_name,
+                        num_shards,
+                        checkpoint_path,
+                        chat_template_override,
+                        additional_args=additional_sglang_args,
+                    )
             case _:
                 assert_never(framework)
                 raise ObjectHasInvalidValueException(
@@ -1323,7 +1419,7 @@ async def execute(

         if (
             request.nodes_per_worker > 1
-            and not request.inference_framework == LLMInferenceFramework.VLLM
+            and request.inference_framework not in [LLMInferenceFramework.VLLM, LLMInferenceFramework.SGLANG]
         ):
             raise ObjectHasInvalidValueException(
                 "Multinode endpoints are only supported for VLLM models."
diff --git a/model-engine/model_engine_server/inference/forwarding/http_forwarder.py b/model-engine/model_engine_server/inference/forwarding/http_forwarder.py
index 89fcb3fb1..d1732bfdd 100644
--- a/model-engine/model_engine_server/inference/forwarding/http_forwarder.py
+++ b/model-engine/model_engine_server/inference/forwarding/http_forwarder.py
@@ -90,7 +90,7 @@ async def predict(
         )
         return response
     except Exception:
-        logger.error(f"Failed to decode payload from: {request}")
+        logger.error("Failed to decode payload")
         raise


@@ -103,10 +103,10 @@ async def stream(
     try:
         payload = request.model_dump()
     except Exception:
-        logger.error(f"Failed to decode payload from: {request}")
+        logger.error("Failed to decode payload")
         raise
     else:
-        logger.debug(f"Received request: {payload}")
+        logger.debug("Received request")

     responses = forwarder.forward(payload)
     # We fetch the first response to check if upstream request was successful
diff --git a/model-engine/model_engine_server/inference/sglang/README.md b/model-engine/model_engine_server/inference/sglang/README.md
index fc45e7943..03ede0cc6 100644
--- a/model-engine/model_engine_server/inference/sglang/README.md
+++ b/model-engine/model_engine_server/inference/sglang/README.md
@@ -163,6 +163,42 @@ GIT_SHA="$(git rev-parse HEAD)-rcX" SETENV=prod LOCAL=true AWS_PROFILE=ml-servin
 4. Send a `POST` request to `http://localhost:5001/v1/llm/model-endpoints` with the following body:

 ```json
+{
+  "name": "deepseek-r1-0528",
+  "model_name": "deepseek-r1-0528",
+  "endpoint_type": "streaming",
+  "cpus": 160,
+  "memory": "800Gi",
+  "min_workers": 1,
+  "max_workers": 1,
+  "gpus": 8,
+  "gpu_type": "nvidia-hopper-h100",
+  "storage": "900Gi",
+  "per_worker": 1,
+  "num_shards": 8,
+  "nodes_per_worker": 2,
+  "labels": {
+    "team": "infra",
+    "product": "inference.llm_model_zoo"
+  },
+  "inference_framework": "sglang",
+  "inference_framework_image_tag": "multinode-latest-2",
+  "high_priority": true,
+  "metadata": {
+    "_llm": {
+      "source": "hugging_face",
+      "quantize": null,
+      "model_name": "deepseek-r1-0528",
+      "num_shards": 8,
+      "checkpoint_path": "s3://scale-ml/models/hf-synced-weights/deepseek-ai/DeepSeek-R1-0528",
+      "inference_framework": "sglang",
+      "chat_template_override": null,
+      "inference_framework_image_tag": "multinode-latest-2"
+    }
+  }
+}
+
+
 {
   "name": "deepseek-r1",
   "model_name": "deepseek-r1",