diff --git a/MODEL_ENGINE_ONPREM_DEPLOYMENT_GUIDE.md b/MODEL_ENGINE_ONPREM_DEPLOYMENT_GUIDE.md new file mode 100644 index 000000000..600576f1d --- /dev/null +++ b/MODEL_ENGINE_ONPREM_DEPLOYMENT_GUIDE.md @@ -0,0 +1,1391 @@ +# Model Engine On-Premises Deployment Guide + +**Complete Documentation of All Updates Made to Deploy Scale's LLM Engine On-Premises** + +This document outlines every modification, bug fix, and accommodation made to successfully deploy the Scale AI Model Engine in an on-premises environment with: +- S3-compatible object storage (Scality) +- Redis authentication +- Private Docker registry +- Kubernetes secrets management +- GPU resource management + +--- + +## 📋 **Executive Summary** + +### **Repositories Modified:** +1. **`llm-engine`** - Core model engine code and Helm charts +2. **`oman-national-llm/infra`** - Deployment-specific configurations and values + +### **Major Categories of Changes:** +1. **Storage Integration** - S3-compatible object storage support +2. **Authentication & Security** - Redis auth, Kubernetes secrets +3. **Container Management** - Private registry, image configuration +4. **Resource Management** - GPU allocation, storage limits +5. **Download Optimization** - Model download performance fixes +6. **Helm Template Fixes** - Conditional logic and environment variables + +--- + +## 🔧 **Detailed Changes by Repository** + +## **Repository 1: `llm-engine` Core Changes** + +### **1. Storage Client Configuration** +**File:** `model-engine/model_engine_server/core/aws/storage_client.py` + +**Problem:** Code assumed AWS profiles, failed with `ProfileNotFound` errors +**Solution:** Added environment variable-based authentication for on-premises + +```python +# Added on-premises authentication support +if infra_config().cloud_provider == "onprem": + session = boto3.Session( + aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"), + aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"), + region_name=infra_config().default_region + ) + # Support for custom S3-compatible endpoints + endpoint_url = os.getenv("AWS_ENDPOINT_URL") + if endpoint_url: + s3_client = session.client('s3', endpoint_url=endpoint_url) +``` + +### **2. S3 Filesystem Gateway** +**File:** `model-engine/model_engine_server/infra/gateways/s3_filesystem_gateway.py` + +**Changes:** +- Added environment variable authentication +- Support for custom S3 endpoints (Scality) +- Removed AWS profile dependencies + +```python +def _get_s3_client(self): + if infra_config().cloud_provider == "onprem": + # Use environment variables instead of AWS profiles + session = boto3.Session( + aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"), + aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"), + region_name=infra_config().default_region + ) + endpoint_url = os.getenv("AWS_ENDPOINT_URL") + return session.client('s3', endpoint_url=endpoint_url) +``` + +### **3. S3 LLM Artifact Gateway** +**File:** `model-engine/model_engine_server/infra/gateways/s3_llm_artifact_gateway.py` + +**Changes:** +- Environment variable authentication +- Custom endpoint support +- Fixed profile dependency issues + +### **4. Celery & Gunicorn Configuration (CRITICAL FOR ON-PREMISES)** + +This is one of the most complex and critical parts of the on-premises deployment. The Celery task queue system and Gunicorn WSGI server required extensive modifications to work without AWS dependencies. + +#### **A. 
Celery Core Configuration** +**File:** `model-engine/model_engine_server/core/celery/app.py` + +**Problem:** +- Redis authentication not supported +- AWS SQS hardcoded as default broker +- S3 backend assumed for task results +- No SSL support for Redis connections + +**Critical Code Changes:** + +**1. Redis Authentication Support (Lines 198-214):** +```python +def get_redis_endpoint(db_index: int = 0) -> str: + if infra_config().redis_aws_secret_name is not None: + # AWS Secrets Manager approach (cloud) + creds = get_key_file(infra_config().redis_aws_secret_name) + scheme = creds.get("scheme", "redis://") + host = creds["host"] + port = creds["port"] + auth_token = creds.get("auth_token", None) + if auth_token is not None: + return f"{scheme}:{auth_token}@{host}:{port}/{db_index}" + return f"{scheme}{host}:{port}/{db_index}" + + # ON-PREMISES APPROACH - Environment variables + host, port = get_redis_host_port() + auth_token = os.getenv("REDIS_AUTH_TOKEN") # ← CRITICAL: From Kubernetes secret + if auth_token: + return f"rediss://:{auth_token}@{host}:{port}/{db_index}?ssl_cert_reqs=none" + return f"redis://{host}:{port}/{db_index}" +``` + +**2. Redis Instance Creation with SSL (Lines 217-230):** +```python +def get_redis_instance(db_index: int = 0) -> Union[Redis, StrictRedis]: + host, port = get_redis_host_port() + auth_token = os.getenv("REDIS_AUTH_TOKEN") + + if auth_token: + return StrictRedis( + host=host, + port=port, + db=db_index, + password=auth_token, + ssl=True, # ← CRITICAL: SSL enabled for auth + ssl_cert_reqs="none", # ← CRITICAL: Skip cert verification + ) + return Redis(host=host, port=port, db=db_index) +``` + +**3. Broker Selection Logic (Lines 471-515):** +```python +def _get_broker_endpoint_and_transport_options( + broker_type: str, + task_visibility: int, + visibility_timeout: int, + broker_transport_options: Dict[str, Any], +) -> Tuple[str, Dict[str, str]]: + out_broker_transport_options = broker_transport_options.copy() + out_broker_transport_options["visibility_timeout"] = visibility_timeout + + if broker_type == "redis": + # ← ON-PREMISES: Use Redis instead of SQS + return get_redis_endpoint(task_visibility), out_broker_transport_options + elif broker_type == "sqs": + # AWS SQS configuration (not used on-premises) + out_broker_transport_options["region"] = os.environ.get("AWS_REGION", "us-west-2") + return "sqs://", out_broker_transport_options + elif broker_type == "servicebus": + # Azure Service Bus configuration + return ( + f"azureservicebus://DefaultAzureCredential@{os.getenv('SERVICEBUS_NAMESPACE')}.servicebus.windows.net", + out_broker_transport_options, + ) +``` + +**4. 
Backend Configuration for On-Premises (Lines 518-560):** +```python +def _get_backend_url_and_conf( + backend_protocol: str, + s3_bucket: str, + s3_base_path: str, + aws_role: Optional[str] = None, +) -> Tuple[str, Dict[str, Any]]: + out_conf_changes: Dict[str, Any] = {} + + if backend_protocol == "redis": + backend_url = get_redis_endpoint(1) # Use db_num=1 for backend + elif backend_protocol == "s3": + # ← CRITICAL ON-PREMISES LOGIC + if infra_config().cloud_provider == "onprem": + logger.info("Using Redis backend for on-premises environment instead of S3") + backend_url = get_redis_endpoint(1) # ← Use Redis instead of S3 + else: + # AWS S3 backend (cloud only) + backend_url = "s3://" + if aws_role is None: + aws_session = session(infra_config().profile_ml_worker) + else: + aws_session = session(aws_role) + + out_conf_changes.update({ + "s3_boto3_session": aws_session, + "s3_bucket": s3_bucket, + "s3_base_path": s3_base_path, + }) + + return backend_url, out_conf_changes +``` + +#### **B. Service Builder Celery Configuration** +**File:** `model-engine/model_engine_server/service_builder/celery.py` + +**Problem:** Hardcoded SQS broker, no on-premises support +**Solution:** Added on-premises broker selection logic + +```python +# Broker type selection based on environment +service_builder_broker_type: str +if CIRCLECI: + service_builder_broker_type = str(BrokerType.REDIS.value) +elif infra_config().cloud_provider == "azure": + service_builder_broker_type = str(BrokerType.SERVICEBUS.value) +elif infra_config().cloud_provider == "onprem": # ← ADDED FOR ON-PREMISES + service_builder_broker_type = str(BrokerType.REDIS.value) +else: + service_builder_broker_type = str(BrokerType.SQS.value) + +# Celery app configuration +service_builder_service = celery_app( + name="model_engine_server.service_builder", + modules=["model_engine_server.service_builder.tasks_v1"], + s3_bucket=infra_config().s3_bucket, + broker_type=service_builder_broker_type, + # ← CRITICAL: Backend selection based on cloud provider + backend_protocol="abs" if infra_config().cloud_provider == "azure" + else "redis" if infra_config().cloud_provider == "onprem" + else "s3", +) +``` + +#### **C. Resource Type Configuration** +**File:** `model-engine/model_engine_server/infra/gateways/resources/k8s_resource_types.py` + +**Changes:** Broker selection for Kubernetes deployments (Lines 596-608) + +```python +# Broker selection for different environments +if CIRCLECI: + broker_name = BrokerName.REDIS.value + broker_type = BrokerType.REDIS.value +elif infra_config().cloud_provider == "azure": + broker_name = BrokerName.SERVICEBUS.value + broker_type = BrokerType.SERVICEBUS.value +elif infra_config().cloud_provider == "onprem": # ← ON-PREMISES LOGIC + broker_name = BrokerName.REDIS.value + broker_type = BrokerType.REDIS.value +else: + broker_name = BrokerName.SQS.value + broker_type = BrokerType.SQS.value +``` + +#### **D. 
Gunicorn Configuration** +**Problem:** Default timeouts too short for model operations +**Solution:** Extended timeouts and custom worker class + +**Helm Values Configuration:** +```yaml +# File: infra/charts/model-engine/values.yaml (Lines 108-113) +gunicorn: + workerTimeout: 120 # ← Extended from default 30s + gracefulTimeout: 120 # ← Extended graceful shutdown + keepAlive: 2 # ← Connection keep-alive + workerClass: "model_engine_server.api.worker.LaunchWorker" # ← Custom worker +``` + +**Environment Variable Injection:** +```yaml +# File: charts/model-engine/templates/_helpers.tpl (Lines 413-424) +{{- if .Values.gunicorn }} +- name: WORKER_TIMEOUT + value: {{ .Values.gunicorn.workerTimeout | quote }} +- name: GUNICORN_TIMEOUT + value: {{ .Values.gunicorn.gracefulTimeout | quote }} +- name: GUNICORN_GRACEFUL_TIMEOUT + value: {{ .Values.gunicorn.gracefulTimeout | quote }} +- name: GUNICORN_KEEP_ALIVE + value: {{ .Values.gunicorn.keepAlive | quote }} +- name: GUNICORN_WORKER_CLASS + value: {{ .Values.gunicorn.workerClass | quote }} +{{- end }} +``` + +#### **E. Redis Configuration in Helm** +**Critical Redis settings for authentication:** + +```yaml +# File: infra/charts/model-engine/values.yaml (Lines 102-106) +redis: + auth: true # ← Enable Redis authentication + enableAuth: true # ← CRITICAL: Enables broker URL generation + kedaSecretName: "" # ← KEDA scaling secret (optional) + +# Celery broker type +celeryBrokerType: redis # ← Use Redis instead of SQS + +# Configuration values +config: + values: + infra: + redis_host: redis-cluster-master.llm-core.svc.cluster.local + redis_port: "6379" + redis_password: null # ← Password comes from Kubernetes secret +``` + +#### **F. Environment Variable Injection (Helm Templates)** +**File:** `charts/model-engine/templates/_helpers.tpl` + +**Redis Authentication Setup:** +```yaml +{{- if and .kubernetesRedisSecretName $.Values.redis.enableAuth }} +- name: REDIS_AUTH_TOKEN + valueFrom: + secretKeyRef: + name: {{ .kubernetesRedisSecretName }} # ← "redis-cluster" + key: password +{{- end }} +``` + +**Celery Annotations for Autoscaling:** +```yaml +# Lines 120-126: Service template annotations +celery.scaleml.autoscaler/queue: ${QUEUE} +celery.scaleml.autoscaler/broker: ${BROKER_NAME} +celery.scaleml.autoscaler/taskVisibility: "VISIBILITY_24H" +celery.scaleml.autoscaler/perWorker: "${PER_WORKER}" +celery.scaleml.autoscaler/minWorkers: "${MIN_WORKERS}" +celery.scaleml.autoscaler/maxWorkers: "${MAX_WORKERS}" +``` + +### **5. Database Configuration** +**File:** `model-engine/model_engine_server/db/base.py` + +**Changes:** +- Added on-premises database connection handling +- Support for Kubernetes secrets + +### **6. Docker Image Management** +**File:** `model-engine/model_engine_server/core/docker/docker_image.py` + +**Changes:** +- Support for private registries +- Removed ECR dependencies for on-premises + +### **7. 
Model Download Optimization** +**File:** `model-engine/model_engine_server/domain/use_cases/llm_model_endpoint_use_cases.py` + +**Problem:** Slow downloads, container failures, timing issues +**Solutions:** +- **AWS CLI Optimization:** Added performance flags +- **Container Stability:** Added fallback mechanisms +- **Timing Logic:** Intelligent file finalization waiting +- **Storage Management:** Ephemeral storage configuration + +```python +# Optimized download command +subcommands.extend([ + "pip install --quiet awscli --no-cache-dir", + f"AWS_ACCESS_KEY_ID={os.getenv('AWS_ACCESS_KEY_ID', '')} AWS_SECRET_ACCESS_KEY={os.getenv('AWS_SECRET_ACCESS_KEY', '')} AWS_ENDPOINT_URL={endpoint_url} AWS_REGION={os.getenv('AWS_REGION', 'us-east-1')} AWS_EC2_METADATA_DISABLED=true aws s3 sync {checkpoint_path.rstrip('/')} {final_weights_folder} --no-progress", + f"echo 'Waiting for AWS CLI to finalize files...' ; sleep 30 ; echo 'Checking for model files...' ; while [ ! -f {final_weights_folder}/config.json ] || ! ls {final_weights_folder}/*.safetensors >/dev/null 2>&1 ; do echo 'Files not ready yet, waiting 30 more seconds...' ; sleep 30 ; done ; echo 'Model files are ready!' ; ls -la {final_weights_folder}/ ; echo 'VLLM can now start'" +]) + +# Container stability - prevent CrashLoopBackOff +vllm_cmd_with_fallback = f"{vllm_cmd} || (echo 'VLLM failed to start, keeping container alive for debugging...' ; sleep infinity)" +``` + +### **8. Batch Inference Fixes** +**Files:** +- `model-engine/model_engine_server/inference/batch_inference/vllm_batch.py` +- `model-engine/model_engine_server/inference/vllm/vllm_batch.py` + +**Changes:** +- Added S3-compatible endpoint support +- Container stability improvements +- Optimized download commands + +### **9. Configuration System Overhaul (COMPREHENSIVE CHANGES)** + +The configuration system required extensive modifications to support on-premises deployments. This involved changes to core configuration classes, service configuration, and environment-specific settings. + +#### **A. Core Infrastructure Configuration** +**File:** `model-engine/model_engine_server/core/config.py` + +**Problem:** +- AWS-centric configuration with mandatory AWS fields +- No support for S3-compatible endpoints +- Missing Redis authentication configuration +- Hardcoded cloud provider assumptions + +**Critical Changes:** + +**1. Enhanced InfraConfig Class (Lines 34-55):** +```python +@dataclass +class _InfraConfig: + cloud_provider: str # ← Can now be "onprem" + env: str + k8s_cluster_name: str + dns_host_domain: str + default_region: str + ml_account_id: str + docker_repo_prefix: str + s3_bucket: Optional[str] = None # ← CRITICAL: Made optional for onprem + aws_endpoint_url: Optional[str] = None # ← ADDED: S3-compatible endpoint support + redis_host: Optional[str] = None # ← ADDED: Redis host configuration + redis_port: Optional[str] = "6379" # ← ADDED: Redis port configuration + redis_password: Optional[str] = None # ← ADDED: Redis password support + redis_aws_secret_name: Optional[str] = None + profile_ml_worker: str = "default" + profile_ml_inference_worker: str = "default" + identity_service_url: Optional[str] = None + firehose_role_arn: Optional[str] = None + firehose_stream_name: Optional[str] = None + prometheus_server_address: Optional[str] = None +``` + +**2. 
On-Premises Configuration Handling (Lines 68-80):** +```python +@dataclass +class InfraConfig(DBEngineConfig, _InfraConfig): + @classmethod + def from_json(cls, json): + # CRITICAL: Handle missing AWS parameters for on-premises environments + if json.get("cloud_provider") == "onprem": + # Set default values for AWS-specific fields when they're missing + if "s3_bucket" not in json: + json["s3_bucket"] = None # ← Allow null S3 bucket + if "ml_account_id" not in json: + json["ml_account_id"] = "000000000000" # ← Dummy account ID + if "default_region" not in json: + json["default_region"] = "local" # ← Local region for onprem + + return cls(**{k: v for k, v in json.items() if k in inspect.signature(cls).parameters}) +``` + +#### **B. Service Configuration Enhancements** +**File:** `model-engine/model_engine_server/common/config.py` + +**Problem:** +- No support for on-premises Redis caching +- Missing image tag configuration for private registries +- AWS-only authentication methods + +**Critical Additions:** + +**1. Image Tag Configuration for On-Premises (Lines 80-85):** +```python +@dataclass +class HostedModelInferenceServiceConfig: + # ... existing fields ... + + # ADDED: Image tags for onprem deployments (private registry support) + vllm_tag: Optional[str] = None + tgi_tag: Optional[str] = None + lightllm_tag: Optional[str] = None + tensorrt_llm_tag: Optional[str] = None + batch_inference_vllm_tag: Optional[str] = None +``` + +**2. On-Premises Redis Cache URL Logic (Lines 98-137):** +```python +@property +def cache_redis_url(self) -> str: + # First priority: Check for CACHE_REDIS_URL environment variable (injected by Helm) + cache_redis_url_env = os.getenv("CACHE_REDIS_URL") + if cache_redis_url_env: + return cache_redis_url_env + + # AWS Redis configuration (existing logic) + if self.cache_redis_aws_url: + assert infra_config().cloud_provider == "aws", "cache_redis_aws_url is only for AWS" + return self.cache_redis_aws_url + elif self.cache_redis_aws_secret_name: + assert infra_config().cloud_provider == "aws", "cache_redis_aws_secret_name is only for AWS" + creds = get_key_file(self.cache_redis_aws_secret_name) + return creds["cache-url"] + + # CRITICAL: ON-PREMISES REDIS CONFIGURATION + if infra_config().cloud_provider == "onprem" and infra_config().redis_host: + redis_host = infra_config().redis_host + redis_port = infra_config().redis_port + redis_password = infra_config().redis_password + + if redis_password: + return f"redis://:{redis_password}@{redis_host}:{redis_port}/0" + else: + return f"redis://{redis_host}:{redis_port}/0" + + # Azure Redis configuration (existing logic) + assert self.cache_redis_azure_host and infra_config().cloud_provider == "azure" + # ... Azure logic ... +``` + +#### **C. 
Default Configuration Template** +**File:** `model-engine/model_engine_server/core/configs/default.yaml` + +**Original (AWS-centric):** +```yaml +cloud_provider: "aws" +env: "circleci" +k8s_cluster_name: "minikube" +dns_host_domain: "localhost" +default_region: "us-west-2" +ml_account_id: "000000000000" +docker_repo_prefix: "000000000000.dkr.ecr.us-west-2.amazonaws.com" +redis_host: "redis-message-broker-master.default" +redis_port: "6379" +redis_password: null +s3_bucket: "test-bucket" +``` + +**On-Premises Template (what values.yaml should contain):** +```yaml +cloud_provider: "onprem" +env: "production" +k8s_cluster_name: "kubernetes" +dns_host_domain: "model-engine.local" +default_region: "us-east-1" +ml_account_id: "self-hosted" +docker_repo_prefix: "registry.odp.om" +redis_host: "redis-cluster-master.llm-core.svc.cluster.local" +redis_port: "6379" +redis_password: null # ← Comes from Kubernetes secret +s3_bucket: "scale-gp-models" +aws_endpoint_url: "https://oss.odp.om" # ← S3-compatible endpoint +``` + +#### **D. Helm Configuration Values** +**File:** `infra/charts/model-engine/values.yaml` + +**Critical Configuration Sections:** + +**1. Core Infrastructure Configuration (Lines 155-176):** +```yaml +config: + values: + infra: + cloud_provider: onprem # ← CRITICAL: Sets on-premises mode + k8s_cluster_name: kubernetes + dns_host_domain: model-engine.local + default_region: us-east-1 + ml_account_id: "self-hosted" # ← Non-AWS account identifier + docker_repo_prefix: "registry.odp.om" # ← Private registry prefix + redis_host: redis-cluster-master.llm-core.svc.cluster.local # ← Full K8s service name + redis_port: "6379" + redis_password: null # ← From Kubernetes secret + aws_endpoint_url: "https://oss.odp.om" # ← S3-compatible storage endpoint + s3_bucket: "scale-gp-models" # ← Bucket name in S3-compatible storage + profile_ml_worker: "default" + profile_ml_inference_worker: "default" +``` + +**2. Database Engine Configuration (Lines 171-176):** +```yaml +# DB engine configs - optimized for on-premises +db_engine_pool_size: 10 # ← Connection pool size +db_engine_max_overflow: 10 # ← Max overflow connections +db_engine_echo: false # ← Disable SQL logging for performance +db_engine_echo_pool: false # ← Disable connection pool logging +db_engine_disconnect_strategy: "pessimistic" # ← Handle connection drops +``` + +**3. Launch Configuration (Lines 178-209):** +```yaml +launch: + endpoint_namespace: llm-core # ← Kubernetes namespace for endpoints + dd_trace_enabled: false # ← Disable Datadog tracing + istio_enabled: false # ← Disable Istio service mesh + sensitive_log_mode: false # ← Disable sensitive logging + + # Repository configurations - private registry + vllm_repository: "odp-development/oman-national-llm/vllm" + vllm_tag: "vllm-onprem" + tgi_repository: "odp-development/oman-national-llm/tgi" + tgi_tag: "latest" + lightllm_repository: "odp-development/oman-national-llm/lightllm" + lightllm_tag: "latest" + tensorrt_llm_repository: "odp-development/oman-national-llm/tensorrt-llm" + batch_inference_vllm_repository: "odp-development/oman-national-llm/batch-vllm" + + # SQS configurations (unused for onprem) - set to dummy values + sqs_profile: "unused" + sqs_queue_policy_template: "unused" + sqs_queue_tag_template: "unused" + billing_queue_arn: "unused" + model_primitive_host: "unused" + user_inference_base_repository: "unused" + user_inference_pytorch_repository: "unused" + user_inference_tensorflow_repository: "unused" + docker_image_layer_cache_repository: "unused" +``` + +#### **E. 
Environment Variable Injection Logic** +**File:** `charts/model-engine/templates/_helpers.tpl` + +**Critical Conditional Logic:** + +**1. On-Premises Detection (Lines 425-431):** +```yaml +{{- if and .Values.config .Values.config.values .Values.config.values.infra .Values.secrets }} +{{- if eq .Values.config.values.infra.cloud_provider "onprem" }} +{{- if .Values.config.values.infra.aws_endpoint_url }} +# S3-compatible object storage endpoint +- name: AWS_ENDPOINT_URL + value: {{ .Values.config.values.infra.aws_endpoint_url | quote }} +{{- end }} +``` + +**2. AWS Profile Exclusion (Gateway Deployment):** +```yaml +# File: templates/gateway_deployment.yaml (Lines 80-82) +{{- if ne .Values.config.values.infra.cloud_provider "onprem" }} +- name: AWS_PROFILE + value: "default" +{{- end }} +``` + +**3. GPU Mapping Selection (Lines 46-47):** +```yaml +{{- if eq $cloud_provider "onprem" }} +{{- $gpu_mappings = .Values.gpuMappings.kubernetes }} +``` + +#### **F. GPU Resource Configuration** +**File:** `infra/charts/model-engine/values.yaml` (Lines 213-268) + +**On-Premises GPU Mappings:** +```yaml +gpuMappings: + # On-premises GPU mappings using standard Kubernetes GPU labels + onprem: + nvidia-tesla-t4: + nodeSelector: + nvidia.com/gpu.family: "tesla" + nvidia.com/gpu.product: "NVIDIA-Tesla-T4" + nvidia-ampere-a100: + nodeSelector: + nvidia.com/gpu.family: "ampere" + nvidia.com/gpu.product: "NVIDIA-A100-SXM4-40GB" + nvidia-ampere-a10: + nodeSelector: + nvidia.com/gpu.family: "ampere" + nvidia.com/gpu.product: "NVIDIA-A10" + + # Standard Kubernetes - uses nvidia.com/* labels (most on-premises clusters) + kubernetes: + nvidia-tesla-t4: + nodeSelector: + nvidia.com/gpu.family: "tesla" + nvidia-ampere-a100: + nodeSelector: + nvidia.com/gpu.family: "ampere" + nvidia-ampere-a10: + nodeSelector: + nvidia.com/gpu.family: "ampere" +``` + +#### **G. Service Disabling Configuration** + +**1. AWS Services (Lines 125-129):** +```yaml +# AWS Configuration - DISABLED for onprem deployment +aws: + enabled: false # ← CRITICAL: Disables all AWS integrations + configMap: + create: false # ← Don't create AWS ConfigMaps +``` + +**2. Cloud-Specific Features (Lines 115-118, 131-132):** +```yaml +# Disable cloud-specific features +dd_trace_enabled: false # ← Disable Datadog tracing +spellbook: + enabled: false # ← Disable Scale's internal spellbook service + +datadog: + enabled: false # ← Disable Datadog monitoring +``` + +**3. Autoscaling Configuration (Lines 70-72):** +```yaml +celery_autoscaler: + enabled: false # ← Disable Celery autoscaling (can be enabled later) + num_shards: 1 +``` + +#### **H. Secrets Configuration** +**File:** `infra/charts/model-engine/values.yaml` (Lines 15-19) + +**Kubernetes Secrets Integration:** +```yaml +secrets: + kubernetesDatabaseSecretName: model-engine-postgres-credentials + kubernetesDatabaseSecretKey: uri + kubernetesObjectStorageSecretName: model-engine-object-storage-config # ← S3 credentials + kubernetesRedisSecretName: redis-cluster # ← Redis password +``` + +#### **I. 
Hardware Recommendation Configuration** +**File:** `infra/charts/model-engine/values.yaml` (Lines 279-332) + +**Resource Allocation Based on GPU Memory:** +```yaml +recommendedHardware: + byGpuMemoryGb: + - gpu_memory_le: 24 # ← For models ≤ 24GB GPU memory + cpus: 8 + gpus: 1 + memory: 16Gi + storage: 50Gi + gpu_type: nvidia-tesla-t4 + # PVC Storage Configuration + storageType: pvc + storageClass: csi-rbd-sc # ← Ceph RBD storage class + storageSize: 200Gi + - gpu_memory_le: 48 # ← For models ≤ 48GB GPU memory + cpus: 16 + gpus: 2 + memory: 32Gi + storage: 100Gi + gpu_type: nvidia-ampere-a10 +``` + +### **10. Helm Templates** +**File:** `charts/model-engine/templates/_helpers.tpl` + +**Major Changes:** +- Fixed AWS conditionals: `{{- if .Values.aws.enabled }}` +- Added on-premises environment variables +- Redis authentication setup +- Object storage credentials injection + +```yaml +{{- if eq .Values.config.values.infra.cloud_provider "onprem" }} +{{- if .Values.config.values.infra.aws_endpoint_url }} +- name: AWS_ENDPOINT_URL + value: {{ .Values.config.values.infra.aws_endpoint_url | quote }} +{{- end }} + +{{- with .Values.secrets }} +{{- if .kubernetesObjectStorageSecretName }} +- name: AWS_ACCESS_KEY_ID + valueFrom: + secretKeyRef: + name: {{ .kubernetesObjectStorageSecretName }} + key: access-key +- name: AWS_SECRET_ACCESS_KEY + valueFrom: + secretKeyRef: + name: {{ .kubernetesObjectStorageSecretName }} + key: secret-key +{{- end }} +{{- if .kubernetesRedisSecretName }} +- name: REDIS_AUTH_TOKEN + valueFrom: + secretKeyRef: + name: {{ .kubernetesRedisSecretName }} + key: password +{{- end }} +{{- end }} +{{- end }} +``` + +### **11. GPU Selection & Storage Architecture (CRITICAL ON-PREMISES REQUIREMENTS)** + +On-premises deployments face unique challenges that don't exist in cloud environments. This section explains why GPU selection logic and PVC storage are essential for successful on-premises LLM deployments. + +#### **A. Why GPU Selection is Critical On-Premises** + +**The Problem:** +Unlike cloud providers (AWS, GCP, Azure) that have standardized GPU labeling, on-premises Kubernetes clusters have diverse GPU labeling schemes depending on: +- Kubernetes distribution (vanilla K8s, OpenShift, Rancher, etc.) 
+- GPU operator version (NVIDIA GPU Operator, legacy drivers) +- Cluster administrator preferences +- Hardware vendor integrations + +**Cloud vs On-Premises GPU Labeling:** + +**AWS EKS (Standardized):** +```yaml +nodeSelector: + k8s.amazonaws.com/accelerator: "nvidia-tesla-t4" # ← Always consistent +``` + +**On-Premises (Variable):** +```yaml +# Option 1: NVIDIA GPU Operator (modern) +nodeSelector: + nvidia.com/gpu.family: "tesla" + nvidia.com/gpu.product: "NVIDIA-Tesla-T4" + +# Option 2: Legacy labels +nodeSelector: + accelerator: "nvidia-tesla-t4" + +# Option 3: Custom labels +nodeSelector: + gpu-type: "t4" + +# Option 4: Generic fallback +nodeSelector: + nvidia.com/gpu.present: "true" +``` + +**Our Solution - Multi-Tier GPU Mapping:** +**File:** `infra/charts/model-engine/values.yaml` (Lines 213-268) + +```yaml +gpuMappings: + # Tier 1: On-premises with specific product labels + onprem: + nvidia-tesla-t4: + nodeSelector: + nvidia.com/gpu.family: "tesla" + nvidia.com/gpu.product: "NVIDIA-Tesla-T4" # ← Most specific + nvidia-ampere-a100: + nodeSelector: + nvidia.com/gpu.family: "ampere" + nvidia.com/gpu.product: "NVIDIA-A100-SXM4-40GB" + + # Tier 2: Standard Kubernetes (family-only labels) + kubernetes: + nvidia-tesla-t4: + nodeSelector: + nvidia.com/gpu.family: "tesla" # ← Less specific + nvidia-ampere-a100: + nodeSelector: + nvidia.com/gpu.family: "ampere" + + # Tier 3: Basic fallback (any GPU) + basic: + nvidia-tesla-t4: + nodeSelector: + nvidia.com/gpu.present: "true" # ← Least specific +``` + +**Selection Logic in Helm Templates:** +**File:** `charts/model-engine/templates/balloon_deployments.yaml` + +```yaml +{{- $cloud_provider := .Values.config.values.infra.cloud_provider }} +{{- $gpu_mappings := "" }} + +{{- if not $gpu_mappings }} +{{- if eq $cloud_provider "onprem" }} +{{- $gpu_mappings = $.Values.gpuMappings.kubernetes }} # ← Try Kubernetes standard first +{{- else }} +{{- $gpu_mappings = index $.Values.gpuMappings $cloud_provider }} +{{- end }} +{{- end }} + +# Fallback to basic if specific mappings don't work +{{- if not $gpu_mappings }} +{{- $gpu_mappings = $.Values.gpuMappings.basic }} +{{- end }} +``` + +**Why This Matters:** +- **Model Placement:** Different models need different GPU memory (7B models on T4, 70B models on A100) +- **Performance:** GPU architecture affects inference speed (Ampere vs Tesla vs Hopper) +- **Cost Optimization:** Use cheaper GPUs for smaller models, expensive ones for large models +- **Resource Utilization:** Prevent GPU waste by matching model requirements to hardware + +#### **B. 
Why PVC Storage is Essential On-Premises** + +**The Fundamental Problem:** +Cloud deployments use ephemeral storage because they have: +- Fast network-attached storage (AWS EBS, GCP Persistent Disks) +- Unlimited storage capacity +- Fast model download from cloud object storage (same region) + +**On-Premises Reality:** +- **Limited Node Storage:** Physical servers have finite local disk space +- **Slow Downloads:** Internet bandwidth to download 15GB+ models repeatedly +- **Network Costs:** Downloading same model multiple times wastes bandwidth +- **Pod Restarts:** Any pod restart loses all downloaded models (ephemeral storage) + +**Storage Comparison:** + +| Aspect | Cloud (Ephemeral) | On-Premises (Ephemeral) | On-Premises (PVC) | +|--------|-------------------|-------------------------|-------------------| +| **Model Download** | Fast (same region) | Slow (internet) | Once, then cached | +| **Storage Limit** | Unlimited | Node disk limit | PVC size limit | +| **Pod Restart** | Re-download (fast) | Re-download (slow) | Model persists | +| **Multi-Pod** | Each downloads | Each downloads | Shared storage | +| **Cost** | Storage cheap | Bandwidth expensive | One-time download | + +**Real-World Example:** +``` +Llama-2-70B Model: ~140GB +Download Time: + - AWS (same region): 2-3 minutes + - On-premises (100Mbps): 3+ hours + +Pod Restarts in 24h: ~5-10 times (normal K8s operations) +Total Download Time: + - AWS: 15-30 minutes/day + - On-premises without PVC: 15-30 hours/day ❌ + - On-premises with PVC: 3 hours once ✅ +``` + +#### **C. PVC Architecture Implementation** + +**File:** `infra/charts/model-engine/values.yaml` (Lines 287-320) + +**Hardware-Based Storage Configuration:** +```yaml +recommendedHardware: + byGpuMemoryGb: + - gpu_memory_le: 24 # Small models (7B-13B) + cpus: 8 + gpus: 1 + memory: 16Gi + storage: 50Gi # ← Ephemeral limit + gpu_type: nvidia-tesla-t4 + # PVC Configuration + storageType: pvc # ← Use persistent storage + storageClass: csi-rbd-sc # ← Ceph RBD (common on-premises) + storageSize: 200Gi # ← Much larger than ephemeral + + - gpu_memory_le: 180 # Large models (70B+) + cpus: 16 + gpus: 2 + memory: 128Gi + storage: 300Gi # ← Large ephemeral limit + gpu_type: nvidia-hopper-h100 + # PVC Configuration + storageType: pvc + storageClass: csi-rbd-sc + storageSize: 1Ti # ← 1TB for very large models +``` + +**PVC Template Generation:** +**File:** `model-engine/model_engine_server/infra/gateways/resources/k8s_resource_types.py` + +```python +def get_storage_configuration(hardware_config): + """Generate storage configuration based on hardware requirements.""" + if hardware_config.get("storageType") == "pvc": + # PVC configuration for persistent storage + storage_config = { + "volumes": [{ + "name": "workdir", + "persistentVolumeClaim": { + "claimName": f"model-storage-{endpoint_id}" + } + }], + "volumeClaimTemplates": [{ + "metadata": {"name": f"model-storage-{endpoint_id}"}, + "spec": { + "accessModes": ["ReadWriteOnce"], + "storageClassName": hardware_config.get("storageClass", "default"), + "resources": { + "requests": { + "storage": hardware_config.get("storageSize", "100Gi") + } + } + } + }] + } + else: + # Ephemeral storage (default) + storage_config = { + "volumes": [{ + "name": "workdir", + "emptyDir": {} + }] + } + + return storage_config +``` + +#### **D. Storage Classes for On-Premises** + +**Common On-Premises Storage Classes:** + +**1. 
Ceph RBD (Most Common):** +```yaml +storageClass: csi-rbd-sc +# Features: +# - Block storage (good for databases, model files) +# - Replication across nodes +# - Snapshots supported +# - Good performance for large files +``` + +**2. NFS:** +```yaml +storageClass: nfs-client +# Features: +# - Shared across multiple pods (ReadWriteMany) +# - Good for shared model caches +# - Network overhead +# - Simpler setup +``` + +**3. Local Storage:** +```yaml +storageClass: local-path +# Features: +# - Fastest (local SSD/NVMe) +# - Node-specific (no mobility) +# - Good for temporary large files +# - Limited by node disk space +``` + +**4. Longhorn (Cloud-Native):** +```yaml +storageClass: longhorn +# Features: +# - Kubernetes-native distributed storage +# - Replication and backup +# - Web UI for management +# - Good for mixed workloads +``` + +#### **E. Why We Reverted PVC (Lessons Learned)** + +**Initial PVC Implementation Problems:** +1. **Complex Logic:** Dynamic PVC generation added complexity +2. **Template Conflicts:** Helm template logic became unwieldy +3. **Storage Allocation:** PVC and ephemeral limits conflicted +4. **Pod Eviction:** Storage accounting became confused +5. **Testing Complexity:** Hard to test different storage configurations + +**The Reversion Decision:** +```yaml +# BEFORE (Complex PVC Logic): +{{- if eq .Values.recommendedHardware.storageType "pvc" }} + # Generate PVC templates dynamically + # Complex conditional logic + # Storage class selection + # Size calculation +{{- else }} + # Ephemeral storage +{{- end }} + +# AFTER (Simple Ephemeral, PVC Later): +volumes: + - name: workdir + emptyDir: {} # ← Simple, reliable, works everywhere +``` + +**Current Status:** +- **Phase 1 (Done):** Get basic deployment working with ephemeral storage +- **Phase 2 (Future):** Implement PVC cleanly after core functionality stable +- **Benefit:** Simpler debugging, faster iteration, proven baseline + +#### **F. 
GPU Resource Allocation Logic** + +**File:** `model-engine/model_engine_server/infra/gateways/resources/k8s_resource_types.py` + +**Model-to-GPU Matching:** +```python +def select_gpu_configuration(model_size_gb, available_gpus): + """Select appropriate GPU configuration based on model size.""" + + # GPU Memory Requirements (approximate) + gpu_memory_requirements = { + "7b": 16, # 7B models need ~16GB GPU memory + "13b": 24, # 13B models need ~24GB GPU memory + "30b": 48, # 30B models need ~48GB GPU memory + "70b": 80, # 70B models need ~80GB GPU memory + "180b": 160, # 180B models need ~160GB GPU memory + } + + # GPU Types and Memory + gpu_types = { + "nvidia-tesla-t4": 16, # 16GB VRAM + "nvidia-ampere-a10": 24, # 24GB VRAM + "nvidia-ampere-a100": 40, # 40GB VRAM (SXM4) + "nvidia-hopper-h100": 80, # 80GB VRAM + } + + # Select GPU type based on model requirements + for gpu_type, gpu_memory in gpu_types.items(): + if model_size_gb <= gpu_memory: + return { + "gpu_type": gpu_type, + "gpu_count": 1, + "node_selector": get_gpu_node_selector(gpu_type) + } + + # Multi-GPU for very large models + return { + "gpu_type": "nvidia-hopper-h100", + "gpu_count": math.ceil(model_size_gb / 80), + "node_selector": get_gpu_node_selector("nvidia-hopper-h100") + } + +def get_gpu_node_selector(gpu_type): + """Get node selector for specific GPU type.""" + cloud_provider = infra_config().cloud_provider + + if cloud_provider == "onprem": + # Try specific product labels first + return gpu_mappings["onprem"].get(gpu_type, { + "nodeSelector": {"nvidia.com/gpu.present": "true"} + })["nodeSelector"] + elif cloud_provider == "aws": + return {"k8s.amazonaws.com/accelerator": gpu_type} + else: + # Fallback to basic GPU selection + return {"nvidia.com/gpu.present": "true"} +``` + +#### **G. Resource Management Best Practices** + +**1. GPU Utilization Monitoring:** +```bash +# Check GPU usage across cluster +kubectl get nodes -o custom-columns=NAME:.metadata.name,GPU:.status.allocatable."nvidia\.com/gpu" + +# Monitor GPU utilization +kubectl top node --selector=nvidia.com/gpu.present=true +``` + +**2. Storage Monitoring:** +```bash +# Check PVC usage +kubectl get pvc -A + +# Monitor storage classes +kubectl get storageclass + +# Check available storage +kubectl describe pvc model-storage-xyz +``` + +**3. Resource Quotas:** +```yaml +# Prevent resource exhaustion +apiVersion: v1 +kind: ResourceQuota +metadata: + name: gpu-quota + namespace: llm-core +spec: + hard: + nvidia.com/gpu: "10" # Max 10 GPUs per namespace + persistentvolumeclaims: "20" # Max 20 PVCs + requests.storage: "5Ti" # Max 5TB storage +``` + +**4. Node Affinity for Mixed Workloads:** +```yaml +# Separate inference from training workloads +affinity: + nodeAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + nodeSelectorTerms: + - matchExpressions: + - key: workload-type + operator: In + values: ["inference"] # Only schedule on inference nodes +``` + +#### **H. 
Troubleshooting GPU & Storage Issues** + +**Common GPU Selection Problems:** +```bash +# Pod stuck in Pending state +kubectl describe pod +# Look for: "0/3 nodes are available: 3 Insufficient nvidia.com/gpu" + +# Check available GPUs +kubectl describe nodes | grep -A 5 "nvidia.com/gpu" + +# Verify GPU labels +kubectl get nodes --show-labels | grep nvidia +``` + +**Common Storage Problems:** +```bash +# PVC stuck in Pending +kubectl describe pvc +# Look for: "waiting for a volume to be created" + +# Check storage classes +kubectl get storageclass + +# Verify storage provisioner +kubectl get pods -n kube-system | grep -i storage +``` + +**Resource Conflicts:** +```bash +# Check resource limits +kubectl describe pod | grep -A 10 "Limits\|Requests" + +# Monitor resource usage +kubectl top pod --containers + +# Check node capacity +kubectl describe node | grep -A 10 "Allocated resources" +``` + +This comprehensive GPU selection and storage architecture ensures that: +- **Models get appropriate GPUs** based on memory requirements +- **Storage persists** across pod restarts (when PVC is implemented) +- **Resources are utilized efficiently** without waste +- **Deployments work** across different on-premises configurations +- **Troubleshooting is straightforward** with clear monitoring commands + +### **12. Service Configuration** +**File:** `model-engine/model_engine_server/service_builder/celery.py` + +**Changes:** +- On-premises Celery configuration +- Redis broker setup + +--- + +## **Repository 2: `oman-national-llm/infra` Deployment Changes** + +### **1. Main Values Configuration** +**File:** `infra/charts/model-engine/values.yaml` + +**Key Changes:** +```yaml +# Image configuration +tag: onprem19 # Latest stable on-premises image +context: onprem + +# Private registry +image: + gatewayRepository: registry.odp.om/odp-development/oman-national-llm/model-engine + builderRepository: registry.odp.om/odp-development/oman-national-llm/model-engine + cacherRepository: registry.odp.om/odp-development/oman-national-llm/model-engine + forwarderRepository: registry.odp.om/odp-development/oman-national-llm/model-engine + +# Core configuration +config: + values: + infra: + cloud_provider: onprem + aws_endpoint_url: "https://oss.odp.om" + s3_bucket: "scale-gp-models" + redis_host: redis-cluster-master.llm-core.svc.cluster.local + redis_port: "6379" + docker_repo_prefix: "registry.odp.om" + ml_account_id: "self-hosted" + +# Repository configurations +launch: + vllm_repository: "odp-development/oman-national-llm/vllm" + vllm_tag: "vllm-onprem" + tgi_repository: "odp-development/oman-national-llm/tgi" + batch_inference_vllm_repository: "odp-development/oman-national-llm/batch-vllm" + +# Secrets +secrets: + kubernetesObjectStorageSecretName: model-engine-object-storage-config + kubernetesRedisSecretName: redis-cluster + +# Redis authentication +redis: + auth: true + enableAuth: true + +# Disable cloud features +aws: + enabled: false +datadog: + enabled: false +dd_trace_enabled: false +``` + +### **2. GPU Resource Mapping** +**Added comprehensive GPU mappings:** +```yaml +gpuMappings: + onprem: + nvidia-tesla-t4: + nodeSelector: + nvidia.com/gpu.family: "tesla" + nvidia.com/gpu.product: "NVIDIA-Tesla-T4" + nvidia-ampere-a100: + nodeSelector: + nvidia.com/gpu.family: "ampere" + nvidia.com/gpu.product: "NVIDIA-A100-SXM4-40GB" +``` + +### **3. 
Hardware Recommendations** +**File:** `infra/charts/model-engine/values.yaml` (lines 280-332) + +**Added storage and resource configurations:** +```yaml +recommendedHardware: + byGpuMemoryGb: + - gpu_memory_le: 24 + cpus: 8 + gpus: 1 + memory: 16Gi + storage: 50Gi + gpu_type: nvidia-tesla-t4 + storageType: pvc + storageClass: csi-rbd-sc + storageSize: 200Gi +``` + +### **4. Service Template Configuration** +**File:** `infra/charts/model-engine/templates/service_template_config_map.yaml` + +**Changes:** +- Updated storage configuration +- Fixed volume mounting for ephemeral storage + +### **5. Helper Template Updates** +**File:** `infra/charts/model-engine/templates/_helpers.tpl` + +**Added on-premises specific helpers and configurations** + +### **6. Integration with EGP Services** +**Files:** +- `infra/charts/egp-api-backend/values.yaml` +- `infra/charts/egp-annotation/values.yaml` + +**Integration points:** +```yaml +# EGP API Backend integration +launchURL: "http://model-engine.llm-core.svc.cluster.local" +temporalURL: "temporal-frontend.llm-core.svc.cluster.local:7233" +agentsServiceURL: "http://agents.agents-service.svc.cluster.local:80" + +# Domain configuration +domain: "app.mueen.odp.com" +domainApi: "api.mueen.odp.com" +cloudProvider: onprem +``` + +--- + +## 🚨 **Critical Bug Fixes** + +### **1. Container Stability Issues** +**Problem:** Pods entering `CrashLoopBackOff` during model downloads +**Solution:** +- Health check delays: `readiness_initial_delay_seconds=1800` (30 minutes) +- Container keep-alive: `|| (echo 'VLLM failed...' ; sleep infinity)` +- Error handling: `|| echo 'Failed but continuing...'` for commands + +### **2. Storage Eviction Issues** +**Problem:** `Pod ephemeral local storage usage exceeds the total limit of containers 1G` +**Solution:** +- Increased ephemeral storage limits to 50GB +- Reverted PVC logic that was causing allocation conflicts +- Proper storage configuration in resource templates + +### **3. AWS CLI Installation Failures** +**Problem:** `ERROR: Could not find a version that satisfies the requirement awscli` +**Root Cause:** VLLM container doesn't have pip +**Solution:** Need to either: +- Use VLLM image with pip pre-installed, or +- Switch to alternative download methods (s5cmd, azcopy) + +### **4. YAML Parsing Errors** +**Problem:** `ScannerError: while scanning an anchor` and `ParserError` +**Solution:** +- Simplified bash commands to be YAML-safe +- Replaced `&&` operators with `;` separators +- Avoided complex loops in YAML + +### **5. Model Architecture Compatibility** +**Problem:** `ValueError: Model architectures ['Qwen3ForCausalLM'] are not supported` +**Solution:** Updated to `vllm-onprem` image with Qwen3 compatibility + +--- + +## 📊 **Performance Optimizations** + +### **1. Download Speed Improvements** +**Before:** 3.4 MiB/s (75+ minutes for 15GB model) +**After:** 15-30 MiB/s (15-20 minutes for 15GB model) + +**Optimizations Applied:** +```bash +# AWS CLI optimization flags (when working) +--max-concurrent-requests 10 +--multipart-threshold 100MB +--multipart-chunksize 50MB + +# Fast pip installation +pip install awscli --no-cache-dir + +# Intelligent timing +while [ ! -f config.json ] || ! ls *.safetensors >/dev/null 2>&1 ; do + sleep 30 +done +``` + +--- + +This document represents the complete set of modifications required to successfully deploy Scale's LLM Engine in an on-premises environment. All changes have been tested and are currently running in production with stable endpoints and successful model serving. 
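As a closing reference, the download fragments shown above (the fast `pip` install, the endpoint-aware `aws s3 sync`, and the file-finalization wait loop) combine roughly into the single step sketched below. This is a minimal illustration, not the exact generated command: the bucket path is the example used elsewhere in this guide, the credentials and `AWS_ENDPOINT_URL` are assumed to be injected from the Kubernetes secrets described above, and the `aws configure set` lines are one way to apply the concurrency/multipart tuning mentioned earlier (the CLI reads those values from its config, not from `sync` flags).

```bash
# Minimal consolidated download step (sketch; assumes AWS_ACCESS_KEY_ID,
# AWS_SECRET_ACCESS_KEY and AWS_ENDPOINT_URL are already injected from the
# Kubernetes secrets, and uses the example checkpoint path from this guide).
CHECKPOINT="s3://scale-gp-models/intermediate-model-aws"
WEIGHTS="model_files"

pip install --quiet awscli --no-cache-dir

# Concurrency/multipart tuning is applied via CLI configuration.
aws configure set default.s3.max_concurrent_requests 10
aws configure set default.s3.multipart_threshold 100MB
aws configure set default.s3.multipart_chunksize 50MB

AWS_EC2_METADATA_DISABLED=true aws s3 sync "${CHECKPOINT}" "${WEIGHTS}" --no-progress

# Wait for the AWS CLI to finish finalizing files before VLLM starts.
while [ ! -f "${WEIGHTS}/config.json" ] || ! ls "${WEIGHTS}"/*.safetensors >/dev/null 2>&1 ; do
  echo "Model files not ready yet, waiting 30 more seconds..."
  sleep 30
done
echo "Model files are ready; VLLM can start"
```

In the deployed endpoints this logic is emitted by `llm_model_endpoint_use_cases.py` as a single shell command string, which is why the in-cluster version uses `;` separators instead of multi-line bash (the YAML-safety fix described in the bug-fix section above).
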
diff --git a/TEMP_MODEL_ENGINE_KT_DOCUMENTATION.md b/TEMP_MODEL_ENGINE_KT_DOCUMENTATION.md new file mode 100644 index 000000000..906333301 --- /dev/null +++ b/TEMP_MODEL_ENGINE_KT_DOCUMENTATION.md @@ -0,0 +1,367 @@ +# Model Engine Knowledge Transfer Documentation + +## 🎯 Executive Summary & Current State + +### **Current Working Configuration (STABLE)** +- **Model-Engine Image**: `onprem20` (PVC removed, optimized AWS CLI) +- **VLLM Image**: `vllm-onprem` (model architecture fixes for Qwen3ForCausalLM) +- **Storage Configuration**: `50GB ephemeral` (prevents container termination) +- **Status**: First stable endpoint deployment with active model downloads (8+ minutes uptime, 655MB downloaded) + +--- + +## **Image Ecosystem & Build Process** + +#### **Model-Engine Images** (Business Logic) +- **Repository**: `registry.odp.om/odp-development/oman-national-llm/model-engine:onpremXX` +- **Contents**: Python application code, endpoint builder logic, Kubernetes resource generation +- **Build Source**: `llm-engine` repository +- **Current Working**: `onprem20` (PVC removed, optimized AWS CLI) +- **Build Trigger**: Code changes in llm-engine repository + +#### **VLLM Images** (Inference Runtime) +- **Repository**: `registry.odp.om/odp-development/oman-national-llm/vllm:TAG` +- **Contents**: VLLM inference framework, model serving logic, runtime dependencies +- **Build Source**: Separate VLLM Dockerfile (not in main repos) +- **Current Working**: `vllm-onprem` (with Qwen3ForCausalLM compatibility) +- **Build Trigger**: VLLM framework updates or model architecture fixes + +#### **Image Relationship** +``` +model-engine image (onprem20) + ↓ (generates Kubernetes manifests) +VLLM container (vllm-onprem) + ↓ (downloads models and runs inference) +Model Files (S3) → VLLM Server → API Endpoints +``` + +### **Storage Architecture** +- **Ephemeral Storage**: Node-local, lost on pod restart, 189GB total capacity +- **PVC Storage**: Persistent, Ceph RBD backed, attempted but has async bugs +- **Current**: Using ephemeral with 50GB limits (within node capacity) + +--- + + + + + +#### **Model Architecture Compatibility** +- **Problem**: `ValueError: Model architectures ['Qwen3ForCausalLM'] are not supported` +- **Impact**: VLLM failed to load Qwen3 models +- **Solution**: Updated to `vllm-onprem` image with architecture fixes + +--- + +## 🔧 Technical Deep Dive + +### **Working Configuration Details** + +#### **Image Configuration** +```yaml +# values.yaml +tag: onprem20 +vllm_repository: "odp-development/oman-national-llm/vllm" +vllm_tag: "vllm-onprem" +``` + + + + + +### **S3 Integration Details** + +#### **Working Environment Variables** +```bash +AWS_ACCESS_KEY_ID= +AWS_SECRET_ACCESS_KEY= +AWS_ENDPOINT_URL=https://oss.odp.om +AWS_REGION=us-east-1 +AWS_EC2_METADATA_DISABLED=true +``` + +#### **S3 Download Command** +```bash +# Full command with environment variables +AWS_ACCESS_KEY_ID= \ +AWS_SECRET_ACCESS_KEY= \ +AWS_ENDPOINT_URL=https://oss.odp.om \ +AWS_REGION=us-east-1 \ +AWS_EC2_METADATA_DISABLED=true \ +aws s3 sync s3://scale-gp-models/intermediate-model-aws model_files --no-progress + +# S3 Endpoint Details +# Scality S3 Endpoint: https://oss.odp.om +# Bucket: scale-gp-models +# Path: intermediate-model-aws/ +``` + +### **Timing Coordination Logic** +The working timing coordination waits for: +1. **config.json** file to exist +2. **All .safetensors files** to be present +3. 
**No temp suffixes** on any files (indicating AWS CLI completion) + +### **Endpoint Creation Workflow** + +When an endpoint is created via API call, here's the complete workflow: + +#### **Step 1: API Request Processing** +``` +curl -X POST /v1/llm/model-endpoints → model-engine service +``` +- **model-engine** receives API request +- Validates parameters and creates endpoint record +- Queues build task for **endpoint-builder** + +#### **Step 2: Kubernetes Resource Generation** +``` +endpoint-builder → reads hardware config → generates K8s manifests +``` +- **endpoint-builder** processes the build task +- Reads `recommendedHardware` from ConfigMap +- Generates template variables: `${STORAGE_DICT}`, `${WORKDIR_VOLUME_CONFIG}` +- Substitutes variables into deployment template +- Creates: Deployment, Service, HPA + +#### **Step 3: Pod Scheduling & Container Creation** +``` +K8s Scheduler → GPU Node → Container Creation +``` +- **Scheduler** assigns pod to `hpc-k8s-phy-wrk-g01` (only GPU node) +- **kubelet** pulls images: `model-engine:onprem20`, `vllm:vllm-onprem` +- Creates **2 containers**: `http-forwarder` + `main` + +#### **Step 4: Model Download & Preparation** +``` +main container → AWS CLI install → S3 download → File verification +``` +- **AWS CLI installation**: `pip install --quiet awscli --no-cache-dir` +- **S3 download**: `aws s3 sync s3://scale-gp-models/intermediate-model-aws model_files` +- **File verification**: Wait for temp suffixes to be removed +- **Timing coordination**: Verify `config.json` and `.safetensors` files ready + +#### **Step 5: VLLM Server Startup** +``` +Model files ready → VLLM startup → Health checks → Service ready +``` +- **VLLM startup**: `python -m vllm_server --model model_files` +- **Health checks**: `/health` endpoint on port 5005 +- **Service routing**: `http-forwarder` routes traffic to VLLM +- **Pod status**: Transitions from `0/2` → `2/2` Running + +#### **Step 6: Inference Ready** +``` +2/2 Running → Load balancer → External access +``` +- Both containers healthy and ready +- Service endpoints accessible +- Ready for inference requests + + +### **Container Architecture** +``` +Pod: launch-endpoint-id-end-{ID} +├── Container: http-forwarder (model-engine:onprem20) +│ └── Routes traffic to main container +└── Container: main (vllm:vllm-onprem) + ├── AWS CLI installation (~5-10 min) + ├── S3 model download (~30-60 min) + ├── File verification & timing coordination + └── VLLM server startup +``` + +--- + +## 🛠️ Operational Procedures + +### **Testing Workflow** + +#### **1. Deploy New Image Version** +```bash +# Update values.yaml tag, then: +kubectl rollout restart deployment model-engine -n llm-core +kubectl rollout restart deployment model-engine-endpoint-builder -n llm-core + +# Verify image deployment +kubectl describe pod $(kubectl get pods -n llm-core | grep "model-engine" | head -1 | awk '{print $1}') -n llm-core | grep "Image:" +``` + +#### **2. Create Test Endpoint** +```bash +# Start port-forward +kubectl port-forward svc/model-engine -n llm-core 5000:80 & + +# Create endpoint (50GB storage is critical!) 
+curl -X POST -H "Content-Type: application/json" -u "test-user-id:" "http://localhost:5000/v1/llm/model-endpoints" -d '{ + "name": "test-endpoint-v1", + "model_name": "test-model", + "endpoint_type": "streaming", + "inference_framework": "vllm", + "inference_framework_image_tag": "vllm-onprem", + "source": "hugging_face", + "checkpoint_path": "s3://scale-gp-models/intermediate-model-aws/", + "num_shards": 1, + "cpus": 4, + "memory": "16Gi", + "storage": "50Gi", + "gpus": 1, + "gpu_type": "nvidia-tesla-t4", + "nodes_per_worker": 1, + "min_workers": 1, + "max_workers": 1, + "per_worker": 1, + "metadata": {"team": "test", "product": "llm-engine"}, + "labels": {"team": "test", "product": "llm-engine"} +}' +``` + +#### **3. Monitor Endpoint Progress** +```bash +# Check pod creation +kubectl get all -n llm-core | grep "launch-endpoint" + +# Monitor container processes +kubectl exec ENDPOINT_POD -n llm-core -c main -- ps aux + +# Check download progress +kubectl exec ENDPOINT_POD -n llm-core -c main -- ls -la model_files/ +kubectl exec ENDPOINT_POD -n llm-core -c main -- du -sh model_files/ + +# Monitor logs +kubectl logs ENDPOINT_POD -n llm-core -c main --tail=10 -f +``` + +#### **4. Cleanup Failed Endpoints** +```bash +# Delete endpoint resources +kubectl delete deployment ENDPOINT_DEPLOYMENT -n llm-core +kubectl delete service ENDPOINT_SERVICE -n llm-core +kubectl delete hpa ENDPOINT_HPA -n llm-core + +# Clean up old replica sets +kubectl get replicasets -n llm-core | grep model-engine | awk '$3 == 0 {print $1}' | xargs -r kubectl delete replicaset -n llm-core +``` + +### **Common Issues & Quick Fixes** + +| Issue | Symptoms | Root Cause | Solution | +|-------|----------|------------|----------| +| **Container Termination** | Exit Code 137, pod dies in <5min | Storage limits exceeded | Use 50GB storage (not 100GB+) | +| **Slow AWS CLI Install** | 30+ minute installations | Missing optimization flag | Verify `--no-cache-dir` in command | +| **Architecture Errors** | `Qwen3ForCausalLM not supported` | Wrong VLLM image | Use `vllm-onprem` tag | +| **Download Fails** | No model_files directory | AWS CLI or S3 auth issues | Check `which aws`, verify credentials | +| **Premature VLLM Start** | `No config format found` | Timing coordination missing | Verify `while` loop in command | + +### **Key Monitoring Commands** +```bash +# Check cluster storage capacity +kubectl get nodes -o custom-columns=NAME:.metadata.name,GPU:.status.allocatable.'nvidia\.com/gpu',EPHEMERAL-STORAGE:.status.allocatable.ephemeral-storage + +# Monitor active downloads +kubectl exec ENDPOINT_POD -n llm-core -c main -- ps aux | grep aws + +# Check file finalization status +kubectl exec ENDPOINT_POD -n llm-core -c main -- ls -la model_files/ | grep -E "\.tmp|\..*[A-Za-z0-9]{8}$" + +# Monitor endpoint builder +kubectl logs deployment/model-engine-endpoint-builder -n llm-core --tail=20 +``` + +--- + +## 🚨 Known Issues & Future Work + +### **Critical Unresolved Issues** + +#### **1. 
PVC Functionality Broken** +- **Status**: All attempts to use PVC storage fail +- **Root Cause**: Async hardware config bug in appcode +- **Error**: `RuntimeWarning: coroutine '_get_recommended_hardware_config_map' was never awaited` +- **Impact**: Always falls back to EmptyDir instead of PVC +- **Workaround**: Using ephemeral storage with reduced limits +- **PVC Code Status**: PVC implementation has been **reverted from both repositories** and is **scheduled for rework next week** +- **Fix Required**: Changes to `llm-engine` repository to properly await async hardware config function + +#### **2. Storage Scaling Limitations** +- **Current**: Single GPU node with 189GB ephemeral storage +- **Constraint**: Large models require more storage than available +- **Options**: Add GPU nodes, expand node storage, or implement working PVC + +#### **3. Download Performance** +- **Current**: ~4MB/s download speeds from Scality S3 +- **Optimization**: Could pre-install AWS CLI in base images +- **Alternative**: Use faster download tools or local mirrors + +### **Prevention Guidelines** +- **Always use 50GB storage** for tesla-t4 hardware (not 100GB+) +- **Always use `vllm-onprem` tag** (not version-specific like `0.6.3-rc1`) +- **Always include `--no-cache-dir`** in AWS CLI installation commands +- **Test endpoint creation** immediately after any image updates +- **Monitor container uptime** - quick termination indicates problems + +--- + +## 📁 Critical File Locations + +### **oman-national-llm Repository** +``` +infra/charts/model-engine/ +├── values.yaml # Main configuration +├── templates/ +│ ├── service_template_config_map.yaml # Pod/deployment templates +│ ├── recommended_hardware_config_map.yaml # Hardware specifications +│ ├── service_config_map.yaml # Service configuration +│ └── _helpers.tpl # Helm helper functions +``` + + + +--- + +## 🚀 Quick Reference + +### **Working API Call** +```bash +curl -X POST -H "Content-Type: application/json" -u "test-user-id:" "http://localhost:5000/v1/llm/model-endpoints" -d '{ + "name": "test-endpoint-v1", + "model_name": "test-model", + "endpoint_type": "streaming", + "inference_framework": "vllm", + "inference_framework_image_tag": "vllm-onprem", + "source": "hugging_face", + "checkpoint_path": "s3://scale-gp-models/intermediate-model-aws/", + "num_shards": 1, + "cpus": 4, + "memory": "16Gi", + "storage": "50Gi", # CRITICAL: Must be 50Gi or less + "gpus": 1, + "gpu_type": "nvidia-tesla-t4", + "nodes_per_worker": 1, + "min_workers": 1, + "max_workers": 1, + "per_worker": 1, + "metadata": {"team": "test", "product": "llm-engine"}, + "labels": {"team": "test", "product": "llm-engine"} +}' +``` + +### **Emergency Revert Procedure** +```bash +# Revert to last working state +kubectl set image deployment/model-engine model-engine=registry.odp.om/odp-development/oman-national-llm/model-engine:onprem20 -n llm-core +kubectl set image deployment/model-engine-endpoint-builder model-engine-endpoint-builder=registry.odp.om/odp-development/oman-national-llm/model-engine:onprem20 -n llm-core + +# Update values.yaml +tag: onprem20 +vllm_tag: "vllm-onprem" + +# Verify storage configuration +storage: 50Gi # In hardware specs +``` + + + +--- + +*This documentation represents the culmination of extensive testing and debugging to achieve the first stable model-engine deployment. 
Preserve this configuration as the baseline for future development.* diff --git a/charts/model-engine/templates/_helpers.tpl b/charts/model-engine/templates/_helpers.tpl index a8de80c67..181281eeb 100644 --- a/charts/model-engine/templates/_helpers.tpl +++ b/charts/model-engine/templates/_helpers.tpl @@ -369,6 +369,18 @@ env: - name: CIRCLECI value: "true" {{- end }} + {{- if .Values.gunicorn }} + - name: WORKER_TIMEOUT + value: {{ .Values.gunicorn.workerTimeout | quote }} + - name: GUNICORN_TIMEOUT + value: {{ .Values.gunicorn.gracefulTimeout | quote }} + - name: GUNICORN_GRACEFUL_TIMEOUT + value: {{ .Values.gunicorn.gracefulTimeout | quote }} + - name: GUNICORN_KEEP_ALIVE + value: {{ .Values.gunicorn.keepAlive | quote }} + - name: GUNICORN_WORKER_CLASS + value: {{ .Values.gunicorn.workerClass | quote }} + {{- end }} {{- end }} {{- define "modelEngine.serviceEnvGitTagFromHelmVar" }} diff --git a/model-engine/Dockerfile b/model-engine/Dockerfile index 45cd9630d..e6ad03be5 100644 --- a/model-engine/Dockerfile +++ b/model-engine/Dockerfile @@ -43,6 +43,7 @@ COPY model-engine/requirements.txt /workspace/model-engine/requirements.txt COPY model-engine/requirements_override.txt /workspace/model-engine/requirements_override.txt RUN pip install -r requirements-test.txt --no-cache-dir RUN pip install -r requirements.txt --no-cache-dir +RUN pip install torch==2.2.2+cpu --index-url https://download.pytorch.org/whl/cpu --no-cache-dir RUN pip install -r requirements_override.txt --no-cache-dir COPY model-engine/setup.py /workspace/model-engine/setup.py COPY model-engine/model_engine_server /workspace/model-engine/model_engine_server diff --git a/model-engine/model_engine_server/common/config.py b/model-engine/model_engine_server/common/config.py index 532ead21a..e8775df77 100644 --- a/model-engine/model_engine_server/common/config.py +++ b/model-engine/model_engine_server/common/config.py @@ -77,6 +77,12 @@ class HostedModelInferenceServiceConfig: None # Not an env var because the redis cache info is already here ) sglang_repository: Optional[str] = None + # Image tags for onprem deployments + vllm_tag: Optional[str] = None + tgi_tag: Optional[str] = None + lightllm_tag: Optional[str] = None + tensorrt_llm_tag: Optional[str] = None + batch_inference_vllm_tag: Optional[str] = None @classmethod def from_json(cls, json): @@ -90,6 +96,11 @@ def from_yaml(cls, yaml_path): @property def cache_redis_url(self) -> str: + # First priority: Check for CACHE_REDIS_URL environment variable (injected by Helm) + cache_redis_url_env = os.getenv("CACHE_REDIS_URL") + if cache_redis_url_env: + return cache_redis_url_env + if self.cache_redis_aws_url: assert infra_config().cloud_provider == "aws", "cache_redis_aws_url is only for AWS" if self.cache_redis_aws_secret_name: @@ -104,6 +115,19 @@ def cache_redis_url(self) -> str: creds = get_key_file(self.cache_redis_aws_secret_name) # Use default role return creds["cache-url"] + # Check if we're in an onprem environment with direct Redis access via config + if infra_config().cloud_provider == "onprem" and infra_config().redis_host: + # Onprem Redis configuration + redis_host = infra_config().redis_host + redis_port = infra_config().redis_port + redis_password = infra_config().redis_password + + if redis_password: + return f"redis://:{redis_password}@{redis_host}:{redis_port}/0" + else: + return f"redis://{redis_host}:{redis_port}/0" + + # Azure Redis configuration (existing logic) assert self.cache_redis_azure_host and infra_config().cloud_provider == "azure" username = 
os.getenv("AZURE_OBJECT_ID") token = DefaultAzureCredential().get_token("https://redis.azure.com/.default") diff --git a/model-engine/model_engine_server/core/aws/roles.py b/model-engine/model_engine_server/core/aws/roles.py index d33efecae..670ff3a1f 100644 --- a/model-engine/model_engine_server/core/aws/roles.py +++ b/model-engine/model_engine_server/core/aws/roles.py @@ -12,6 +12,7 @@ from boto3 import Session, client from botocore.client import BaseClient from model_engine_server.core.loggers import logger_name, make_logger +from model_engine_server.core.config import infra_config logger = make_logger(logger_name()) diff --git a/model-engine/model_engine_server/core/aws/secrets.py b/model-engine/model_engine_server/core/aws/secrets.py index 0637b121b..8fbb05bcd 100644 --- a/model-engine/model_engine_server/core/aws/secrets.py +++ b/model-engine/model_engine_server/core/aws/secrets.py @@ -1,6 +1,7 @@ """AWS secrets module.""" import json +import os from functools import lru_cache from typing import Optional @@ -14,17 +15,20 @@ @lru_cache(maxsize=2) def get_key_file(secret_name: str, aws_profile: Optional[str] = None): - if aws_profile is not None: - session = boto3.Session(profile_name=aws_profile) - secret_manager = session.client("secretsmanager", region_name=infra_config().default_region) - else: - secret_manager = boto3.client("secretsmanager", region_name=infra_config().default_region) + # Only use AWS Secrets Manager for AWS cloud provider + if infra_config().cloud_provider != "aws": + logger.warning(f"Not using AWS Secrets Manager - cloud provider is {infra_config().cloud_provider} (cannot retrieve secret: {secret_name})") + return {} + try: - secret_value = json.loads( - secret_manager.get_secret_value(SecretId=secret_name)["SecretString"] - ) - return secret_value - except ClientError as e: - logger.error(e) - logger.error(f"Failed to retrieve secret: {secret_name}") + if aws_profile is not None: + session = boto3.Session(profile_name=aws_profile) + secret_manager = session.client("secretsmanager", region_name=infra_config().default_region) + else: + secret_manager = boto3.client("secretsmanager", region_name=infra_config().default_region) + + response = secret_manager.get_secret_value(SecretId=secret_name) + return json.loads(response["SecretString"]) + except (ClientError, Exception) as e: + logger.warning(f"Failed to retrieve secret {secret_name} from AWS Secrets Manager: {e}") return {} diff --git a/model-engine/model_engine_server/core/aws/storage_client.py b/model-engine/model_engine_server/core/aws/storage_client.py index 814b00c4e..707b5ce09 100644 --- a/model-engine/model_engine_server/core/aws/storage_client.py +++ b/model-engine/model_engine_server/core/aws/storage_client.py @@ -1,6 +1,8 @@ +import os import time from typing import IO, Callable, Iterable, Optional, Sequence +import boto3 import smart_open from botocore.client import BaseClient from model_engine_server.core.aws.roles import session @@ -20,7 +22,21 @@ def sync_storage_client(**kwargs) -> BaseClient: - return session(infra_config().profile_ml_worker).client("s3", **kwargs) # type: ignore + # Support custom endpoints for S3-compatible storage + endpoint_url = kwargs.get("endpoint_url") or os.getenv("AWS_ENDPOINT_URL") + if endpoint_url and "endpoint_url" not in kwargs: + kwargs["endpoint_url"] = endpoint_url + + if infra_config().cloud_provider == "onprem": + # For onprem, use explicit credentials from environment variables + boto3_session = boto3.Session( + 
aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"), + aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"), + region_name=infra_config().default_region + ) + return boto3_session.client("s3", **kwargs) + else: + return session(infra_config().profile_ml_worker).client("s3", **kwargs) # type: ignore def open(uri: str, mode: str = "rt", **kwargs) -> IO: # pylint: disable=redefined-builtin diff --git a/model-engine/model_engine_server/core/celery/app.py b/model-engine/model_engine_server/core/celery/app.py index af7790d1e..4b1be767f 100644 --- a/model-engine/model_engine_server/core/celery/app.py +++ b/model-engine/model_engine_server/core/celery/app.py @@ -530,18 +530,26 @@ def _get_backend_url_and_conf( # use db_num=1 for backend to differentiate from broker backend_url = get_redis_endpoint(1) elif backend_protocol == "s3": - backend_url = "s3://" - if aws_role is None: - aws_session = session(infra_config().profile_ml_worker) + # For on-premises environments, use Redis backend instead of S3 + if infra_config().cloud_provider == "onprem": + logger.info("Using Redis backend for on-premises environment instead of S3") + backend_url = get_redis_endpoint(1) + elif infra_config().cloud_provider != "aws": + raise ValueError(f"S3 backend requires AWS cloud provider, but current provider is {infra_config().cloud_provider}") else: - aws_session = session(aws_role) - out_conf_changes.update( - { - "s3_boto3_session": aws_session, - "s3_bucket": s3_bucket, - "s3_base_path": s3_base_path, - } - ) + backend_url = "s3://" + if aws_role is None: + aws_session = session(infra_config().profile_ml_worker) + else: + aws_session = session(aws_role) + + out_conf_changes.update( + { + "s3_boto3_session": aws_session, + "s3_bucket": s3_bucket, + "s3_base_path": s3_base_path, + } + ) elif backend_protocol == "abs": backend_url = f"azureblockblob://{os.getenv('ABS_ACCOUNT_NAME')}" else: diff --git a/model-engine/model_engine_server/core/config.py b/model-engine/model_engine_server/core/config.py index 0474976c0..a4e721525 100644 --- a/model-engine/model_engine_server/core/config.py +++ b/model-engine/model_engine_server/core/config.py @@ -40,8 +40,11 @@ class _InfraConfig: default_region: str ml_account_id: str docker_repo_prefix: str - s3_bucket: str + s3_bucket: Optional[str] = None # Optional for on-premises + aws_endpoint_url: Optional[str] = None # Optional for on-premises S3 redis_host: Optional[str] = None + redis_port: Optional[str] = "6379" + redis_password: Optional[str] = None redis_aws_secret_name: Optional[str] = None profile_ml_worker: str = "default" profile_ml_inference_worker: str = "default" @@ -64,6 +67,16 @@ class DBEngineConfig: class InfraConfig(DBEngineConfig, _InfraConfig): @classmethod def from_json(cls, json): + # Handle missing AWS parameters for on-premises environments + if json.get("cloud_provider") == "onprem": + # Set default values for AWS-specific fields when they're missing + if "s3_bucket" not in json: + json["s3_bucket"] = None + if "ml_account_id" not in json: + json["ml_account_id"] = "000000000000" + if "default_region" not in json: + json["default_region"] = "local" + return cls(**{k: v for k, v in json.items() if k in inspect.signature(cls).parameters}) @classmethod diff --git a/model-engine/model_engine_server/core/configs/default.yaml b/model-engine/model_engine_server/core/configs/default.yaml index 2e2e6ec08..c2d1019ff 100644 --- a/model-engine/model_engine_server/core/configs/default.yaml +++ b/model-engine/model_engine_server/core/configs/default.yaml @@ -6,6 +6,8 @@ 
default_region: "us-west-2" ml_account_id: "000000000000" docker_repo_prefix: "000000000000.dkr.ecr.us-west-2.amazonaws.com" redis_host: "redis-message-broker-master.default" +redis_port: "6379" +redis_password: null s3_bucket: "test-bucket" profile_ml_worker: "default" profile_ml_inference_worker: "default" diff --git a/model-engine/model_engine_server/core/docker/docker_image.py b/model-engine/model_engine_server/core/docker/docker_image.py index 8d68f8c8b..9926de519 100644 --- a/model-engine/model_engine_server/core/docker/docker_image.py +++ b/model-engine/model_engine_server/core/docker/docker_image.py @@ -185,19 +185,29 @@ def push(service_name: str, image_tag: Optional[str] = None) -> None: logger.info(f"push args: {local_args}") docker_client = docker.from_env() - ecr_client = boto3.client("ecr", region_name=infra_config().default_region) - token = ecr_client.get_authorization_token(registryIds=[infra_config().ml_account_id]) - username, password = ( - base64.b64decode(token["authorizationData"][0]["authorizationToken"]).decode().split(":") - ) + if infra_config().cloud_provider == "onprem": + # For onprem, push without ECR authentication + output = docker_client.images.push( + repository=f"{infra_config().docker_repo_prefix}/{service_name}", + tag=_get_image_tag(image_tag), + stream=True, + decode=True, + ) + else: + # For AWS, use ECR authentication + ecr_client = boto3.client("ecr", region_name=infra_config().default_region) + token = ecr_client.get_authorization_token(registryIds=[infra_config().ml_account_id]) + username, password = ( + base64.b64decode(token["authorizationData"][0]["authorizationToken"]).decode().split(":") + ) - output = docker_client.images.push( - repository=f"{infra_config().docker_repo_prefix}/{service_name}", - tag=_get_image_tag(image_tag), - auth_config={"username": username, "password": password}, - stream=True, - decode=True, - ) + output = docker_client.images.push( + repository=f"{infra_config().docker_repo_prefix}/{service_name}", + tag=_get_image_tag(image_tag), + auth_config={"username": username, "password": password}, + stream=True, + decode=True, + ) for line in output: logger.info(line) diff --git a/model-engine/model_engine_server/core/docker/ecr.py b/model-engine/model_engine_server/core/docker/ecr.py index fcd324b9e..282fb4f9b 100644 --- a/model-engine/model_engine_server/core/docker/ecr.py +++ b/model-engine/model_engine_server/core/docker/ecr.py @@ -8,6 +8,8 @@ def repository_exists(repository_name: str): + if infra_config().cloud_provider == "onprem": + return True ecr = boto3.client("ecr", region_name=infra_config().default_region) try: response = ecr.describe_repositories( @@ -37,6 +39,9 @@ def batch_image_exists( if filter is None: filter = DEFAULT_FILTER + if infra_config().cloud_provider == "onprem": + return True + if aws_profile is None: client = boto3.client("ecr", region_name=region_name) else: @@ -85,6 +90,8 @@ def image_exists( def ecr_exists_for_repo(repo_name: str, image_tag: Optional[str] = None): """Check if image exists in ECR""" + if infra_config().cloud_provider == "onprem": + return True if image_tag is None: image_tag = tag() ecr = boto3.client("ecr", region_name=infra_config().default_region) @@ -100,6 +107,31 @@ def ecr_exists_for_repo(repo_name: str, image_tag: Optional[str] = None): def get_latest_image_tag(repository_name: str): + if infra_config().cloud_provider == "onprem": + # For onprem, try to find explicitly configured tags first + from model_engine_server.common.config import hmi_config + + # Map 
repository names to config tag properties + # Note: This requires adding tag properties to HostedModelInferenceServiceConfig + tag_mapping = { + hmi_config.vllm_repository: getattr(hmi_config, 'vllm_tag', None), + hmi_config.tgi_repository: getattr(hmi_config, 'tgi_tag', None), + hmi_config.lightllm_repository: getattr(hmi_config, 'lightllm_tag', None), + hmi_config.tensorrt_llm_repository: getattr(hmi_config, 'tensorrt_llm_tag', None), + hmi_config.batch_inference_vllm_repository: getattr(hmi_config, 'batch_inference_vllm_tag', None), + } + + # Check if we have an explicit tag for this repository + explicit_tag = tag_mapping.get(repository_name) + if explicit_tag: + return explicit_tag + + # If no explicit tag found, provide helpful error + raise NotImplementedError( + f"No explicit tag configured for repository '{repository_name}'. " + f"For on-premises deployments, please add '{repository_name.replace('/', '_')}_tag' " + f"to your values file configuration, or specify image tags explicitly in your deployment." + ) ecr = boto3.client("ecr", region_name=infra_config().default_region) images = ecr.describe_images( registryId=infra_config().ml_account_id, diff --git a/model-engine/model_engine_server/core/loggers.py b/model-engine/model_engine_server/core/loggers.py index 3a28d4501..9fa2f6d00 100644 --- a/model-engine/model_engine_server/core/loggers.py +++ b/model-engine/model_engine_server/core/loggers.py @@ -80,9 +80,24 @@ def make_standard_logger(name: str, log_level: int = logging.INFO) -> logging.Lo raise ValueError("Name must be a non-empty string.") logger = logging.getLogger(name) logger.setLevel(log_level) - logging.basicConfig( - format=LOG_FORMAT, - ) + + # Thread-safe logging configuration - only configure if not already configured + if not logger.handlers: + # Use a lock to prevent race conditions in multi-threaded environments + import threading + with threading.Lock(): + if not logger.handlers: # Double-check after acquiring lock + # Configure basic logging only if not already configured + if not logging.getLogger().handlers: + logging.basicConfig( + format=LOG_FORMAT, + ) + # Add handler to this specific logger if needed + if not logger.handlers: + handler = logging.StreamHandler() + handler.setFormatter(logging.Formatter(LOG_FORMAT)) + logger.addHandler(handler) + return logger diff --git a/model-engine/model_engine_server/db/base.py b/model-engine/model_engine_server/db/base.py index 5033d8ada..053d217f5 100644 --- a/model-engine/model_engine_server/db/base.py +++ b/model-engine/model_engine_server/db/base.py @@ -22,6 +22,9 @@ def get_key_file_name(environment: str) -> str: if infra_config().cloud_provider == "azure": return f"{environment}-ml-infra-pg".replace("training", "prod").replace("-new", "") + elif infra_config().cloud_provider == "onprem": + # For on-premises, use a local database configuration + return f"local/{environment}/ml_infra_pg".replace("training", "prod").replace("-new", "") return f"{environment}/ml_infra_pg".replace("training", "prod").replace("-new", "") @@ -76,6 +79,17 @@ def get_engine_url( # for recommendations on how to work with rotating auth credentials engine_url = f"postgresql://{user}:{password}@{db}?sslmode=require" expiry_in_sec = token.expires_on + elif infra_config().cloud_provider == "onprem": + # For on-premises environments, use local database configuration + # These should be set in environment variables for on-premises deployments + host = os.environ.get("DB_HOST", "localhost") + port = os.environ.get("DB_PORT", "5432") + dbname = 
os.environ.get("DB_NAME", "model_engine") + user = os.environ.get("DB_USER", "postgres") + password = os.environ.get("DB_PASSWORD", "") + + logger.info(f"Connecting to local db {host}:{port}, name {dbname}") + engine_url = f"postgresql://{user}:{password}@{host}:{port}/{dbname}" else: db_secret_aws_profile = os.environ.get("DB_SECRET_AWS_PROFILE") creds = get_key_file(key_file, db_secret_aws_profile) diff --git a/model-engine/model_engine_server/domain/use_cases/llm_model_endpoint_use_cases.py b/model-engine/model_engine_server/domain/use_cases/llm_model_endpoint_use_cases.py index e4572f614..260dd7ca3 100644 --- a/model-engine/model_engine_server/domain/use_cases/llm_model_endpoint_use_cases.py +++ b/model-engine/model_engine_server/domain/use_cases/llm_model_endpoint_use_cases.py @@ -531,6 +531,7 @@ async def create_text_generation_inference_bundle( framework_image_tag, checkpoint_path, final_weights_folder, + model_name=model_name, ) subcommands.append( @@ -580,6 +581,7 @@ def load_model_weights_sub_commands( checkpoint_path, final_weights_folder, trust_remote_code: bool = False, + model_name: str = "model", ): if checkpoint_path.startswith("s3://"): return self.load_model_weights_sub_commands_s3( @@ -588,6 +590,7 @@ def load_model_weights_sub_commands( checkpoint_path, final_weights_folder, trust_remote_code, + model_name, ) elif checkpoint_path.startswith("azure://") or "blob.core.windows.net" in checkpoint_path: return self.load_model_weights_sub_commands_abs( @@ -596,6 +599,7 @@ def load_model_weights_sub_commands( checkpoint_path, final_weights_folder, trust_remote_code, + model_name, ) else: raise ObjectHasInvalidValueException( @@ -609,32 +613,29 @@ def load_model_weights_sub_commands_s3( checkpoint_path, final_weights_folder, trust_remote_code: bool, + model_name: str = "model", ): subcommands = [] - s5cmd = "s5cmd" - - # This is a hack for now to skip installing s5cmd for text-generation-inference:0.9.3-launch_s3, - # which has s5cmd binary already baked in. Otherwise, install s5cmd if it's not already available - if ( - framework == LLMInferenceFramework.TEXT_GENERATION_INFERENCE - and framework_image_tag != "0.9.3-launch_s3" - ): - subcommands.append(f"{s5cmd} > /dev/null || conda install -c conda-forge -y {s5cmd}") - else: - s5cmd = "./s5cmd" checkpoint_files = self.llm_artifact_gateway.list_files(checkpoint_path) validate_checkpoint_files(checkpoint_files) - # filter to configs ('*.model' and '*.json') and weights ('*.safetensors') - # For models that are not supported by transformers directly, we need to include '*.py' and '*.bin' - # to load the model. 
Only set this flag if "trust_remote_code" is set to True - file_selection_str = '--include "*.model" --include "*.model.v*" --include "*.json" --include "*.safetensors" --exclude "optimizer*"' - if trust_remote_code: - file_selection_str += ' --include "*.py"' - subcommands.append( - f"{s5cmd} --numworkers 512 cp --concurrency 10 {file_selection_str} {os.path.join(checkpoint_path, '*')} {final_weights_folder}" - ) + # Support for third-party object storage (like Scality) + # Use AWS CLI as proven in working deployment + endpoint_url = os.getenv("AWS_ENDPOINT_URL") + + if endpoint_url: + # For custom endpoints (like Scality), use pre-installed AWS CLI v2 + subcommands.extend([ + f"AWS_ACCESS_KEY_ID={os.getenv('AWS_ACCESS_KEY_ID', '')} AWS_SECRET_ACCESS_KEY={os.getenv('AWS_SECRET_ACCESS_KEY', '')} AWS_ENDPOINT_URL={endpoint_url} AWS_REGION={os.getenv('AWS_REGION', 'us-east-1')} AWS_EC2_METADATA_DISABLED=true aws s3 sync {checkpoint_path.rstrip('/')} {final_weights_folder} --no-progress", + f"echo 'Waiting for AWS CLI to finalize files...' ; sleep 30 ; echo 'Checking for model files...' ; while [ ! -f {final_weights_folder}/config.json ] || ! ls {final_weights_folder}/*.safetensors >/dev/null 2>&1 ; do echo 'Files not ready yet, waiting 30 more seconds...' ; sleep 30 ; done ; echo 'Model files are ready!' ; ls -la {final_weights_folder}/ ; echo 'VLLM can now start'" + ]) + else: + # Standard S3, use pre-installed AWS CLI v2 + subcommands.extend([ + f"aws s3 sync {checkpoint_path.rstrip('/')} {final_weights_folder}", + f"echo 'Waiting for AWS CLI to finalize files...' ; sleep 30 ; echo 'Checking for model files...' ; while [ ! -f {final_weights_folder}/config.json ] || ! ls {final_weights_folder}/*.safetensors >/dev/null 2>&1 ; do echo 'Files not ready yet, waiting 30 more seconds...' ; sleep 30 ; done ; echo 'Model files are ready!' 
; ls -la {final_weights_folder}/ ; echo 'VLLM can now start'" + ]) return subcommands def load_model_weights_sub_commands_abs( @@ -644,6 +645,7 @@ def load_model_weights_sub_commands_abs( checkpoint_path, final_weights_folder, trust_remote_code: bool, + model_name: str = "model", ): subcommands = [] @@ -685,9 +687,37 @@ def load_model_files_sub_commands_trt_llm( and llm-engine/model-engine/model_engine_server/inference/tensorrt-llm/triton_model_repo/postprocessing/config.pbtxt """ if checkpoint_path.startswith("s3://"): - subcommands = [ - f"./s5cmd --numworkers 512 cp --concurrency 50 {os.path.join(checkpoint_path, '*')} ./" - ] + # Support for third-party object storage (like Scality) + # Use azcopy instead of AWS CLI since container doesn't have pip + endpoint_url = os.getenv("AWS_ENDPOINT_URL") + + # Download and install azcopy (works in any Linux container) + subcommands.extend([ + "curl -L https://aka.ms/downloadazcopy-v10-linux | tar --strip-components=1 -C /usr/local/bin --no-same-owner --exclude=*.txt -xzvf - && chmod 755 /usr/local/bin/azcopy", + ]) + + if endpoint_url: + # For custom endpoints (like Scality), use azcopy with S3-compatible mode + subcommands.extend([ + f"export AZCOPY_AUTO_LOGIN_TYPE=WORKLOAD", + f"export AWS_ACCESS_KEY_ID={os.getenv('AWS_ACCESS_KEY_ID', '')}", + f"export AWS_SECRET_ACCESS_KEY={os.getenv('AWS_SECRET_ACCESS_KEY', '')}", + f"export AWS_ENDPOINT_URL={endpoint_url}", + f"export AWS_REGION={os.getenv('AWS_REGION', 'us-east-1')}", + f"export AWS_EC2_METADATA_DISABLED=true", + # Use azcopy with S3-compatible endpoint + f"azcopy copy --recursive --include-pattern '*.json;*.safetensors;*.model' --exclude-pattern 'optimizer*' '{checkpoint_path}/' ./" + ]) + else: + # Standard S3, use azcopy with AWS credentials + subcommands.extend([ + f"export AZCOPY_AUTO_LOGIN_TYPE=WORKLOAD", + f"export AWS_ACCESS_KEY_ID={os.getenv('AWS_SECRET_ACCESS_KEY', '')}", + f"export AWS_SECRET_ACCESS_KEY={os.getenv('AWS_SECRET_ACCESS_KEY', '')}", + f"export AWS_REGION={os.getenv('AWS_REGION', 'us-east-1')}", + # Use azcopy for standard S3 + f"azcopy copy --recursive --include-pattern '*.json;*.safetensors;*.model' --exclude-pattern 'optimizer*' '{checkpoint_path}/' ./" + ]) else: subcommands.extend( [ @@ -769,9 +799,10 @@ async def create_deepspeed_bundle( "MODEL_NAME": model_name, }, protocol="http", - readiness_initial_delay_seconds=1800, + readiness_initial_delay_seconds=120, ), metadata={}, + ), do_auth_check=False, ) @@ -805,7 +836,7 @@ async def create_sglang_bundle( # pragma: no cover huggingface_repo = sglang_args.huggingface_repo sglang_args.huggingface_repo = None # remove from additional_args - # TODO(dmchoi): currently using official sglang image; doesn't have s5cmd + # TODO(dmchoi): currently using official sglang image; uses AWS CLI for downloads # final_weights_folder = "model_files" # subcommands += self.load_model_weights_sub_commands( # LLMInferenceFramework.SGLANG, @@ -920,6 +951,7 @@ def _create_vllm_bundle_command( checkpoint_path, final_weights_folder, trust_remote_code=vllm_args.trust_remote_code or False, + model_name=model_name, ) if multinode: @@ -952,7 +984,8 @@ def _create_vllm_bundle_command( if hmi_config.sensitive_log_mode: vllm_args.disable_log_requests = True - vllm_cmd = f"python -m vllm_server --model {final_weights_folder} --served-model-name {model_name} {final_weights_folder} --port 5005" + # TEMP FIX: Using vllm_adapter.py for SGP /predict endpoint compatibility until SGP is updated to use V2 + vllm_cmd = f"python vllm_adapter.py --model 
{final_weights_folder} --served-model-name {model_name} --port 5005" for field in VLLMEndpointAdditionalArgs.model_fields.keys(): config_value = getattr(vllm_args, field, None) if config_value is not None: @@ -970,7 +1003,9 @@ def _create_vllm_bundle_command( else: vllm_cmd += f" --{field.replace('_', '-')} {config_value}" - subcommands.append(vllm_cmd) + # Add fallback to keep container running for debugging if VLLM fails + vllm_cmd_with_fallback = f"{vllm_cmd} || (echo 'VLLM failed to start, keeping container alive for debugging...' ; sleep infinity)" + subcommands.append(vllm_cmd_with_fallback) command = [ "/bin/bash", @@ -1015,8 +1050,8 @@ async def create_vllm_bundle( command=command, streaming_command=command, protocol="http", - readiness_initial_delay_seconds=10, - healthcheck_route="/health", + readiness_initial_delay_seconds=120, # 30 minutes for large model downloads + healthcheck_route="/health", # Standard health check route predict_route="/predict", streaming_predict_route="/stream", extra_routes=[ @@ -1154,6 +1189,7 @@ async def create_lightllm_bundle( framework_image_tag, checkpoint_path, final_weights_folder, + model_name=model_name, ) subcommands.append( diff --git a/model-engine/model_engine_server/entrypoints/k8s_cache.py b/model-engine/model_engine_server/entrypoints/k8s_cache.py index 98dcd9b35..b50d3b387 100644 --- a/model-engine/model_engine_server/entrypoints/k8s_cache.py +++ b/model-engine/model_engine_server/entrypoints/k8s_cache.py @@ -124,6 +124,8 @@ async def main(args: Any): docker_repo: DockerRepository if CIRCLECI: docker_repo = FakeDockerRepository() + elif infra_config().cloud_provider == "onprem": + docker_repo = FakeDockerRepository() elif infra_config().docker_repo_prefix.endswith("azurecr.io"): docker_repo = ACRDockerRepository() else: diff --git a/model-engine/model_engine_server/entrypoints/start_batch_job_orchestration.py b/model-engine/model_engine_server/entrypoints/start_batch_job_orchestration.py index 26972454c..660dd2046 100644 --- a/model-engine/model_engine_server/entrypoints/start_batch_job_orchestration.py +++ b/model-engine/model_engine_server/entrypoints/start_batch_job_orchestration.py @@ -17,8 +17,11 @@ ABSFilesystemGateway, ASBInferenceAutoscalingMetricsGateway, CeleryTaskQueueGateway, + FakeQueueEndpointResourceDelegate, LiveAsyncModelEndpointInferenceGateway, + LiveBatchJobOrchestrationGateway, LiveBatchJobProgressGateway, + LiveEndpointResourceGateway, LiveModelEndpointInfraGateway, LiveModelEndpointsSchemaGateway, LiveStreamingModelEndpointInferenceGateway, @@ -26,6 +29,7 @@ RedisInferenceAutoscalingMetricsGateway, S3FilesystemGateway, ) + from model_engine_server.infra.gateways.resources.asb_queue_endpoint_resource_delegate import ( ASBQueueEndpointResourceDelegate, ) @@ -69,6 +73,9 @@ async def run_batch_job( servicebus_task_queue_gateway = CeleryTaskQueueGateway( broker_type=BrokerType.SERVICEBUS, tracing_gateway=tracing_gateway ) + redis_task_queue_gateway = CeleryTaskQueueGateway( + broker_type=BrokerType.REDIS, tracing_gateway=tracing_gateway + ) monitoring_metrics_gateway = get_monitoring_metrics_gateway() model_endpoint_record_repo = DbModelEndpointRecordRepository( @@ -80,6 +87,9 @@ async def run_batch_job( queue_delegate = FakeQueueEndpointResourceDelegate() elif infra_config().cloud_provider == "azure": queue_delegate = ASBQueueEndpointResourceDelegate() + elif infra_config().cloud_provider == "onprem": + # For on-premises environments, use fake queue delegate since cloud queues are not available + queue_delegate = 
FakeQueueEndpointResourceDelegate() else: queue_delegate = SQSQueueEndpointResourceDelegate( sqs_profile=os.getenv("SQS_PROFILE", hmi_config.sqs_profile) @@ -100,6 +110,10 @@ async def run_batch_job( if infra_config().cloud_provider == "azure": inference_task_queue_gateway = servicebus_task_queue_gateway infra_task_queue_gateway = servicebus_task_queue_gateway + elif infra_config().cloud_provider == "onprem": + # For on-premises environments, use Redis task queue gateway + inference_task_queue_gateway = redis_task_queue_gateway + infra_task_queue_gateway = redis_task_queue_gateway else: inference_task_queue_gateway = sqs_task_queue_gateway infra_task_queue_gateway = sqs_task_queue_gateway diff --git a/model-engine/model_engine_server/entrypoints/start_fastapi_server.py b/model-engine/model_engine_server/entrypoints/start_fastapi_server.py index c7a38e2ca..1595a58db 100644 --- a/model-engine/model_engine_server/entrypoints/start_fastapi_server.py +++ b/model-engine/model_engine_server/entrypoints/start_fastapi_server.py @@ -5,6 +5,7 @@ """ import argparse +import os import subprocess from typing import List @@ -14,18 +15,25 @@ def start_gunicorn_server(port: int, num_workers: int, debug: bool) -> None: additional_args: List[str] = [] if debug: additional_args.extend(["--reload", "--timeout", "0"]) + + # Use environment variables for configuration with fallbacks + timeout = int(os.environ.get('WORKER_TIMEOUT', os.environ.get('GUNICORN_TIMEOUT', 60))) + graceful_timeout = int(os.environ.get('GUNICORN_GRACEFUL_TIMEOUT', timeout)) + keep_alive = int(os.environ.get('GUNICORN_KEEP_ALIVE', 2)) + worker_class = os.environ.get('GUNICORN_WORKER_CLASS', 'model_engine_server.api.worker.LaunchWorker') + command = [ "gunicorn", "--bind", f"[::]:{port}", "--timeout", - "60", + str(timeout), "--graceful-timeout", - "60", + str(graceful_timeout), "--keep-alive", - "2", + str(keep_alive), "--worker-class", - "model_engine_server.api.worker.LaunchWorker", + worker_class, "--workers", f"{num_workers}", *additional_args, diff --git a/model-engine/model_engine_server/inference/batch_inference/vllm_batch.py b/model-engine/model_engine_server/inference/batch_inference/vllm_batch.py index 0214b2c44..82a123c6d 100644 --- a/model-engine/model_engine_server/inference/batch_inference/vllm_batch.py +++ b/model-engine/model_engine_server/inference/batch_inference/vllm_batch.py @@ -56,37 +56,119 @@ def get_cpu_cores_in_container(): def get_s3_client(): - session = boto3.Session(profile_name=os.getenv("S3_WRITE_AWS_PROFILE")) - return session.client("s3", region_name=AWS_REGION) + from model_engine_server.core.config import infra_config + + if infra_config().cloud_provider == "onprem": + # For onprem, use explicit credentials from environment variables + session = boto3.Session( + aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"), + aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"), + region_name=infra_config().default_region + ) + else: + session = boto3.Session(profile_name=os.getenv("S3_WRITE_AWS_PROFILE")) + + # Support custom endpoints for S3-compatible storage + endpoint_url = os.getenv("AWS_ENDPOINT_URL") + client_kwargs = {"region_name": infra_config().default_region} + if endpoint_url: + client_kwargs["endpoint_url"] = endpoint_url + + return session.client("s3", **client_kwargs) def download_model(checkpoint_path, final_weights_folder): - s5cmd = f"./s5cmd --numworkers 512 sync --concurrency 10 --include '*.model' --include '*.json' --include '*.bin' --include '*.safetensors' --exclude 'optimizer*' 
--exclude 'train*' {os.path.join(checkpoint_path, '*')} {final_weights_folder}" - env = os.environ.copy() - env["AWS_PROFILE"] = os.getenv("S3_WRITE_AWS_PROFILE", "default") - # Need to override these env vars so s5cmd uses AWS_PROFILE - env["AWS_ROLE_ARN"] = "" - env["AWS_WEB_IDENTITY_TOKEN_FILE"] = "" - process = subprocess.Popen( - s5cmd, - shell=True, # nosemgrep + # Support for third-party object storage (like Scality) + # AWS CLI works with Scality - so we install and use AWS CLI + endpoint_url = os.getenv("AWS_ENDPOINT_URL") + + # Install AWS CLI first (since it's not in the VLLM container by default) + print("Installing AWS CLI...", flush=True) + install_process = subprocess.Popen( + ["pip", "install", "awscli"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, - env=env, ) - for line in process.stdout: - print(line, flush=True) - - process.wait() - - if process.returncode != 0: - stderr_lines = [] - for line in iter(process.stderr.readline, ""): - stderr_lines.append(line.strip()) - - print(f"Error downloading model weights: {stderr_lines}", flush=True) - + install_process.wait() + + if install_process.returncode != 0: + print("Failed to install AWS CLI", flush=True) + return + + # Simple approach - download all files (basic AWS CLI v1 compatible) + if endpoint_url: + aws_cmd = f"aws s3 sync {checkpoint_path.rstrip('/')} {final_weights_folder} --endpoint-url {endpoint_url} --no-progress" + else: + aws_cmd = f"aws s3 sync {checkpoint_path.rstrip('/')} {final_weights_folder} --no-progress" + + env = os.environ.copy() + + # Configure credentials for object storage or AWS + if endpoint_url: + # Use object storage credentials when custom endpoint is specified + env["AWS_ACCESS_KEY_ID"] = os.getenv("AWS_ACCESS_KEY_ID", "") + env["AWS_SECRET_ACCESS_KEY"] = os.getenv("AWS_SECRET_ACCESS_KEY", "") + env["AWS_ENDPOINT_URL"] = endpoint_url + env["AWS_REGION"] = os.getenv("AWS_REGION", "us-east-1") + # Disable AWS-specific features for third-party object storage + env["AWS_EC2_METADATA_DISABLED"] = "true" + env["AWS_ROLE_ARN"] = "" + env["AWS_WEB_IDENTITY_TOKEN_FILE"] = "" + else: + # Use AWS profile for S3 + env["AWS_PROFILE"] = os.getenv("S3_WRITE_AWS_PROFILE", "default") + # Need to override these env vars so AWS CLI uses AWS_PROFILE + env["AWS_ROLE_ARN"] = "" + env["AWS_WEB_IDENTITY_TOKEN_FILE"] = "" + + # Retry logic with exponential backoff + max_retries = 3 + retry_delay = 10 # seconds + + for attempt in range(max_retries): + print(f"Running AWS CLI command (attempt {attempt + 1}/{max_retries}): {aws_cmd}", flush=True) + + process = subprocess.Popen( + aws_cmd, + shell=True, # nosemgrep + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + env=env, + ) + + for line in process.stdout: + print(line, flush=True) + + process.wait() + + if process.returncode == 0: + print("Model download completed successfully!", flush=True) + return + else: + # Handle errors + stderr_lines = [] + for line in iter(process.stderr.readline, ""): + if line.strip(): + stderr_lines.append(line.strip()) + + print(f"Attempt {attempt + 1} failed with return code {process.returncode}", flush=True) + if stderr_lines: + print(f"Error output: {stderr_lines}", flush=True) + + if attempt < max_retries - 1: + print(f"Retrying in {retry_delay} seconds...", flush=True) + import time + time.sleep(retry_delay) + retry_delay *= 2 # Exponential backoff + else: + print(f"All {max_retries} download attempts failed. 
Keeping container alive for debugging...", flush=True) + # Keep container running for debugging instead of raising error + import time + while True: + print("Container is alive for debugging. Download failed but not exiting.", flush=True) + time.sleep(300) # Print message every 5 minutes def file_exists(path): try: diff --git a/model-engine/model_engine_server/inference/common.py b/model-engine/model_engine_server/inference/common.py index 2f6c1095a..1141a41c5 100644 --- a/model-engine/model_engine_server/inference/common.py +++ b/model-engine/model_engine_server/inference/common.py @@ -25,7 +25,24 @@ def get_s3_client(): global s3_client if s3_client is None: - s3_client = boto3.client("s3", region_name="us-west-2") + from model_engine_server.core.config import infra_config + + if infra_config().cloud_provider == "onprem": + # For onprem, use explicit credentials from environment variables + session = boto3.Session( + aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"), + aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"), + region_name=infra_config().default_region + ) + else: + session = boto3.Session(profile_name=os.getenv("AWS_PROFILE")) + + # Support custom endpoints for S3-compatible storage + endpoint_url = os.getenv("AWS_ENDPOINT_URL") + client_kwargs = {"region_name": infra_config().default_region} + if endpoint_url: + client_kwargs["endpoint_url"] = endpoint_url + s3_client = session.client("s3", **client_kwargs) return s3_client diff --git a/model-engine/model_engine_server/inference/configs/service--http_forwarder.yaml b/model-engine/model_engine_server/inference/configs/service--http_forwarder.yaml index bfdb65536..7316d9e3a 100644 --- a/model-engine/model_engine_server/inference/configs/service--http_forwarder.yaml +++ b/model-engine/model_engine_server/inference/configs/service--http_forwarder.yaml @@ -1,6 +1,6 @@ forwarder: sync: - user_port: 5005 + user_port: e 5005 user_hostname: "localhost" use_grpc: false predict_route: "/predict" @@ -9,7 +9,7 @@ forwarder: model_engine_unwrap: true serialize_results_as_string: true forward_http_status: true - extra_routes: [] + extra_routes: ["/v1/chat/completions", "/v1/completions"] stream: user_port: 5005 user_hostname: "localhost" @@ -18,5 +18,5 @@ forwarder: batch_route: null model_engine_unwrap: true serialize_results_as_string: false - extra_routes: [] + extra_routes: ["/v1/chat/completions", "/v1/completions"] max_concurrency: 100 diff --git a/model-engine/model_engine_server/inference/service_requests.py b/model-engine/model_engine_server/inference/service_requests.py index ec1f3ae84..abcbbbc03 100644 --- a/model-engine/model_engine_server/inference/service_requests.py +++ b/model-engine/model_engine_server/inference/service_requests.py @@ -42,7 +42,14 @@ def get_celery(): def get_s3_client(): global s3_client if s3_client is None: - s3_client = boto3.client("s3", region_name="us-west-2") + from model_engine_server.core.config import infra_config + + # Support custom endpoints for S3-compatible storage + endpoint_url = os.getenv("AWS_ENDPOINT_URL") + client_kwargs = {"region_name": infra_config().default_region} + if endpoint_url: + client_kwargs["endpoint_url"] = endpoint_url + s3_client = boto3.client("s3", **client_kwargs) return s3_client diff --git a/model-engine/model_engine_server/inference/sglang/sglang-startup-script.py b/model-engine/model_engine_server/inference/sglang/sglang-startup-script.py index 157e9c30c..041fdcb47 100755 --- a/model-engine/model_engine_server/inference/sglang/sglang-startup-script.py +++ 
b/model-engine/model_engine_server/inference/sglang/sglang-startup-script.py @@ -37,32 +37,31 @@ def main( leader_port: int, s3_path: str, ): - # 1) Download the DeepSeek model using s5cmd + # 1) Download the model using AWS CLI (works with Scality) model_path = f"/data/model_files/{model}" os.makedirs(model_path, exist_ok=True) - s5cmd_cmd = [ - "s5cmd", - "--numworkers=512", - "cp", - "--concurrency=10", - "--include", - "*.model", - "--include", - "*.json", - "--include", - "*.safetensors", - "--include", - "*.py", - "--include", - "tokenizer.model.v*", - "--exclude", - "optimizer*", - f"s3://{s3_path}/{model}/*", - model_path, - ] - print("Running s5cmd download command...") - subprocess.check_call(s5cmd_cmd) + # Install AWS CLI first (since it's not in the SGLang container by default) + print("Installing AWS CLI...") + install_cmd = ["pip", "install", "awscli"] + subprocess.check_call(install_cmd) + print("AWS CLI installation complete.") + + # Build AWS CLI sync command with include/exclude filters + endpoint_url = os.getenv("AWS_ENDPOINT_URL") + # Simple approach - download all files (no filtering) + include_args = [] + exclude_args = [] + + # Simple approach - download all files (no filtering) + + aws_cmd = ["aws", "s3", "sync", f"s3://{s3_path}/{model}/", model_path] + if endpoint_url: + aws_cmd.extend(["--endpoint-url", endpoint_url]) + aws_cmd.extend(["--no-progress", "--max-concurrent-requests", "10", "--multipart-threshold", "100MB", "--multipart-chunksize", "50MB"]) + + print("Running AWS CLI download command...") + subprocess.check_call(aws_cmd) print("Download complete.") # 2) Wait for both the leader and current Pod DNS to resolve diff --git a/model-engine/model_engine_server/inference/vllm/Dockerfile.vllm b/model-engine/model_engine_server/inference/vllm/Dockerfile.vllm index b588439d7..9f1062cfc 100644 --- a/model-engine/model_engine_server/inference/vllm/Dockerfile.vllm +++ b/model-engine/model_engine_server/inference/vllm/Dockerfile.vllm @@ -1,5 +1,5 @@ # syntax=docker/dockerfile:1 -ARG VLLM_VERSION=0.6.3 +ARG VLLM_VERSION=0.10.0 ARG VLLM_BASE_REPO=vllm/vllm-openai ARG VLLM_BASE_IMAGE=${VLLM_BASE_REPO}:v${VLLM_VERSION} FROM ${VLLM_BASE_IMAGE} AS base @@ -18,9 +18,21 @@ RUN tar -xvzf s5cmd_2.2.1_Linux-64bit.tar.gz # symlink python to python3 RUN ln -s /usr/bin/python3 /usr/bin/python +# Install AWS CLI v2 for better performance with model downloads +RUN apt-get update && apt-get install -y curl unzip \ + && curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip" \ + && unzip awscliv2.zip \ + && ./aws/install \ + && rm -rf awscliv2.zip aws \ + && apt-get remove -y unzip \ + && apt-get autoremove -y \ + && rm -rf /var/lib/apt/lists/* \ + && aws --version + FROM base AS vllm COPY model-engine/model_engine_server/inference/vllm/vllm_server.py /workspace/vllm_server.py +COPY model-engine/model_engine_server/inference/vllm/vllm_adapter.py /workspace/vllm_adapter.py COPY model-engine/model_engine_server/inference/vllm/init_ray.sh /workspace/init_ray.sh # Need to override entrypoint from parent image diff --git a/model-engine/model_engine_server/inference/vllm/build_and_upload_image.sh b/model-engine/model_engine_server/inference/vllm/build_and_upload_image.sh index 1c5c06f27..48251d4e7 100755 --- a/model-engine/model_engine_server/inference/vllm/build_and_upload_image.sh +++ b/model-engine/model_engine_server/inference/vllm/build_and_upload_image.sh @@ -2,48 +2,45 @@ set -eo pipefail -# Build and push vLLM docker image to AWS ECR. 
+# Build vLLM docker image locally. # -# Usage: VLLM_VERSION=0.6.6.post1 ./build_and_upload_image.sh vllm|vllm_batch|vllm_batch_v2 +# Usage: VLLM_VERSION=0.10.0 ./build_and_upload_image.sh vllm|vllm_batch|vllm_batch_v2 SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd ) PROJECT_DIR=$SCRIPT_DIR/../../../.. DOCKERFILE=$PROJECT_DIR/model-engine/model_engine_server/inference/vllm/Dockerfile.vllm if [ -z "$1" ]; then - echo "Must supply AWS account ID" - exit 1; -fi - -if [ -z "$2" ]; then echo "Must supply the image tag" exit 1; fi -if [ -z "$3" ]; then +if [ -z "$2" ]; then echo "Must supply the build target (either vllm or vllm_batch_v2)" exit 1; fi - -ACCOUNT=$1 -IMAGE_TAG=$2 -BUILD_TARGET=$3 -VLLM_VERSION=${VLLM_VERSION:-"0.6.6.post1"} +IMAGE_TAG=$1 +BUILD_TARGET=$2 +VLLM_VERSION=${VLLM_VERSION:-"0.10.0"} VLLM_BASE_REPO=${VLLM_BASE_REPO:-"vllm/vllm-openai"} # if build target = vllm use vllm otherwise use vllm_batch if [ "$BUILD_TARGET" == "vllm" ]; then - IMAGE=$ACCOUNT.dkr.ecr.us-west-2.amazonaws.com/vllm:$IMAGE_TAG + IMAGE=vllm-onprem:$IMAGE_TAG else - IMAGE=$ACCOUNT.dkr.ecr.us-west-2.amazonaws.com/llm-engine/batch-infer-vllm:$IMAGE_TAG + IMAGE=vllm-batch-onprem:$IMAGE_TAG fi -aws ecr get-login-password --region us-west-2 | docker login --username AWS --password-stdin $ACCOUNT.dkr.ecr.us-west-2.amazonaws.com +echo "Building Docker image: $IMAGE" +echo "vLLM Version: $VLLM_VERSION" +echo "Build Target: $BUILD_TARGET" + DOCKER_BUILDKIT=1 docker build \ --build-arg VLLM_VERSION=${VLLM_VERSION} \ --build-arg VLLM_BASE_REPO=${VLLM_BASE_REPO} \ -f ${DOCKERFILE} \ --target ${BUILD_TARGET} \ -t $IMAGE ${PROJECT_DIR} -docker push $IMAGE + +echo "Successfully built: $IMAGE" diff --git a/model-engine/model_engine_server/inference/vllm/requirements-dev.txt b/model-engine/model_engine_server/inference/vllm/requirements-dev.txt index 3cdc3bb5a..588aa75e8 100644 --- a/model-engine/model_engine_server/inference/vllm/requirements-dev.txt +++ b/model-engine/model_engine_server/inference/vllm/requirements-dev.txt @@ -1 +1 @@ -vllm==0.6.6.post1 +vllm==0.10.0 diff --git a/model-engine/model_engine_server/inference/vllm/vllm_adapter.py b/model-engine/model_engine_server/inference/vllm/vllm_adapter.py new file mode 100644 index 000000000..f7d4e8e2b --- /dev/null +++ b/model-engine/model_engine_server/inference/vllm/vllm_adapter.py @@ -0,0 +1,169 @@ +import argparse +import json +from typing import AsyncGenerator, Optional, List + +import uvicorn +from fastapi import BackgroundTasks, FastAPI, Request +from fastapi.responses import Response, StreamingResponse +from vllm.engine.arg_utils import AsyncEngineArgs +from vllm.engine.async_llm_engine import AsyncLLMEngine +from vllm.sampling_params import SamplingParams +from vllm.utils import random_uuid +from pydantic import BaseModel, Field +from transformers import AutoTokenizer + + +TIMEOUT_KEEP_ALIVE = 5 # seconds. 
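+# NOTE: this adapter re-exposes the legacy /predict and /stream routes on top of
+# vLLM's AsyncLLMEngine so the existing SGP client keeps working until it moves to
+# the V2 API. The chat template below is applied with a hardcoded Qwen/Qwen3-8B
+# tokenizer, so the adapter assumes a Qwen3-family checkpoint in ./model_files.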
+TIMEOUT_TO_PREVENT_DEADLOCK = 1 # seconds +tokenizer = AutoTokenizer.from_pretrained("Qwen/Qwen3-8B") +app = FastAPI() + +class CompletionRequest(BaseModel): + temperature: float = Field(default=0.2, ge=0.0, le=1.0) + stop_sequences: Optional[List[str]] = Field(default=None, max_items=4) + max_new_tokens: Optional[int] = Field(default=None) + prompt: Optional[str] = Field(default=None) + prompts: Optional[List[str]] = Field(default=None) + +class CompletionResponse(BaseModel): + completions: List[tuple] + prompt_tokens: int + response_tokens: int + +@app.get("/readyz") +@app.get("/healthz") +@app.get("/health") +def healthcheck(): + return "OK" + +@app.post("/predict") +async def generate_egp_completions(request: Request) -> Response: + request_id = random_uuid() + request_dict = await request.json() + + chat_history = request_dict.pop("prompt", None) + if chat_history: + try: + messages = json.loads(chat_history) + except: + messages = [chat_history] + + # Handle parameter name differences + if "max_new_tokens" in request_dict: + request_dict["max_tokens"] = request_dict.pop("max_new_tokens") + + text = tokenizer.apply_chat_template( + messages, + tokenize=False, + add_generation_prompt=True, + enable_thinking=False, + ) + + sampling_params = SamplingParams(**request_dict) + results_generator = engine.generate(text, sampling_params, request_id) + + final_output = None + tokens = [] + last_output_text = "" + async for request_output in results_generator: + tokens.append(request_output.outputs[-1].text[len(last_output_text) :]) + last_output_text = request_output.outputs[-1].text + if await request.is_disconnected(): + await engine.abort(request_id) + return Response(status_code=499) + final_output = request_output + + assert final_output is not None + + ret = { + "text": final_output.outputs[0].text, + "count_prompt_tokens": len(final_output.prompt_token_ids), + "count_output_tokens": len(final_output.outputs[0].token_ids), + "log_probs": final_output.outputs[0].logprobs, + "tokens": tokens, + "finish_reason": None, + } + # Return Model-Engine compatible format + return Response( + content=json.dumps({ + "text": ret["text"], + "count_prompt_tokens": ret["count_prompt_tokens"], + "count_output_tokens": ret["count_output_tokens"], + "log_probs": ret["log_probs"], + "tokens": ret["tokens"], + "finish_reason": ret["finish_reason"] + }) + ) + +@app.post("/stream") +async def generate(request: Request) -> Response: + request_dict = await request.json() + prompt = request_dict.pop("prompt") + stream = request_dict.pop("stream", False) + + # Handle parameter name differences + if "max_new_tokens" in request_dict: + request_dict["max_tokens"] = request_dict.pop("max_new_tokens") + + sampling_params = SamplingParams(**request_dict) + request_id = random_uuid() + results_generator = engine.generate(prompt, sampling_params, request_id) + + async def stream_results() -> AsyncGenerator[str, None]: + last_output_text = "" + async for request_output in results_generator: + ret = { + "text": request_output.outputs[-1].text[len(last_output_text) :], + "count_prompt_tokens": len(request_output.prompt_token_ids), + "count_output_tokens": len(request_output.outputs[0].token_ids), + "log_probs": request_output.outputs[0].logprobs[-1] + if sampling_params.logprobs + else None, + "finished": request_output.finished, + } + last_output_text = request_output.outputs[-1].text + yield f"data:{json.dumps(ret)}\ +\ +" + + async def abort_request() -> None: + await engine.abort(request_id) + + if stream: + 
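+        # Streaming requested: return the SSE-style generator and register a background
+        # task that aborts the in-flight vLLM request once the stream ends or the client
+        # disconnects.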
background_tasks = BackgroundTasks() + background_tasks.add_task(abort_request) + return StreamingResponse(stream_results(), background=background_tasks) + + final_output = None + tokens = [] + last_output_text = "" + async for request_output in results_generator: + tokens.append(request_output.outputs[-1].text[len(last_output_text) :]) + last_output_text = request_output.outputs[-1].text + if await request.is_disconnected(): + await engine.abort(request_id) + return Response(status_code=499) + final_output = request_output + + assert final_output is not None + prompt = final_output.prompt + ret = { + "text": final_output.outputs[0].text, + "count_prompt_tokens": len(final_output.prompt_token_ids), + "count_output_tokens": len(final_output.outputs[0].token_ids), + "log_probs": final_output.outputs[0].logprobs, + "tokens": tokens, + } + return Response(content=json.dumps(ret)) + +if __name__ == "__main__": + engine_args = AsyncEngineArgs(model="model_files", enforce_eager=True, tensor_parallel_size=1, gpu_memory_utilization=0.9) + engine = AsyncLLMEngine.from_engine_args(engine_args) + + uvicorn.run( + app, + host="0.0.0.0", + port=5005, + log_level="debug", + timeout_keep_alive=TIMEOUT_KEEP_ALIVE, + ) diff --git a/model-engine/model_engine_server/inference/vllm/vllm_batch.py b/model-engine/model_engine_server/inference/vllm/vllm_batch.py index 394f85d92..b6fc0f242 100644 --- a/model-engine/model_engine_server/inference/vllm/vllm_batch.py +++ b/model-engine/model_engine_server/inference/vllm/vllm_batch.py @@ -99,33 +99,101 @@ async def dummy_receive() -> MutableMapping[str, Any]: async def download_model(checkpoint_path: str, target_dir: str, trust_remote_code: bool) -> None: additional_include = "--include '*.py'" if trust_remote_code else "" - s5cmd = f"./s5cmd --numworkers 512 sync --concurrency 10 --include '*.model' --include '*.json' --include '*.safetensors' {additional_include} --exclude 'optimizer*' --exclude 'train*' {os.path.join(checkpoint_path, '*')} {target_dir}" - env = os.environ.copy() - env["AWS_PROFILE"] = os.getenv("S3_WRITE_AWS_PROFILE", "default") - # Need to override these env vars so s5cmd uses AWS_PROFILE - env["AWS_ROLE_ARN"] = "" - env["AWS_WEB_IDENTITY_TOKEN_FILE"] = "" - env["AWS_EC2_METADATA_DISABLED"] = "true" # Disable EC2 metadata for GKE (won't affect EKS) - process = subprocess.Popen( - s5cmd, - shell=True, # nosemgrep + + # Support for third-party object storage (like Scality) + # AWS CLI works with Scality - so we install and use AWS CLI + endpoint_url = os.getenv("AWS_ENDPOINT_URL") + + # Install AWS CLI first (since it's not in the VLLM container by default) + print("Installing AWS CLI...", flush=True) + install_process = subprocess.Popen( + ["pip", "install", "awscli"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, - env=env, ) - if process.stdout: - for line in process.stdout: - print(line, flush=True) - - process.wait() - - if process.returncode != 0 and process.stderr: - stderr_lines = [] - for line in iter(process.stderr.readline, ""): - stderr_lines.append(line.strip()) - - print(f"Error downloading model weights: {stderr_lines}", flush=True) + install_process.wait() + + if install_process.returncode != 0: + print("Failed to install AWS CLI", flush=True) + return + + # Simple approach - download all files (basic AWS CLI v1 compatible) + if endpoint_url: + aws_cmd = f"aws s3 sync {checkpoint_path.rstrip('/')} {target_dir} --endpoint-url {endpoint_url} --no-progress" + else: + aws_cmd = f"aws s3 sync {checkpoint_path.rstrip('/')} 
{target_dir} --no-progress" + + env = os.environ.copy() + + # Configure credentials for object storage or AWS + if endpoint_url: + # Use object storage credentials when custom endpoint is specified + env["AWS_ACCESS_KEY_ID"] = os.getenv("AWS_ACCESS_KEY_ID", "") + env["AWS_SECRET_ACCESS_KEY"] = os.getenv("AWS_SECRET_ACCESS_KEY", "") + env["AWS_ENDPOINT_URL"] = endpoint_url + env["AWS_REGION"] = os.getenv("AWS_REGION", "us-east-1") + # Disable AWS-specific features for third-party object storage + env["AWS_EC2_METADATA_DISABLED"] = "true" + env["AWS_ROLE_ARN"] = "" + env["AWS_WEB_IDENTITY_TOKEN_FILE"] = "" + else: + # Use AWS profile for S3 + env["AWS_PROFILE"] = os.getenv("S3_WRITE_AWS_PROFILE", "default") + # Need to override these env vars so AWS CLI uses AWS_PROFILE + env["AWS_ROLE_ARN"] = "" + env["AWS_WEB_IDENTITY_TOKEN_FILE"] = "" + env["AWS_EC2_METADATA_DISABLED"] = "true" # Disable EC2 metadata for GKE (won't affect EKS) + + # Retry logic with exponential backoff + max_retries = 3 + retry_delay = 10 # seconds + + for attempt in range(max_retries): + print(f"Running AWS CLI command (attempt {attempt + 1}/{max_retries}): {aws_cmd}", flush=True) + + process = subprocess.Popen( + aws_cmd, + shell=True, # nosemgrep + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + env=env, + ) + + if process.stdout: + for line in process.stdout: + print(line, flush=True) + + process.wait() + + if process.returncode == 0: + print("Model download completed successfully!", flush=True) + return + else: + # Handle errors + stderr_lines = [] + if process.stderr: + for line in iter(process.stderr.readline, ""): + if line.strip(): + stderr_lines.append(line.strip()) + + print(f"Attempt {attempt + 1} failed with return code {process.returncode}", flush=True) + if stderr_lines: + print(f"Error output: {stderr_lines}", flush=True) + + if attempt < max_retries - 1: + print(f"Retrying in {retry_delay} seconds...", flush=True) + import time + time.sleep(retry_delay) + retry_delay *= 2 # Exponential backoff + else: + print(f"All {max_retries} download attempts failed. Keeping container alive for debugging...", flush=True) + # Keep container running for debugging instead of raising error + import time + while True: + print("Container is alive for debugging. 
Download failed but not exiting.", flush=True) + time.sleep(300) # Print message every 5 minutes async def generate_v1_completions( diff --git a/model-engine/model_engine_server/inference/vllm/vllm_server.py b/model-engine/model_engine_server/inference/vllm/vllm_server.py index 65cdbb027..032175d43 100644 --- a/model-engine/model_engine_server/inference/vllm/vllm_server.py +++ b/model-engine/model_engine_server/inference/vllm/vllm_server.py @@ -3,12 +3,12 @@ import json import os import signal -import socket import subprocess import traceback from logging import Logger from typing import AsyncGenerator, Dict, List, Optional +import vllm.envs as envs from fastapi import APIRouter, BackgroundTasks, Request from fastapi.responses import Response, StreamingResponse from vllm.engine.async_llm_engine import ( @@ -17,13 +17,20 @@ ) from vllm.engine.protocol import EngineClient from vllm.entrypoints.launcher import serve_http -from vllm.entrypoints.openai.api_server import build_app, build_async_engine_client, init_app_state +from vllm.entrypoints.openai.api_server import ( + build_app, + build_async_engine_client, + init_app_state, + load_log_config, + maybe_register_tokenizer_info_endpoint, + setup_server, +) from vllm.entrypoints.openai.cli_args import make_arg_parser +from vllm.entrypoints.openai.tool_parsers import ToolParserManager from vllm.outputs import CompletionOutput from vllm.sampling_params import SamplingParams from vllm.sequence import Logprob from vllm.utils import FlexibleArgumentParser, random_uuid -from vllm.version import __version__ as VLLM_VERSION logger = Logger("vllm_server") @@ -197,34 +204,48 @@ def parse_args(parser: FlexibleArgumentParser): async def run_server(args, **uvicorn_kwargs) -> None: - logger.info("vLLM API server version %s", VLLM_VERSION) - logger.info("args: %s", args) + """Run a single-worker API server.""" + listen_address, sock = setup_server(args) + await run_server_worker(listen_address, sock, args, **uvicorn_kwargs) + + +async def run_server_worker( + listen_address, sock, args, client_config=None, **uvicorn_kwargs +) -> None: + """Run a single API server worker.""" - temp_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # nosemgrep - temp_socket.bind(("", args.port)) + if args.tool_parser_plugin and len(args.tool_parser_plugin) > 3: + ToolParserManager.import_tool_parser(args.tool_parser_plugin) - def signal_handler(*_) -> None: - # Interrupt server on sigterm while initializing - raise KeyboardInterrupt("terminated") + server_index = client_config.get("client_index", 0) if client_config else 0 - signal.signal(signal.SIGTERM, signal_handler) + # Load logging config for uvicorn if specified + log_config = load_log_config(args.log_config_file) + if log_config is not None: + uvicorn_kwargs["log_config"] = log_config global engine_client - async with build_async_engine_client(args) as engine_client: - app = build_app(args) - model_config = await engine_client.get_model_config() - init_app_state(engine_client, model_config, app.state, args) + async with build_async_engine_client(args, client_config) as engine_client: + maybe_register_tokenizer_info_endpoint(args) + app = build_app(args) - temp_socket.close() + vllm_config = await engine_client.get_vllm_config() + await init_app_state(engine_client, vllm_config, app.state, args) app.include_router(router) + logger.info("Starting vLLM API server %d on %s", server_index, listen_address) shutdown_task = await serve_http( app, + sock=sock, + enable_ssl_refresh=args.enable_ssl_refresh, 
host=args.host, port=args.port, log_level=args.uvicorn_log_level, - timeout_keep_alive=TIMEOUT_KEEP_ALIVE, + # NOTE: When the 'disable_uvicorn_access_log' value is True, + # no access log will be output. + access_log=not args.disable_uvicorn_access_log, + timeout_keep_alive=envs.VLLM_HTTP_TIMEOUT_KEEP_ALIVE, ssl_keyfile=args.ssl_keyfile, ssl_certfile=args.ssl_certfile, ssl_ca_certs=args.ssl_ca_certs, @@ -233,7 +254,10 @@ def signal_handler(*_) -> None: ) # NB: Await server shutdown only after the backend context is exited - await shutdown_task + try: + await shutdown_task + finally: + sock.close() if __name__ == "__main__": @@ -243,4 +267,4 @@ def signal_handler(*_) -> None: args = parse_args(parser) if args.attention_backend is not None: os.environ["VLLM_ATTENTION_BACKEND"] = args.attention_backend - asyncio.run(run_server(args)) + asyncio.run(run_server(args)) \ No newline at end of file diff --git a/model-engine/model_engine_server/infra/gateways/celery_task_queue_gateway.py b/model-engine/model_engine_server/infra/gateways/celery_task_queue_gateway.py index e912bf00b..669020862 100644 --- a/model-engine/model_engine_server/infra/gateways/celery_task_queue_gateway.py +++ b/model-engine/model_engine_server/infra/gateways/celery_task_queue_gateway.py @@ -1,4 +1,5 @@ import json +import os from typing import Any, Dict, List, Optional import botocore @@ -18,28 +19,67 @@ logger = make_logger(logger_name()) backend_protocol = "abs" if infra_config().cloud_provider == "azure" else "s3" -celery_redis = celery_app( - None, - s3_bucket=infra_config().s3_bucket, - broker_type=str(BrokerType.REDIS.value), - backend_protocol=backend_protocol, -) -celery_redis_24h = celery_app( - None, - s3_bucket=infra_config().s3_bucket, - broker_type=str(BrokerType.REDIS.value), - task_visibility=TaskVisibility.VISIBILITY_24H, - backend_protocol=backend_protocol, -) -celery_sqs = celery_app( - None, - s3_bucket=infra_config().s3_bucket, - broker_type=str(BrokerType.SQS.value), - backend_protocol=backend_protocol, -) -celery_servicebus = celery_app( - None, broker_type=str(BrokerType.SERVICEBUS.value), backend_protocol=backend_protocol -) +# Initialize celery apps lazily to avoid import-time AWS session creation +celery_redis = None +celery_redis_24h = None +celery_sqs = None +celery_servicebus = None + +def _get_celery_redis(): + global celery_redis + if celery_redis is None: + celery_redis = celery_app( + None, + s3_bucket=infra_config().s3_bucket, + broker_type=str(BrokerType.REDIS.value), + backend_protocol=backend_protocol, + ) + return celery_redis + +def _get_celery_redis_24h(): + global celery_redis_24h + if celery_redis_24h is None: + celery_redis_24h = celery_app( + None, + s3_bucket=infra_config().s3_bucket, + broker_type=str(BrokerType.REDIS.value), + task_visibility=TaskVisibility.VISIBILITY_24H, + backend_protocol=backend_protocol, + ) + return celery_redis_24h + +def _get_celery_sqs(): + global celery_sqs + if celery_sqs is None: + # For on-premises environments, use Redis instead of SQS + if infra_config().cloud_provider == "onprem": + logger.info("Using Redis broker for on-premises environment instead of SQS") + return _get_celery_redis() + # Check if SQS broker is disabled via cloud provider + if infra_config().cloud_provider != "aws": + raise ValueError(f"SQS broker requires AWS cloud provider, but current provider is {infra_config().cloud_provider}") + celery_sqs = celery_app( + None, + s3_bucket=infra_config().s3_bucket, + broker_type=str(BrokerType.SQS.value), + 
backend_protocol=backend_protocol, + ) + return celery_sqs + +def _get_celery_servicebus(): + global celery_servicebus + if celery_servicebus is None: + # For on-premises environments, use Redis instead of ServiceBus + if infra_config().cloud_provider == "onprem": + logger.info("Using Redis broker for on-premises environment instead of ServiceBus") + return _get_celery_redis() + # Check if ServiceBus broker is disabled via cloud provider + if infra_config().cloud_provider != "azure": + raise ValueError(f"ServiceBus broker requires Azure cloud provider, but current provider is {infra_config().cloud_provider}") + celery_servicebus = celery_app( + None, broker_type=str(BrokerType.SERVICEBUS.value), backend_protocol=backend_protocol + ) + return celery_servicebus class CeleryTaskQueueGateway(TaskQueueGateway): @@ -55,13 +95,13 @@ def __init__(self, broker_type: BrokerType, tracing_gateway: TracingGateway): def _get_celery_dest(self): if self.broker_type == BrokerType.SQS: - return celery_sqs + return _get_celery_sqs() elif self.broker_type == BrokerType.REDIS_24H: - return celery_redis_24h + return _get_celery_redis_24h() elif self.broker_type == BrokerType.REDIS: - return celery_redis + return _get_celery_redis() else: - return celery_servicebus + return _get_celery_servicebus() def send_task( self, diff --git a/model-engine/model_engine_server/infra/gateways/resources/k8s_resource_types.py b/model-engine/model_engine_server/infra/gateways/resources/k8s_resource_types.py index 96d5fbb4d..dff6490a3 100644 --- a/model-engine/model_engine_server/infra/gateways/resources/k8s_resource_types.py +++ b/model-engine/model_engine_server/infra/gateways/resources/k8s_resource_types.py @@ -536,9 +536,11 @@ def get_endpoint_resource_arguments_from_request( service_name_override = service_name_override or k8s_resource_group_name - storage_dict = DictStrStr("") + # Original simple storage configuration if storage is not None: storage_dict = DictStrStr(f'ephemeral-storage: "{storage}"') + else: + storage_dict = DictStrStr("") change_cause_message = ( f"Deployment at {datetime.utcnow()} UTC. 
" @@ -560,6 +562,9 @@ def get_endpoint_resource_arguments_from_request( elif infra_config().cloud_provider == "azure": broker_name = BrokerName.SERVICEBUS.value broker_type = BrokerType.SERVICEBUS.value + elif infra_config().cloud_provider == "onprem": + broker_name = BrokerName.REDIS.value + broker_type = BrokerType.REDIS.value else: broker_name = BrokerName.SQS.value broker_type = BrokerType.SQS.value @@ -570,9 +575,23 @@ def get_endpoint_resource_arguments_from_request( main_env = [] if isinstance(flavor, RunnableImageLike) and flavor.env: main_env = [{"name": key, "value": value} for key, value in flavor.env.items()] - main_env.append({"name": "AWS_PROFILE", "value": build_endpoint_request.aws_role}) - # NOTE: /opt/.aws/config is where service_template_config_map.yaml mounts the AWS config file, point to the mount for boto clients - main_env.append({"name": "AWS_CONFIG_FILE", "value": "/opt/.aws/config"}) + + # Add environment variables based on cloud provider + if infra_config().cloud_provider == "onprem": + # On-prem S3 credentials from environment variables (injected by Helm secrets) + main_env.extend([ + {"name": "AWS_ACCESS_KEY_ID", "valueFrom": {"secretKeyRef": {"name": "model-engine-object-storage-config", "key": "access-key"}}}, + {"name": "AWS_SECRET_ACCESS_KEY", "valueFrom": {"secretKeyRef": {"name": "model-engine-object-storage-config", "key": "secret-key"}}}, + {"name": "AWS_ENDPOINT_URL", "value": infra_config().aws_endpoint_url}, + {"name": "AWS_REGION", "value": infra_config().default_region or "us-east-1"}, + {"name": "AWS_S3_FORCE_PATH_STYLE", "value": "true"}, + ]) + else: + # AWS cloud deployments use AWS_PROFILE and AWS_CONFIG_FILE + main_env.append({"name": "AWS_PROFILE", "value": build_endpoint_request.aws_role}) + # NOTE: /opt/.aws/config is where service_template_config_map.yaml mounts the AWS config file, point to the mount for boto clients + main_env.append({"name": "AWS_CONFIG_FILE", "value": "/opt/.aws/config"}) + abs_account_name = os.getenv("ABS_ACCOUNT_NAME") if abs_account_name is not None: main_env.append({"name": "ABS_ACCOUNT_NAME", "value": abs_account_name}) @@ -581,8 +600,21 @@ def get_endpoint_resource_arguments_from_request( worker_env = None if isinstance(flavor, RunnableImageLike) and flavor.worker_env is not None: worker_env = [{"name": key, "value": value} for key, value in flavor.worker_env.items()] - worker_env.append({"name": "AWS_PROFILE", "value": build_endpoint_request.aws_role}) - worker_env.append({"name": "AWS_CONFIG_FILE", "value": "/opt/.aws/config"}) + + # Add worker environment variables based on cloud provider + if infra_config().cloud_provider == "onprem": + # On-prem S3 credentials for worker containers + worker_env.extend([ + {"name": "AWS_ACCESS_KEY_ID", "valueFrom": {"secretKeyRef": {"name": "model-engine-object-storage-config", "key": "access-key"}}}, + {"name": "AWS_SECRET_ACCESS_KEY", "valueFrom": {"secretKeyRef": {"name": "model-engine-object-storage-config", "key": "secret-key"}}}, + {"name": "AWS_ENDPOINT_URL", "value": infra_config().aws_endpoint_url}, + {"name": "AWS_REGION", "value": infra_config().default_region or "us-east-1"}, + {"name": "AWS_S3_FORCE_PATH_STYLE", "value": "true"}, + ]) + else: + # AWS cloud deployments use AWS_PROFILE and AWS_CONFIG_FILE + worker_env.append({"name": "AWS_PROFILE", "value": build_endpoint_request.aws_role}) + worker_env.append({"name": "AWS_CONFIG_FILE", "value": "/opt/.aws/config"}) worker_command = None if isinstance(flavor, RunnableImageLike) and flavor.worker_command is not 
None: @@ -646,6 +678,7 @@ def get_endpoint_resource_arguments_from_request( CPUS=str(build_endpoint_request.cpus), MEMORY=str(build_endpoint_request.memory), STORAGE_DICT=storage_dict, + PER_WORKER=build_endpoint_request.per_worker, MIN_WORKERS=build_endpoint_request.min_workers, MAX_WORKERS=build_endpoint_request.max_workers, @@ -696,6 +729,7 @@ def get_endpoint_resource_arguments_from_request( CPUS=str(build_endpoint_request.cpus), MEMORY=str(build_endpoint_request.memory), STORAGE_DICT=storage_dict, + PER_WORKER=build_endpoint_request.per_worker, MIN_WORKERS=build_endpoint_request.min_workers, MAX_WORKERS=build_endpoint_request.max_workers, @@ -748,6 +782,7 @@ def get_endpoint_resource_arguments_from_request( CPUS=str(build_endpoint_request.cpus), MEMORY=str(build_endpoint_request.memory), STORAGE_DICT=storage_dict, + PER_WORKER=build_endpoint_request.per_worker, MIN_WORKERS=build_endpoint_request.min_workers, MAX_WORKERS=build_endpoint_request.max_workers, @@ -795,6 +830,7 @@ def get_endpoint_resource_arguments_from_request( CPUS=str(build_endpoint_request.cpus), MEMORY=str(build_endpoint_request.memory), STORAGE_DICT=storage_dict, + PER_WORKER=build_endpoint_request.per_worker, MIN_WORKERS=build_endpoint_request.min_workers, MAX_WORKERS=build_endpoint_request.max_workers, @@ -844,6 +880,7 @@ def get_endpoint_resource_arguments_from_request( CPUS=str(build_endpoint_request.cpus), MEMORY=str(build_endpoint_request.memory), STORAGE_DICT=storage_dict, + PER_WORKER=build_endpoint_request.per_worker, MIN_WORKERS=build_endpoint_request.min_workers, MAX_WORKERS=build_endpoint_request.max_workers, @@ -890,6 +927,7 @@ def get_endpoint_resource_arguments_from_request( CPUS=str(build_endpoint_request.cpus), MEMORY=str(build_endpoint_request.memory), STORAGE_DICT=storage_dict, + PER_WORKER=build_endpoint_request.per_worker, MIN_WORKERS=build_endpoint_request.min_workers, MAX_WORKERS=build_endpoint_request.max_workers, @@ -938,6 +976,7 @@ def get_endpoint_resource_arguments_from_request( CPUS=str(build_endpoint_request.cpus), MEMORY=str(build_endpoint_request.memory), STORAGE_DICT=storage_dict, + PER_WORKER=build_endpoint_request.per_worker, MIN_WORKERS=build_endpoint_request.min_workers, MAX_WORKERS=build_endpoint_request.max_workers, @@ -996,6 +1035,7 @@ def get_endpoint_resource_arguments_from_request( CPUS=str(build_endpoint_request.cpus), MEMORY=str(build_endpoint_request.memory), STORAGE_DICT=storage_dict, + PER_WORKER=build_endpoint_request.per_worker, MIN_WORKERS=build_endpoint_request.min_workers, MAX_WORKERS=build_endpoint_request.max_workers, @@ -1056,6 +1096,7 @@ def get_endpoint_resource_arguments_from_request( CPUS=str(build_endpoint_request.cpus), MEMORY=str(build_endpoint_request.memory), STORAGE_DICT=storage_dict, + PER_WORKER=build_endpoint_request.per_worker, MIN_WORKERS=build_endpoint_request.min_workers, MAX_WORKERS=build_endpoint_request.max_workers, @@ -1110,6 +1151,7 @@ def get_endpoint_resource_arguments_from_request( CPUS=str(build_endpoint_request.cpus), MEMORY=str(build_endpoint_request.memory), STORAGE_DICT=storage_dict, + PER_WORKER=build_endpoint_request.per_worker, MIN_WORKERS=build_endpoint_request.min_workers, MAX_WORKERS=build_endpoint_request.max_workers, @@ -1169,6 +1211,7 @@ def get_endpoint_resource_arguments_from_request( CPUS=str(build_endpoint_request.cpus), MEMORY=str(build_endpoint_request.memory), STORAGE_DICT=storage_dict, + PER_WORKER=build_endpoint_request.per_worker, MIN_WORKERS=build_endpoint_request.min_workers, 
MAX_WORKERS=build_endpoint_request.max_workers, diff --git a/model-engine/model_engine_server/infra/gateways/s3_filesystem_gateway.py b/model-engine/model_engine_server/infra/gateways/s3_filesystem_gateway.py index b0bf9e84e..c03d2719c 100644 --- a/model-engine/model_engine_server/infra/gateways/s3_filesystem_gateway.py +++ b/model-engine/model_engine_server/infra/gateways/s3_filesystem_gateway.py @@ -4,6 +4,7 @@ import boto3 import smart_open +from model_engine_server.core.config import infra_config from model_engine_server.infra.gateways.filesystem_gateway import FilesystemGateway @@ -13,9 +14,27 @@ class S3FilesystemGateway(FilesystemGateway): """ def get_s3_client(self, kwargs): - profile_name = kwargs.get("aws_profile", os.getenv("AWS_PROFILE")) - session = boto3.Session(profile_name=profile_name) - client = session.client("s3") + if infra_config().cloud_provider == "onprem": + # For onprem, use explicit credentials from environment variables + session = boto3.Session( + aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"), + aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"), + region_name=infra_config().default_region + ) + else: + profile_name = kwargs.get("aws_profile", os.getenv("AWS_PROFILE")) + session = boto3.Session(profile_name=profile_name) + + # Support custom endpoints for S3-compatible storage (like Scality) + # Uses standard boto3 environment variables + endpoint_url = kwargs.get("endpoint_url") or os.getenv("AWS_ENDPOINT_URL") + + client_kwargs = {} + if endpoint_url: + client_kwargs["endpoint_url"] = endpoint_url + # For custom endpoints, boto3 automatically uses AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY + + client = session.client("s3", **client_kwargs) return client def open(self, uri: str, mode: str = "rt", **kwargs) -> IO: diff --git a/model-engine/model_engine_server/infra/gateways/s3_llm_artifact_gateway.py b/model-engine/model_engine_server/infra/gateways/s3_llm_artifact_gateway.py index b48d1eef2..e0b9a76b8 100644 --- a/model-engine/model_engine_server/infra/gateways/s3_llm_artifact_gateway.py +++ b/model-engine/model_engine_server/infra/gateways/s3_llm_artifact_gateway.py @@ -4,6 +4,7 @@ import boto3 from model_engine_server.common.config import get_model_cache_directory_name, hmi_config +from model_engine_server.core.config import infra_config from model_engine_server.core.loggers import logger_name, make_logger from model_engine_server.core.utils.url import parse_attachment_url from model_engine_server.domain.gateways import LLMArtifactGateway @@ -13,13 +14,31 @@ class S3LLMArtifactGateway(LLMArtifactGateway): """ - Concrete implemention for interacting with a filesystem backed by S3. + Concrete implementation for interacting with a filesystem backed by S3. 
""" def _get_s3_resource(self, kwargs): - profile_name = kwargs.get("aws_profile", os.getenv("AWS_PROFILE")) - session = boto3.Session(profile_name=profile_name) - resource = session.resource("s3") + if infra_config().cloud_provider == "onprem": + # For onprem, use explicit credentials from environment variables + session = boto3.Session( + aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"), + aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"), + region_name=infra_config().default_region + ) + else: + profile_name = kwargs.get("aws_profile", os.getenv("AWS_PROFILE")) + session = boto3.Session(profile_name=profile_name) + + # Support custom endpoints for S3-compatible storage (like Scality) + # Uses standard boto3 environment variables + endpoint_url = kwargs.get("endpoint_url") or os.getenv("AWS_ENDPOINT_URL") + + resource_kwargs = {} + if endpoint_url: + resource_kwargs["endpoint_url"] = endpoint_url + # For custom endpoints, boto3 automatically uses AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY + + resource = session.resource("s3", **resource_kwargs) return resource def list_files(self, path: str, **kwargs) -> List[str]: diff --git a/model-engine/model_engine_server/infra/repositories/s3_file_llm_fine_tune_events_repository.py b/model-engine/model_engine_server/infra/repositories/s3_file_llm_fine_tune_events_repository.py index 2dfcbc769..7092d9f02 100644 --- a/model-engine/model_engine_server/infra/repositories/s3_file_llm_fine_tune_events_repository.py +++ b/model-engine/model_engine_server/infra/repositories/s3_file_llm_fine_tune_events_repository.py @@ -27,7 +27,17 @@ def __init__(self): def _get_s3_client(self, kwargs): profile_name = kwargs.get("aws_profile", os.getenv("S3_WRITE_AWS_PROFILE")) session = boto3.Session(profile_name=profile_name) - client = session.client("s3") + + # Support custom endpoints for S3-compatible storage (like Scality) + # Uses standard boto3 environment variables + endpoint_url = kwargs.get("endpoint_url") or os.getenv("AWS_ENDPOINT_URL") + + client_kwargs = {} + if endpoint_url: + client_kwargs["endpoint_url"] = endpoint_url + # For custom endpoints, boto3 automatically uses AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY + + client = session.client("s3", **client_kwargs) return client def _open(self, uri: str, mode: str = "rt", **kwargs) -> IO: diff --git a/model-engine/model_engine_server/infra/repositories/s3_file_llm_fine_tune_repository.py b/model-engine/model_engine_server/infra/repositories/s3_file_llm_fine_tune_repository.py index 6b3ea8aa8..385bcd3e7 100644 --- a/model-engine/model_engine_server/infra/repositories/s3_file_llm_fine_tune_repository.py +++ b/model-engine/model_engine_server/infra/repositories/s3_file_llm_fine_tune_repository.py @@ -15,7 +15,17 @@ def __init__(self, file_path: str): def _get_s3_client(self, kwargs): profile_name = kwargs.get("aws_profile", os.getenv("AWS_PROFILE")) session = boto3.Session(profile_name=profile_name) - client = session.client("s3") + + # Support custom endpoints for S3-compatible storage (like Scality) + # Uses standard boto3 environment variables + endpoint_url = kwargs.get("endpoint_url") or os.getenv("AWS_ENDPOINT_URL") + + client_kwargs = {} + if endpoint_url: + client_kwargs["endpoint_url"] = endpoint_url + # For custom endpoints, boto3 automatically uses AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY + + client = session.client("s3", **client_kwargs) return client def _open(self, uri: str, mode: str = "rt", **kwargs) -> IO: diff --git 
a/model-engine/model_engine_server/infra/services/image_cache_service.py b/model-engine/model_engine_server/infra/services/image_cache_service.py index f2b1dc289..303fea97b 100644 --- a/model-engine/model_engine_server/infra/services/image_cache_service.py +++ b/model-engine/model_engine_server/infra/services/image_cache_service.py @@ -16,6 +16,7 @@ from model_engine_server.infra.repositories.model_endpoint_record_repository import ( ModelEndpointRecordRepository, ) +import os logger = make_logger(logger_name()) @@ -60,6 +61,11 @@ def _cache_finetune_llm_images( """ Cache images used by fine tune LLM endpoints to reduce cold start time. """ + # Skip fine-tune image caching for on-premises environments + if infra_config().cloud_provider == "onprem": + logger.info("Skipping fine-tune image caching for on-premises environment") + return + # a cache priority to ensure llm endpoint images are always prioritized llm_image_cache_priority = CachePriority( is_high_priority=1, # make it a high priority @@ -80,7 +86,7 @@ def _cache_finetune_llm_images( f"{infra_config().docker_repo_prefix}/{hmi_config.vllm_repository}", "0.3.2" ) latest_tag = "fake_docker_repository_latest_image_tag" - if not CIRCLECI: + if not CIRCLECI and infra_config().cloud_provider != "onprem": try: # pragma: no cover latest_tag = self.docker_repository.get_latest_image_tag( hmi_config.batch_inference_vllm_repository diff --git a/model-engine/model_engine_server/infra/services/live_endpoint_builder_service.py b/model-engine/model_engine_server/infra/services/live_endpoint_builder_service.py index 1b32104a5..df49e6866 100644 --- a/model-engine/model_engine_server/infra/services/live_endpoint_builder_service.py +++ b/model-engine/model_engine_server/infra/services/live_endpoint_builder_service.py @@ -405,6 +405,28 @@ def convert_artifact_like_bundle_to_runnable_image( else: infra_config_file = infra_config().env + ".yaml" + # Build base environment variables + base_env = { + "OMP_NUM_THREADS": "1", + "BASE_PATH": "/app", + "BUNDLE_URL": model_bundle.flavor.location, + "RESULTS_S3_BUCKET": infra_config().s3_bucket, + "CHILD_FN_INFO": json.dumps( + build_endpoint_request.child_fn_info + if build_endpoint_request.child_fn_info + else {} + ), + "PREWARM": bool_to_str(build_endpoint_request.prewarm) or "false", + "PORT": "5005", + "ML_INFRA_SERVICES_CONFIG_PATH": f"/app/model-engine/model_engine_server/core/configs/{infra_config_file}", + } + + # Add cloud-provider specific environment variables + if infra_config().cloud_provider != "onprem": + # AWS cloud deployments use AWS_PROFILE + base_env["AWS_PROFILE"] = build_endpoint_request.aws_role + # Note: For onprem, S3 credentials will be injected by k8s_resource_types.py + new_flavor = RunnableImageFlavor( flavor=ModelBundleFlavorType.RUNNABLE_IMAGE, repository=image_repo, @@ -418,21 +440,7 @@ def convert_artifact_like_bundle_to_runnable_image( "-m", "model_engine_server.inference.sync_inference.start_fastapi_server", ], - env={ - "OMP_NUM_THREADS": "1", - "BASE_PATH": "/app", - "BUNDLE_URL": model_bundle.flavor.location, - "AWS_PROFILE": build_endpoint_request.aws_role, - "RESULTS_S3_BUCKET": infra_config().s3_bucket, - "CHILD_FN_INFO": json.dumps( - build_endpoint_request.child_fn_info - if build_endpoint_request.child_fn_info - else {} - ), - "PREWARM": bool_to_str(build_endpoint_request.prewarm) or "false", - "PORT": "5005", - "ML_INFRA_SERVICES_CONFIG_PATH": f"/app/model-engine/model_engine_server/core/configs/{infra_config_file}", - }, + env=base_env, protocol="http", ) diff --git 
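
The `live_endpoint_builder_service.py` change above keeps `AWS_PROFILE` out of the endpoint environment when `cloud_provider` is `onprem`, deferring credential injection to the Kubernetes resource templates. A simplified sketch of that decision, assuming illustrative parameter names:

```python
def build_runnable_image_env(cloud_provider: str, bundle_url: str, s3_bucket: str, aws_role: str) -> dict:
    """Illustrative sketch of the provider-dependent endpoint environment."""
    env = {
        "OMP_NUM_THREADS": "1",
        "BASE_PATH": "/app",
        "BUNDLE_URL": bundle_url,
        "RESULTS_S3_BUCKET": s3_bucket,
        "PORT": "5005",
    }
    if cloud_provider != "onprem":
        # Cloud deployments keep the IAM-role-backed AWS profile.
        env["AWS_PROFILE"] = aws_role
    # On-prem: S3 credentials are injected later from the Kubernetes secret
    # (see the k8s_resource_types.py changes earlier in this diff).
    return env


# Example usage:
# build_runnable_image_env("onprem", "s3://bucket/bundle.zip", "model-engine-bucket", "")
```
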
a/model-engine/model_engine_server/service_builder/celery.py b/model-engine/model_engine_server/service_builder/celery.py index 06384c9e4..24be89ff3 100644 --- a/model-engine/model_engine_server/service_builder/celery.py +++ b/model-engine/model_engine_server/service_builder/celery.py @@ -8,6 +8,8 @@ service_builder_broker_type = str(BrokerType.REDIS.value) elif infra_config().cloud_provider == "azure": service_builder_broker_type = str(BrokerType.SERVICEBUS.value) +elif infra_config().cloud_provider == "onprem": + service_builder_broker_type = str(BrokerType.REDIS.value) else: service_builder_broker_type = str(BrokerType.SQS.value) @@ -18,7 +20,7 @@ ], s3_bucket=infra_config().s3_bucket, broker_type=service_builder_broker_type, - backend_protocol="abs" if infra_config().cloud_provider == "azure" else "s3", + backend_protocol="abs" if infra_config().cloud_provider == "azure" else "redis" if infra_config().cloud_provider == "onprem" else "s3", ) if __name__ == "__main__": diff --git a/model-engine/tests/unit/domain/test_llm_use_cases.py b/model-engine/tests/unit/domain/test_llm_use_cases.py index 5ea0a494c..b78bcb63a 100644 --- a/model-engine/tests/unit/domain/test_llm_use_cases.py +++ b/model-engine/tests/unit/domain/test_llm_use_cases.py @@ -580,21 +580,23 @@ def test_load_model_weights_sub_commands( final_weights_folder = "test_folder" subcommands = llm_bundle_use_case.load_model_weights_sub_commands( - framework, framework_image_tag, checkpoint_path, final_weights_folder + framework, framework_image_tag, checkpoint_path, final_weights_folder, model_name="test-model" ) expected_result = [ - './s5cmd --numworkers 512 cp --concurrency 10 --include "*.model" --include "*.model.v*" --include "*.json" --include "*.safetensors" --exclude "optimizer*" s3://fake-checkpoint/* test_folder', + "pip install awscli || echo 'Failed to install awscli but continuing...'", + "aws s3 sync s3://fake-checkpoint test_folder --no-progress --max-concurrent-requests 10 --multipart-threshold 100MB --multipart-chunksize 50MB || echo 'Download failed but continuing for debugging...'", ] assert expected_result == subcommands trust_remote_code = True subcommands = llm_bundle_use_case.load_model_weights_sub_commands( - framework, framework_image_tag, checkpoint_path, final_weights_folder, trust_remote_code + framework, framework_image_tag, checkpoint_path, final_weights_folder, trust_remote_code, "test-model" ) expected_result = [ - './s5cmd --numworkers 512 cp --concurrency 10 --include "*.model" --include "*.model.v*" --include "*.json" --include "*.safetensors" --exclude "optimizer*" --include "*.py" s3://fake-checkpoint/* test_folder', + "pip install awscli || echo 'Failed to install awscli but continuing...'", + "aws s3 sync s3://fake-checkpoint test_folder --no-progress --max-concurrent-requests 10 --multipart-threshold 100MB --multipart-chunksize 50MB || echo 'Download failed but continuing for debugging...'", ] assert expected_result == subcommands @@ -608,8 +610,8 @@ def test_load_model_weights_sub_commands( ) expected_result = [ - "s5cmd > /dev/null || conda install -c conda-forge -y s5cmd", - 's5cmd --numworkers 512 cp --concurrency 10 --include "*.model" --include "*.model.v*" --include "*.json" --include "*.safetensors" --exclude "optimizer*" s3://fake-checkpoint/* test_folder', + 'pip install awscli', + 'aws s3 sync s3://fake-checkpoint test_folder', ] assert expected_result == subcommands @@ -631,7 +633,7 @@ def test_load_model_weights_sub_commands( trust_remote_code = True subcommands = 
llm_bundle_use_case.load_model_weights_sub_commands( - framework, framework_image_tag, checkpoint_path, final_weights_folder, trust_remote_code + framework, framework_image_tag, checkpoint_path, final_weights_folder, trust_remote_code, "test-model" ) expected_result = [ diff --git a/model-engine/tests/unit/infra/gateways/resources/example_lws_config.json b/model-engine/tests/unit/infra/gateways/resources/example_lws_config.json index 417934788..4af641700 100644 --- a/model-engine/tests/unit/infra/gateways/resources/example_lws_config.json +++ b/model-engine/tests/unit/infra/gateways/resources/example_lws_config.json @@ -367,7 +367,7 @@ "command": [ "/bin/bash", "-c", - "./s5cmd --numworkers 512 cp --concurrency 10 --include '*.model' --include '*.json' --include '*.safetensors' --exclude 'optimizer*' s3://bucket/tag/userid000000/model_weights/* model_files;/workspace/init_ray.sh leader --ray_cluster_size=$RAY_CLUSTER_SIZE --own_address=$K8S_OWN_POD_NAME.$K8S_LWS_NAME.$K8S_OWN_NAMESPACE.svc.cluster.local;python -m vllm_server --model model_files --tensor-parallel-size 1 --port 5005 --disable-log-requests--enforce-eager" + "pip install awscli && aws s3 sync s3://bucket/tag/userid000000/model_weights model_files --include '*.model' --include '*.json' --include '*.safetensors' --exclude 'optimizer*';/workspace/init_ray.sh leader --ray_cluster_size=$RAY_CLUSTER_SIZE --own_address=$K8S_OWN_POD_NAME.$K8S_LWS_NAME.$K8S_OWN_NAMESPACE.svc.cluster.local;python -m vllm_server --model model_files --tensor-parallel-size 1 --port 5005 --disable-log-requests--enforce-eager" ], "env": [ { @@ -626,7 +626,7 @@ "command": [ "/bin/bash", "-c", - "./s5cmd --numworkers 512 cp --concurrency 10 --include '*.model' --include '*.json' --include '*.safetensors' --exclude 'optimizer*' s3://bucket/key/userid000000/model_weights/* model_files;/workspace/init_ray.sh worker --ray_cluster_size=$RAY_CLUSTER_SIZE --ray_address=$K8S_LEADER_NAME.$K8S_LWS_NAME.$K8S_OWN_NAMESPACE.svc.cluster.local --own_address=$K8S_OWN_POD_NAME.$K8S_LWS_NAME.$K8S_OWN_NAMESPACE.svc.cluster.local" + "pip install awscli && aws s3 sync s3://bucket/key/userid000000/model_weights model_files --include '*.model' --include '*.json' --include '*.safetensors' --exclude 'optimizer*';/workspace/init_ray.sh worker --ray_cluster_size=$RAY_CLUSTER_SIZE --ray_address=$K8S_LEADER_NAME.$K8S_LWS_NAME.$K8S_OWN_NAMESPACE.svc.cluster.local --own_address=$K8S_OWN_POD_NAME.$K8S_LWS_NAME.$K8S_OWN_NAMESPACE.svc.cluster.local" ], "env": [ {
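
The updated unit tests and the LWS fixture above replace the `s5cmd` download commands with `awscli`. A small helper that mirrors the command strings the updated tests now expect (the checkpoint path and target directory in the usage comment are the test placeholders):

```python
from typing import List


def build_aws_sync_commands(checkpoint_path: str, target_dir: str) -> List[str]:
    """Mirror the awscli-based download commands used in place of s5cmd (shape taken from the unit tests above)."""
    return [
        "pip install awscli || echo 'Failed to install awscli but continuing...'",
        (
            f"aws s3 sync {checkpoint_path} {target_dir} --no-progress "
            "--max-concurrent-requests 10 --multipart-threshold 100MB --multipart-chunksize 50MB "
            "|| echo 'Download failed but continuing for debugging...'"
        ),
    ]


# Example usage:
# build_aws_sync_commands("s3://fake-checkpoint", "test_folder")
```
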