From 206ec647a0f733c080d208d641f0b248a81b5b95 Mon Sep 17 00:00:00 2001 From: Julien Mancuso Date: Thu, 6 Nov 2025 10:13:13 -0700 Subject: [PATCH 1/9] fix: add dynamoModel CRD Signed-off-by: Julien Mancuso --- ...nvidia.com_dynamocomponentdeployments.yaml | 14 + .../nvidia.com_dynamographdeployments.yaml | 14 + .../templates/nvidia.com_dynamomodels.yaml | 198 ++++++++++ .../operator/templates/manager-rbac.yaml | 11 + .../api/v1alpha1/dynamo_model_types.go | 129 +++++++ .../dynamocomponentdeployment_types.go | 16 + .../api/v1alpha1/zz_generated.deepcopy.go | 156 +++++++- deploy/cloud/operator/cmd/main.go | 10 + ...nvidia.com_dynamocomponentdeployments.yaml | 14 + .../nvidia.com_dynamographdeployments.yaml | 14 + .../crd/bases/nvidia.com_dynamomodels.yaml | 198 ++++++++++ deploy/cloud/operator/config/rbac/role.yaml | 11 + .../cloud/operator/internal/consts/consts.go | 1 + .../controller/dynamo_model_controller.go | 362 ++++++++++++++++++ .../dynamocomponentdeployment_controller.go | 26 +- .../dynamographdeployment_controller.go | 12 + .../cloud/operator/internal/dynamo/graph.go | 2 + .../internal/dynamo/headless_service.go | 147 +++++++ .../internal/modelendpoint/discovery.go | 112 ++++++ .../operator/internal/modelendpoint/lora.go | 175 +++++++++ .../operator/internal/modelendpoint/prober.go | 172 +++++++++ .../operator/internal/modelendpoint/types.go | 35 ++ .../operator/internal/workerpool/pool.go | 102 +++++ 23 files changed, 1919 insertions(+), 12 deletions(-) create mode 100644 deploy/cloud/helm/crds/templates/nvidia.com_dynamomodels.yaml create mode 100644 deploy/cloud/operator/api/v1alpha1/dynamo_model_types.go create mode 100644 deploy/cloud/operator/config/crd/bases/nvidia.com_dynamomodels.yaml create mode 100644 deploy/cloud/operator/internal/controller/dynamo_model_controller.go create mode 100644 deploy/cloud/operator/internal/dynamo/headless_service.go create mode 100644 deploy/cloud/operator/internal/modelendpoint/discovery.go create mode 100644 deploy/cloud/operator/internal/modelendpoint/lora.go create mode 100644 deploy/cloud/operator/internal/modelendpoint/prober.go create mode 100644 deploy/cloud/operator/internal/modelendpoint/types.go create mode 100644 deploy/cloud/operator/internal/workerpool/pool.go diff --git a/deploy/cloud/helm/crds/templates/nvidia.com_dynamocomponentdeployments.yaml b/deploy/cloud/helm/crds/templates/nvidia.com_dynamocomponentdeployments.yaml index f4164dc841..6f9053c54a 100644 --- a/deploy/cloud/helm/crds/templates/nvidia.com_dynamocomponentdeployments.yaml +++ b/deploy/cloud/helm/crds/templates/nvidia.com_dynamocomponentdeployments.yaml @@ -10002,6 +10002,20 @@ spec: format: int32 type: integer type: object + modelRef: + description: |- + ModelRef references a model that this component serves + When specified, a headless service will be created for endpoint discovery + properties: + name: + description: Name is the base model identifier (e.g., "llama-3-70b-instruct-v1") + type: string + revision: + description: Revision is the model revision/version (optional) + type: string + required: + - name + type: object multinode: description: Multinode is the configuration for multinode components. 
properties: diff --git a/deploy/cloud/helm/crds/templates/nvidia.com_dynamographdeployments.yaml b/deploy/cloud/helm/crds/templates/nvidia.com_dynamographdeployments.yaml index 037dfecebb..a5647db120 100644 --- a/deploy/cloud/helm/crds/templates/nvidia.com_dynamographdeployments.yaml +++ b/deploy/cloud/helm/crds/templates/nvidia.com_dynamographdeployments.yaml @@ -10136,6 +10136,20 @@ spec: format: int32 type: integer type: object + modelRef: + description: |- + ModelRef references a model that this component serves + When specified, a headless service will be created for endpoint discovery + properties: + name: + description: Name is the base model identifier (e.g., "llama-3-70b-instruct-v1") + type: string + revision: + description: Revision is the model revision/version (optional) + type: string + required: + - name + type: object multinode: description: Multinode is the configuration for multinode components. properties: diff --git a/deploy/cloud/helm/crds/templates/nvidia.com_dynamomodels.yaml b/deploy/cloud/helm/crds/templates/nvidia.com_dynamomodels.yaml new file mode 100644 index 0000000000..473c20a004 --- /dev/null +++ b/deploy/cloud/helm/crds/templates/nvidia.com_dynamomodels.yaml @@ -0,0 +1,198 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +--- +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + annotations: + controller-gen.kubebuilder.io/version: v0.16.4 + helm.sh/resource-policy: keep + name: dynamomodels.nvidia.com +spec: + group: nvidia.com + names: + kind: DynamoModel + listKind: DynamoModelList + plural: dynamomodels + shortNames: + - dm + singular: dynamomodel + scope: Namespaced + versions: + - additionalPrinterColumns: + - description: Base model name + jsonPath: .spec.baseModelName + name: BaseModel + type: string + - description: Model type + jsonPath: .spec.modelType + name: Type + type: string + - description: Ready endpoints + jsonPath: .status.readyEndpoints + name: Ready + type: integer + - description: Total endpoints + jsonPath: .status.totalEndpoints + name: Total + type: integer + - jsonPath: .metadata.creationTimestamp + name: Age + type: date + name: v1alpha1 + schema: + openAPIV3Schema: + description: DynamoModel is the Schema for the dynamo models API + properties: + apiVersion: + description: |- + APIVersion defines the versioned schema of this representation of an object. + Servers should convert recognized schemas to the latest internal value, and + may reject unrecognized values. + More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources + type: string + kind: + description: |- + Kind is a string value representing the REST resource this object represents. + Servers may infer this from the endpoint the client submits requests to. + Cannot be updated. + In CamelCase. 
+ More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds + type: string + metadata: + type: object + spec: + description: DynamoModelSpec defines the desired state of DynamoModel + properties: + baseModelName: + description: |- + BaseModelName is the base model identifier that matches the service label + This is used to discover endpoints via headless services + type: string + loraPath: + description: LoraPath is the path to the LoRA adapter (only applicable for lora model type) + type: string + modelName: + description: ModelName is the full model identifier (e.g., "meta-llama/Llama-3.3-70B-Instruct-lora") + type: string + modelType: + default: base + description: ModelType specifies the type of model (e.g., "base", "lora", "adapter") + enum: + - base + - lora + - adapter + type: string + required: + - baseModelName + - modelName + type: object + status: + description: DynamoModelStatus defines the observed state of DynamoModel + properties: + conditions: + description: Conditions represents the latest available observations of the model's state + items: + description: Condition contains details for one aspect of the current state of this API Resource. + properties: + lastTransitionTime: + description: |- + lastTransitionTime is the last time the condition transitioned from one status to another. + This should be when the underlying condition changed. If that is not known, then using the time when the API field changed is acceptable. + format: date-time + type: string + message: + description: |- + message is a human readable message indicating details about the transition. + This may be an empty string. + maxLength: 32768 + type: string + observedGeneration: + description: |- + observedGeneration represents the .metadata.generation that the condition was set based upon. + For instance, if .metadata.generation is currently 12, but the .status.conditions[x].observedGeneration is 9, the condition is out of date + with respect to the current state of the instance. + format: int64 + minimum: 0 + type: integer + reason: + description: |- + reason contains a programmatic identifier indicating the reason for the condition's last transition. + Producers of specific condition types may define expected values and meanings for this field, + and whether the values are considered a guaranteed API. + The value should be a CamelCase string. + This field may not be empty. + maxLength: 1024 + minLength: 1 + pattern: ^[A-Za-z]([A-Za-z0-9_,:]*[A-Za-z0-9_])?$ + type: string + status: + description: status of the condition, one of True, False, Unknown. + enum: + - "True" + - "False" + - Unknown + type: string + type: + description: type of condition in CamelCase or in foo.example.com/CamelCase. 
+ maxLength: 316 + pattern: ^([a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*/)?(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])$ + type: string + required: + - lastTransitionTime + - message + - reason + - status + - type + type: object + type: array + endpoints: + description: Endpoints is the current list of all endpoints for this model + items: + description: EndpointInfo represents a single endpoint (pod) serving the model + properties: + address: + description: Address is the full address of the endpoint (e.g., "http://10.0.1.5:9090") + type: string + podName: + description: PodName is the name of the pod serving this endpoint + type: string + ready: + description: |- + Ready indicates whether the endpoint is ready to serve traffic + For LoRA models: true if the LoRA was successfully loaded and appears in GET /v1/loras + For base models: always false (no probing performed) + type: boolean + required: + - address + - ready + type: object + type: array + readyEndpoints: + description: ReadyEndpoints is the count of endpoints that are ready + type: integer + totalEndpoints: + description: TotalEndpoints is the total count of endpoints + type: integer + required: + - readyEndpoints + - totalEndpoints + type: object + type: object + served: true + storage: true + subresources: + status: {} diff --git a/deploy/cloud/helm/platform/components/operator/templates/manager-rbac.yaml b/deploy/cloud/helm/platform/components/operator/templates/manager-rbac.yaml index dcc598ccd7..8ab42c0988 100644 --- a/deploy/cloud/helm/platform/components/operator/templates/manager-rbac.yaml +++ b/deploy/cloud/helm/platform/components/operator/templates/manager-rbac.yaml @@ -62,6 +62,14 @@ rules: - patch - update - watch +- apiGroups: + - discovery.k8s.io + resources: + - endpointslices + verbs: + - get + - list + - watch - apiGroups: - "" resources: @@ -361,6 +369,7 @@ rules: - dynamocomponentdeployments - dynamographdeploymentrequests - dynamographdeployments + - dynamomodels verbs: - create - delete @@ -375,6 +384,7 @@ rules: - dynamocomponentdeployments/finalizers - dynamographdeploymentrequests/finalizers - dynamographdeployments/finalizers + - dynamomodels/finalizers verbs: - update - apiGroups: @@ -383,6 +393,7 @@ rules: - dynamocomponentdeployments/status - dynamographdeploymentrequests/status - dynamographdeployments/status + - dynamomodels/status verbs: - get - patch diff --git a/deploy/cloud/operator/api/v1alpha1/dynamo_model_types.go b/deploy/cloud/operator/api/v1alpha1/dynamo_model_types.go new file mode 100644 index 0000000000..19ec687b70 --- /dev/null +++ b/deploy/cloud/operator/api/v1alpha1/dynamo_model_types.go @@ -0,0 +1,129 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package v1alpha1 + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// DynamoModelSpec defines the desired state of DynamoModel +type DynamoModelSpec struct { + // ModelName is the full model identifier (e.g., "meta-llama/Llama-3.3-70B-Instruct-lora") + // +kubebuilder:validation:Required + ModelName string `json:"modelName"` + + // BaseModelName is the base model identifier that matches the service label + // This is used to discover endpoints via headless services + // +kubebuilder:validation:Required + BaseModelName string `json:"baseModelName"` + + // ModelType specifies the type of model (e.g., "base", "lora", "adapter") + // +kubebuilder:validation:Enum=base;lora;adapter + // +kubebuilder:default=base + // +optional + ModelType string `json:"modelType,omitempty"` + + // LoraPath is the path to the LoRA adapter (only applicable for lora model type) + // +optional + LoraPath string `json:"loraPath,omitempty"` +} + +// EndpointInfo represents a single endpoint (pod) serving the model +type EndpointInfo struct { + // Address is the full address of the endpoint (e.g., "http://10.0.1.5:9090") + Address string `json:"address"` + + // PodName is the name of the pod serving this endpoint + // +optional + PodName string `json:"podName,omitempty"` + + // Ready indicates whether the endpoint is ready to serve traffic + // For LoRA models: true if the LoRA was successfully loaded and appears in GET /v1/loras + // For base models: always false (no probing performed) + Ready bool `json:"ready"` +} + +// DynamoModelStatus defines the observed state of DynamoModel +type DynamoModelStatus struct { + // Endpoints is the current list of all endpoints for this model + // +optional + Endpoints []EndpointInfo `json:"endpoints,omitempty"` + + // ReadyEndpoints is the count of endpoints that are ready + ReadyEndpoints int `json:"readyEndpoints"` + + // TotalEndpoints is the total count of endpoints + TotalEndpoints int `json:"totalEndpoints"` + + // Conditions represents the latest available observations of the model's state + // +optional + Conditions []metav1.Condition `json:"conditions,omitempty"` +} + +// +genclient +// +kubebuilder:object:root=true +// +kubebuilder:subresource:status +// +kubebuilder:storageversion +// +kubebuilder:printcolumn:name="BaseModel",type="string",JSONPath=".spec.baseModelName",description="Base model name" +// +kubebuilder:printcolumn:name="Type",type="string",JSONPath=".spec.modelType",description="Model type" +// +kubebuilder:printcolumn:name="Ready",type="integer",JSONPath=".status.readyEndpoints",description="Ready endpoints" +// +kubebuilder:printcolumn:name="Total",type="integer",JSONPath=".status.totalEndpoints",description="Total endpoints" +// +kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp" +// +kubebuilder:resource:shortName=dm +// DynamoModel is the Schema for the dynamo models API +type DynamoModel struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata,omitempty"` + + Spec DynamoModelSpec `json:"spec,omitempty"` + Status DynamoModelStatus `json:"status,omitempty"` +} + +// +kubebuilder:object:root=true + +// DynamoModelList contains a list of DynamoModel +type DynamoModelList struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata,omitempty"` + Items []DynamoModel `json:"items"` +} + +func init() { + SchemeBuilder.Register(&DynamoModel{}, &DynamoModelList{}) +} + +// GetReadyEndpoints returns only the endpoints that are ready +func (m *DynamoModel) 
GetReadyEndpoints() []EndpointInfo { + var ready []EndpointInfo + for _, ep := range m.Status.Endpoints { + if ep.Ready { + ready = append(ready, ep) + } + } + return ready +} + +// HasEndpoints returns true if the model has any endpoints +func (m *DynamoModel) HasEndpoints() bool { + return len(m.Status.Endpoints) > 0 +} + +// HasReadyEndpoints returns true if the model has any ready endpoints +func (m *DynamoModel) HasReadyEndpoints() bool { + return m.Status.ReadyEndpoints > 0 +} diff --git a/deploy/cloud/operator/api/v1alpha1/dynamocomponentdeployment_types.go b/deploy/cloud/operator/api/v1alpha1/dynamocomponentdeployment_types.go index a119bb5045..abad52a4ee 100644 --- a/deploy/cloud/operator/api/v1alpha1/dynamocomponentdeployment_types.go +++ b/deploy/cloud/operator/api/v1alpha1/dynamocomponentdeployment_types.go @@ -83,6 +83,11 @@ type DynamoComponentDeploymentSharedSpec struct { // Ingress config to expose the component outside the cluster (or through a service mesh). Ingress *IngressSpec `json:"ingress,omitempty"` + // ModelRef references a model that this component serves + // When specified, a headless service will be created for endpoint discovery + // +optional + ModelRef *ModelReference `json:"modelRef,omitempty"` + // SharedMemory controls the tmpfs mounted at /dev/shm (enable/disable and size). SharedMemory *SharedMemorySpec `json:"sharedMemory,omitempty"` @@ -269,3 +274,14 @@ func (s *DynamoComponentDeployment) GetParentGraphDeploymentName() string { func (s *DynamoComponentDeployment) GetParentGraphDeploymentNamespace() string { return s.GetNamespace() } + +// ModelReference identifies a model served by this component +type ModelReference struct { + // Name is the base model identifier (e.g., "llama-3-70b-instruct-v1") + // +kubebuilder:validation:Required + Name string `json:"name"` + + // Revision is the model revision/version (optional) + // +optional + Revision string `json:"revision,omitempty"` +} diff --git a/deploy/cloud/operator/api/v1alpha1/zz_generated.deepcopy.go b/deploy/cloud/operator/api/v1alpha1/zz_generated.deepcopy.go index ece6e19eb5..5cf40f4004 100644 --- a/deploy/cloud/operator/api/v1alpha1/zz_generated.deepcopy.go +++ b/deploy/cloud/operator/api/v1alpha1/zz_generated.deepcopy.go @@ -40,9 +40,9 @@ package v1alpha1 import ( "github.com/ai-dynamo/dynamo/deploy/cloud/operator/api/dynamo/common" "k8s.io/api/autoscaling/v2" - "k8s.io/api/core/v1" + corev1 "k8s.io/api/core/v1" apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" ) @@ -98,7 +98,7 @@ func (in *BaseStatus) DeepCopyInto(out *BaseStatus) { *out = *in if in.Conditions != nil { in, out := &in.Conditions, &out.Conditions - *out = make([]metav1.Condition, len(*in)) + *out = make([]v1.Condition, len(*in)) for i := range *in { (*in)[i].DeepCopyInto(&(*out)[i]) } @@ -267,7 +267,7 @@ func (in *DynamoComponentDeploymentSharedSpec) DeepCopyInto(out *DynamoComponent } if in.Envs != nil { in, out := &in.Envs, &out.Envs - *out = make([]v1.EnvVar, len(*in)) + *out = make([]corev1.EnvVar, len(*in)) for i := range *in { (*in)[i].DeepCopyInto(&(*out)[i]) } @@ -287,6 +287,11 @@ func (in *DynamoComponentDeploymentSharedSpec) DeepCopyInto(out *DynamoComponent *out = new(IngressSpec) (*in).DeepCopyInto(*out) } + if in.ModelRef != nil { + in, out := &in.ModelRef, &out.ModelRef + *out = new(ModelReference) + **out = **in + } if in.SharedMemory != nil { in, out := 
&in.SharedMemory, &out.SharedMemory *out = new(SharedMemorySpec) @@ -304,12 +309,12 @@ func (in *DynamoComponentDeploymentSharedSpec) DeepCopyInto(out *DynamoComponent } if in.LivenessProbe != nil { in, out := &in.LivenessProbe, &out.LivenessProbe - *out = new(v1.Probe) + *out = new(corev1.Probe) (*in).DeepCopyInto(*out) } if in.ReadinessProbe != nil { in, out := &in.ReadinessProbe, &out.ReadinessProbe - *out = new(v1.Probe) + *out = new(corev1.Probe) (*in).DeepCopyInto(*out) } if in.Replicas != nil { @@ -355,7 +360,7 @@ func (in *DynamoComponentDeploymentStatus) DeepCopyInto(out *DynamoComponentDepl *out = *in if in.Conditions != nil { in, out := &in.Conditions, &out.Conditions - *out = make([]metav1.Condition, len(*in)) + *out = make([]v1.Condition, len(*in)) for i := range *in { (*in)[i].DeepCopyInto(&(*out)[i]) } @@ -523,7 +528,7 @@ func (in *DynamoGraphDeploymentRequestStatus) DeepCopyInto(out *DynamoGraphDeplo *out = *in if in.Conditions != nil { in, out := &in.Conditions, &out.Conditions - *out = make([]metav1.Condition, len(*in)) + *out = make([]v1.Condition, len(*in)) for i := range *in { (*in)[i].DeepCopyInto(&(*out)[i]) } @@ -578,7 +583,7 @@ func (in *DynamoGraphDeploymentSpec) DeepCopyInto(out *DynamoGraphDeploymentSpec } if in.Envs != nil { in, out := &in.Envs, &out.Envs - *out = make([]v1.EnvVar, len(*in)) + *out = make([]corev1.EnvVar, len(*in)) for i := range *in { (*in)[i].DeepCopyInto(&(*out)[i]) } @@ -600,7 +605,7 @@ func (in *DynamoGraphDeploymentStatus) DeepCopyInto(out *DynamoGraphDeploymentSt *out = *in if in.Conditions != nil { in, out := &in.Conditions, &out.Conditions - *out = make([]metav1.Condition, len(*in)) + *out = make([]v1.Condition, len(*in)) for i := range *in { (*in)[i].DeepCopyInto(&(*out)[i]) } @@ -617,6 +622,122 @@ func (in *DynamoGraphDeploymentStatus) DeepCopy() *DynamoGraphDeploymentStatus { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *DynamoModel) DeepCopyInto(out *DynamoModel) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) + out.Spec = in.Spec + in.Status.DeepCopyInto(&out.Status) +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DynamoModel. +func (in *DynamoModel) DeepCopy() *DynamoModel { + if in == nil { + return nil + } + out := new(DynamoModel) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *DynamoModel) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *DynamoModelList) DeepCopyInto(out *DynamoModelList) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ListMeta.DeepCopyInto(&out.ListMeta) + if in.Items != nil { + in, out := &in.Items, &out.Items + *out = make([]DynamoModel, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DynamoModelList. +func (in *DynamoModelList) DeepCopy() *DynamoModelList { + if in == nil { + return nil + } + out := new(DynamoModelList) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. 
+func (in *DynamoModelList) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *DynamoModelSpec) DeepCopyInto(out *DynamoModelSpec) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DynamoModelSpec. +func (in *DynamoModelSpec) DeepCopy() *DynamoModelSpec { + if in == nil { + return nil + } + out := new(DynamoModelSpec) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *DynamoModelStatus) DeepCopyInto(out *DynamoModelStatus) { + *out = *in + if in.Endpoints != nil { + in, out := &in.Endpoints, &out.Endpoints + *out = make([]EndpointInfo, len(*in)) + copy(*out, *in) + } + if in.Conditions != nil { + in, out := &in.Conditions, &out.Conditions + *out = make([]v1.Condition, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DynamoModelStatus. +func (in *DynamoModelStatus) DeepCopy() *DynamoModelStatus { + if in == nil { + return nil + } + out := new(DynamoModelStatus) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *EndpointInfo) DeepCopyInto(out *EndpointInfo) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new EndpointInfo. +func (in *EndpointInfo) DeepCopy() *EndpointInfo { + if in == nil { + return nil + } + out := new(EndpointInfo) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *IngressSpec) DeepCopyInto(out *IngressSpec) { *out = *in @@ -686,6 +807,21 @@ func (in *IngressTLSSpec) DeepCopy() *IngressTLSSpec { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ModelReference) DeepCopyInto(out *ModelReference) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ModelReference. +func (in *ModelReference) DeepCopy() *ModelReference { + if in == nil { + return nil + } + out := new(ModelReference) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
func (in *MultinodeSpec) DeepCopyInto(out *MultinodeSpec) { *out = *in diff --git a/deploy/cloud/operator/cmd/main.go b/deploy/cloud/operator/cmd/main.go index 06585b7720..9012120ee2 100644 --- a/deploy/cloud/operator/cmd/main.go +++ b/deploy/cloud/operator/cmd/main.go @@ -60,6 +60,7 @@ import ( "github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/controller" commonController "github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/controller_common" "github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/etcd" + "github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/modelendpoint" "github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/namespace_scope" "github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/rbac" "github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/secret" @@ -559,6 +560,15 @@ func main() { setupLog.Error(err, "unable to create controller", "controller", "DynamoGraphDeploymentRequest") os.Exit(1) } + + if err = (&controller.DynamoModelReconciler{ + Client: mgr.GetClient(), + Recorder: mgr.GetEventRecorderFor("dynamomodel"), + Prober: modelendpoint.NewProber(), + }).SetupWithManager(mgr); err != nil { + setupLog.Error(err, "unable to create controller", "controller", "DynamoModel") + os.Exit(1) + } //+kubebuilder:scaffold:builder if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil { diff --git a/deploy/cloud/operator/config/crd/bases/nvidia.com_dynamocomponentdeployments.yaml b/deploy/cloud/operator/config/crd/bases/nvidia.com_dynamocomponentdeployments.yaml index f4164dc841..6f9053c54a 100644 --- a/deploy/cloud/operator/config/crd/bases/nvidia.com_dynamocomponentdeployments.yaml +++ b/deploy/cloud/operator/config/crd/bases/nvidia.com_dynamocomponentdeployments.yaml @@ -10002,6 +10002,20 @@ spec: format: int32 type: integer type: object + modelRef: + description: |- + ModelRef references a model that this component serves + When specified, a headless service will be created for endpoint discovery + properties: + name: + description: Name is the base model identifier (e.g., "llama-3-70b-instruct-v1") + type: string + revision: + description: Revision is the model revision/version (optional) + type: string + required: + - name + type: object multinode: description: Multinode is the configuration for multinode components. properties: diff --git a/deploy/cloud/operator/config/crd/bases/nvidia.com_dynamographdeployments.yaml b/deploy/cloud/operator/config/crd/bases/nvidia.com_dynamographdeployments.yaml index 037dfecebb..a5647db120 100644 --- a/deploy/cloud/operator/config/crd/bases/nvidia.com_dynamographdeployments.yaml +++ b/deploy/cloud/operator/config/crd/bases/nvidia.com_dynamographdeployments.yaml @@ -10136,6 +10136,20 @@ spec: format: int32 type: integer type: object + modelRef: + description: |- + ModelRef references a model that this component serves + When specified, a headless service will be created for endpoint discovery + properties: + name: + description: Name is the base model identifier (e.g., "llama-3-70b-instruct-v1") + type: string + revision: + description: Revision is the model revision/version (optional) + type: string + required: + - name + type: object multinode: description: Multinode is the configuration for multinode components. 
properties: diff --git a/deploy/cloud/operator/config/crd/bases/nvidia.com_dynamomodels.yaml b/deploy/cloud/operator/config/crd/bases/nvidia.com_dynamomodels.yaml new file mode 100644 index 0000000000..473c20a004 --- /dev/null +++ b/deploy/cloud/operator/config/crd/bases/nvidia.com_dynamomodels.yaml @@ -0,0 +1,198 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +--- +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + annotations: + controller-gen.kubebuilder.io/version: v0.16.4 + helm.sh/resource-policy: keep + name: dynamomodels.nvidia.com +spec: + group: nvidia.com + names: + kind: DynamoModel + listKind: DynamoModelList + plural: dynamomodels + shortNames: + - dm + singular: dynamomodel + scope: Namespaced + versions: + - additionalPrinterColumns: + - description: Base model name + jsonPath: .spec.baseModelName + name: BaseModel + type: string + - description: Model type + jsonPath: .spec.modelType + name: Type + type: string + - description: Ready endpoints + jsonPath: .status.readyEndpoints + name: Ready + type: integer + - description: Total endpoints + jsonPath: .status.totalEndpoints + name: Total + type: integer + - jsonPath: .metadata.creationTimestamp + name: Age + type: date + name: v1alpha1 + schema: + openAPIV3Schema: + description: DynamoModel is the Schema for the dynamo models API + properties: + apiVersion: + description: |- + APIVersion defines the versioned schema of this representation of an object. + Servers should convert recognized schemas to the latest internal value, and + may reject unrecognized values. + More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources + type: string + kind: + description: |- + Kind is a string value representing the REST resource this object represents. + Servers may infer this from the endpoint the client submits requests to. + Cannot be updated. + In CamelCase. 
+ More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds + type: string + metadata: + type: object + spec: + description: DynamoModelSpec defines the desired state of DynamoModel + properties: + baseModelName: + description: |- + BaseModelName is the base model identifier that matches the service label + This is used to discover endpoints via headless services + type: string + loraPath: + description: LoraPath is the path to the LoRA adapter (only applicable for lora model type) + type: string + modelName: + description: ModelName is the full model identifier (e.g., "meta-llama/Llama-3.3-70B-Instruct-lora") + type: string + modelType: + default: base + description: ModelType specifies the type of model (e.g., "base", "lora", "adapter") + enum: + - base + - lora + - adapter + type: string + required: + - baseModelName + - modelName + type: object + status: + description: DynamoModelStatus defines the observed state of DynamoModel + properties: + conditions: + description: Conditions represents the latest available observations of the model's state + items: + description: Condition contains details for one aspect of the current state of this API Resource. + properties: + lastTransitionTime: + description: |- + lastTransitionTime is the last time the condition transitioned from one status to another. + This should be when the underlying condition changed. If that is not known, then using the time when the API field changed is acceptable. + format: date-time + type: string + message: + description: |- + message is a human readable message indicating details about the transition. + This may be an empty string. + maxLength: 32768 + type: string + observedGeneration: + description: |- + observedGeneration represents the .metadata.generation that the condition was set based upon. + For instance, if .metadata.generation is currently 12, but the .status.conditions[x].observedGeneration is 9, the condition is out of date + with respect to the current state of the instance. + format: int64 + minimum: 0 + type: integer + reason: + description: |- + reason contains a programmatic identifier indicating the reason for the condition's last transition. + Producers of specific condition types may define expected values and meanings for this field, + and whether the values are considered a guaranteed API. + The value should be a CamelCase string. + This field may not be empty. + maxLength: 1024 + minLength: 1 + pattern: ^[A-Za-z]([A-Za-z0-9_,:]*[A-Za-z0-9_])?$ + type: string + status: + description: status of the condition, one of True, False, Unknown. + enum: + - "True" + - "False" + - Unknown + type: string + type: + description: type of condition in CamelCase or in foo.example.com/CamelCase. 
+ maxLength: 316 + pattern: ^([a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*/)?(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])$ + type: string + required: + - lastTransitionTime + - message + - reason + - status + - type + type: object + type: array + endpoints: + description: Endpoints is the current list of all endpoints for this model + items: + description: EndpointInfo represents a single endpoint (pod) serving the model + properties: + address: + description: Address is the full address of the endpoint (e.g., "http://10.0.1.5:9090") + type: string + podName: + description: PodName is the name of the pod serving this endpoint + type: string + ready: + description: |- + Ready indicates whether the endpoint is ready to serve traffic + For LoRA models: true if the LoRA was successfully loaded and appears in GET /v1/loras + For base models: always false (no probing performed) + type: boolean + required: + - address + - ready + type: object + type: array + readyEndpoints: + description: ReadyEndpoints is the count of endpoints that are ready + type: integer + totalEndpoints: + description: TotalEndpoints is the total count of endpoints + type: integer + required: + - readyEndpoints + - totalEndpoints + type: object + type: object + served: true + storage: true + subresources: + status: {} diff --git a/deploy/cloud/operator/config/rbac/role.yaml b/deploy/cloud/operator/config/rbac/role.yaml index b3436669f9..0e3bbc78f6 100644 --- a/deploy/cloud/operator/config/rbac/role.yaml +++ b/deploy/cloud/operator/config/rbac/role.yaml @@ -86,6 +86,14 @@ rules: - patch - update - watch +- apiGroups: + - discovery.k8s.io + resources: + - endpointslices + verbs: + - get + - list + - watch - apiGroups: - events.k8s.io resources: @@ -162,6 +170,7 @@ rules: - dynamocomponentdeployments - dynamographdeploymentrequests - dynamographdeployments + - dynamomodels verbs: - create - delete @@ -176,6 +185,7 @@ rules: - dynamocomponentdeployments/finalizers - dynamographdeploymentrequests/finalizers - dynamographdeployments/finalizers + - dynamomodels/finalizers verbs: - update - apiGroups: @@ -184,6 +194,7 @@ rules: - dynamocomponentdeployments/status - dynamographdeploymentrequests/status - dynamographdeployments/status + - dynamomodels/status verbs: - get - patch diff --git a/deploy/cloud/operator/internal/consts/consts.go b/deploy/cloud/operator/internal/consts/consts.go index 46784e6726..ca1c01b64d 100644 --- a/deploy/cloud/operator/internal/consts/consts.go +++ b/deploy/cloud/operator/internal/consts/consts.go @@ -38,6 +38,7 @@ const ( KubeLabelDynamoDeploymentTargetType = "nvidia.com/dynamo-deployment-target-type" KubeLabelDynamoComponentType = "nvidia.com/dynamo-component-type" KubeLabelDynamoSubComponentType = "nvidia.com/dynamo-sub-component-type" + KubeLabelDynamoBaseModel = "nvidia.com/dynamo-base-model" KubeLabelValueFalse = "false" KubeLabelValueTrue = "true" diff --git a/deploy/cloud/operator/internal/controller/dynamo_model_controller.go b/deploy/cloud/operator/internal/controller/dynamo_model_controller.go new file mode 100644 index 0000000000..e4ddea9de0 --- /dev/null +++ b/deploy/cloud/operator/internal/controller/dynamo_model_controller.go @@ -0,0 +1,362 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package controller + +import ( + "context" + "fmt" + "time" + + corev1 "k8s.io/api/core/v1" + discoveryv1 "k8s.io/api/discovery/v1" + k8serrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/tools/record" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + "github.com/ai-dynamo/dynamo/deploy/cloud/operator/api/v1alpha1" + "github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/consts" + "github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/modelendpoint" +) + +const ( + // Condition types + ConditionTypeEndpointsReady = "EndpointsReady" + ConditionTypeServicesFound = "ServicesFound" + + // Condition reasons + ReasonEndpointsDiscovered = "EndpointsDiscovered" + ReasonNoReadyEndpoints = "NoReadyEndpoints" + ReasonNoEndpoints = "NoEndpoints" + ReasonServicesFound = "ServicesFound" + ReasonNoServicesFound = "NoServicesFound" + + // Field index names + dynamoModelBaseModelIndex = ".spec.baseModelName" + + // Finalizer + dynamoModelFinalizer = "nvidia.com/dynamo-model-finalizer" +) + +// DynamoModelReconciler reconciles a DynamoModel object +type DynamoModelReconciler struct { + client.Client + Recorder record.EventRecorder + Prober *modelendpoint.Prober +} + +// +kubebuilder:rbac:groups=nvidia.com,resources=dynamomodels,verbs=get;list;watch;create;update;patch;delete +// +kubebuilder:rbac:groups=nvidia.com,resources=dynamomodels/status,verbs=get;update;patch +// +kubebuilder:rbac:groups=nvidia.com,resources=dynamomodels/finalizers,verbs=update +// +kubebuilder:rbac:groups=core,resources=services,verbs=get;list;watch +// +kubebuilder:rbac:groups=discovery.k8s.io,resources=endpointslices,verbs=get;list;watch + +// Reconcile handles the reconciliation loop for DynamoModel resources +func (r *DynamoModelReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + logs := log.FromContext(ctx) + + // Fetch the DynamoModel + model := &v1alpha1.DynamoModel{} + if err := r.Get(ctx, req.NamespacedName, model); err != nil { + if k8serrors.IsNotFound(err) { + logs.Info("DynamoModel resource not found. 
Ignoring since object must be deleted") + return ctrl.Result{}, nil + } + logs.Error(err, "Failed to get DynamoModel") + return ctrl.Result{}, err + } + + logs = logs.WithValues("dynamoModel", model.Name, "namespace", model.Namespace, "baseModelName", model.Spec.BaseModelName) + logs.Info("Reconciling DynamoModel") + + // Handle deletion with finalizer + if !model.DeletionTimestamp.IsZero() { + return r.handleDeletion(ctx, model) + } + + // Add finalizer if not present + if !controllerutil.ContainsFinalizer(model, dynamoModelFinalizer) { + controllerutil.AddFinalizer(model, dynamoModelFinalizer) + if err := r.Update(ctx, model); err != nil { + logs.Error(err, "Failed to add finalizer") + return ctrl.Result{}, err + } + logs.Info("Added finalizer to DynamoModel") + return ctrl.Result{}, nil + } + + // Get endpoint candidates (common logic) + candidates, serviceNames, err := r.getEndpointCandidates(ctx, model) + if err != nil { + // Error already logged and status updated in helper + return ctrl.Result{RequeueAfter: 30 * time.Second}, err + } + + if len(candidates) == 0 { + msg := fmt.Sprintf("No endpoint slices found for base model %s", model.Spec.BaseModelName) + logs.Info(msg) + r.Recorder.Event(model, corev1.EventTypeWarning, "NoEndpointsFound", msg) + r.updateCondition(model, ConditionTypeServicesFound, metav1.ConditionFalse, ReasonNoServicesFound, msg) + r.updateCondition(model, ConditionTypeEndpointsReady, metav1.ConditionFalse, ReasonNoEndpoints, msg) + model.Status.Endpoints = nil + model.Status.TotalEndpoints = 0 + model.Status.ReadyEndpoints = 0 + if err := r.Status().Update(ctx, model); err != nil { + return ctrl.Result{}, err + } + return ctrl.Result{RequeueAfter: 30 * time.Second}, nil + } + + // Initialize prober if needed + if r.Prober == nil { + r.Prober = modelendpoint.NewProber() + } + + // Probe all endpoints in parallel with bounded concurrency + allEndpoints, probeErr := r.Prober.ProbeEndpoints(ctx, candidates, model) + hasFailures := probeErr != nil || countReadyEndpoints(allEndpoints) < len(allEndpoints) + + if probeErr != nil { + logs.Error(probeErr, "Some endpoints failed during probing") + r.Recorder.Event(model, corev1.EventTypeWarning, "PartialEndpointFailure", + fmt.Sprintf("Some endpoints failed to load LoRA: %v", probeErr)) + } + + // Update service found condition based on whether we found any services + if len(serviceNames) > 0 { + r.updateCondition(model, ConditionTypeServicesFound, metav1.ConditionTrue, ReasonServicesFound, + fmt.Sprintf("Found %d service(s)", len(serviceNames))) + } else { + r.updateCondition(model, ConditionTypeServicesFound, metav1.ConditionFalse, ReasonNoServicesFound, + "No services associated with endpoint slices") + } + + // Update status + model.Status.Endpoints = allEndpoints + model.Status.TotalEndpoints = len(allEndpoints) + model.Status.ReadyEndpoints = countReadyEndpoints(allEndpoints) + + // Update conditions + if model.Status.ReadyEndpoints > 0 { + r.updateCondition(model, ConditionTypeEndpointsReady, metav1.ConditionTrue, ReasonEndpointsDiscovered, + fmt.Sprintf("Found %d ready endpoint(s) out of %d total", model.Status.ReadyEndpoints, model.Status.TotalEndpoints)) + r.Recorder.Eventf(model, corev1.EventTypeNormal, "EndpointsReady", + "Discovered %d ready endpoints for base model %s", model.Status.ReadyEndpoints, model.Spec.BaseModelName) + } else if model.Status.TotalEndpoints > 0 { + r.updateCondition(model, ConditionTypeEndpointsReady, metav1.ConditionFalse, ReasonNoReadyEndpoints, + fmt.Sprintf("Found %d endpoint(s) but 
none are ready", model.Status.TotalEndpoints)) + r.Recorder.Event(model, corev1.EventTypeWarning, "NoReadyEndpoints", "Endpoints exist but none are ready") + } else { + r.updateCondition(model, ConditionTypeEndpointsReady, metav1.ConditionFalse, ReasonNoEndpoints, "No endpoints found") + } + + if err := r.Status().Update(ctx, model); err != nil { + logs.Error(err, "Failed to update DynamoModel status") + return ctrl.Result{}, err + } + + logs.Info("Successfully reconciled DynamoModel", + "totalEndpoints", model.Status.TotalEndpoints, + "readyEndpoints", model.Status.ReadyEndpoints) + + // Requeue if there were probe failures to retry loading LoRAs + if hasFailures { + logs.Info("Requeuing due to endpoint probe failures", + "ready", model.Status.ReadyEndpoints, + "total", model.Status.TotalEndpoints) + return ctrl.Result{RequeueAfter: 30 * time.Second}, nil + } + + return ctrl.Result{}, nil +} + +// countReadyEndpoints counts how many endpoints are ready +func countReadyEndpoints(endpoints []v1alpha1.EndpointInfo) int { + count := 0 + for _, ep := range endpoints { + if ep.Ready { + count++ + } + } + return count +} + +// updateCondition updates or adds a condition to the model's status +func (r *DynamoModelReconciler) updateCondition(model *v1alpha1.DynamoModel, condType string, status metav1.ConditionStatus, reason, message string) { + condition := metav1.Condition{ + Type: condType, + Status: status, + ObservedGeneration: model.Generation, + LastTransitionTime: metav1.Now(), + Reason: reason, + Message: message, + } + meta.SetStatusCondition(&model.Status.Conditions, condition) +} + +// SetupWithManager sets up the controller with the Manager +func (r *DynamoModelReconciler) SetupWithManager(mgr ctrl.Manager) error { + // Register field indexer for DynamoModels by base model name + // This allows efficient O(1) queries: "get all DynamoModels for base-model X" + if err := mgr.GetFieldIndexer().IndexField( + context.Background(), + &v1alpha1.DynamoModel{}, + dynamoModelBaseModelIndex, + func(obj client.Object) []string { + model := obj.(*v1alpha1.DynamoModel) + return []string{model.Spec.BaseModelName} + }, + ); err != nil { + return err + } + + return ctrl.NewControllerManagedBy(mgr). + For(&v1alpha1.DynamoModel{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})). + // Watch EndpointSlices - reconcile when endpoints change (Service changes trigger EndpointSlice updates) + Watches( + &discoveryv1.EndpointSlice{}, + handler.EnqueueRequestsFromMapFunc(r.findModelsForEndpointSlice), + builder.WithPredicates(predicate.Funcs{ + GenericFunc: func(e event.GenericEvent) bool { return false }, + }), + ). 
+ Complete(r) +} + +// findModelsForEndpointSlice maps an EndpointSlice to DynamoModels +func (r *DynamoModelReconciler) findModelsForEndpointSlice(ctx context.Context, obj client.Object) []reconcile.Request { + slice := obj.(*discoveryv1.EndpointSlice) + logs := log.FromContext(ctx).WithValues("endpointSlice", slice.Name, "namespace", slice.Namespace) + + // Get the service name from the EndpointSlice label + // Since service name = base model name, we can directly use it to find models + serviceName, ok := slice.Labels[discoveryv1.LabelServiceName] + if !ok { + return nil + } + + // Find all DynamoModels for this base model using field indexer + requests, err := modelendpoint.FindModelsForBaseModel(ctx, r.Client, slice.Namespace, serviceName, dynamoModelBaseModelIndex) + if err != nil { + return nil + } + + if len(requests) > 0 { + logs.V(1).Info("EndpointSlice change triggered DynamoModel reconciliation", "modelCount", len(requests), "baseModel", serviceName) + } + + return requests +} + +// handleDeletion handles cleanup when a DynamoModel is being deleted +func (r *DynamoModelReconciler) handleDeletion(ctx context.Context, model *v1alpha1.DynamoModel) (ctrl.Result, error) { + logs := log.FromContext(ctx) + + // Check if finalizer is present + if !controllerutil.ContainsFinalizer(model, dynamoModelFinalizer) { + logs.Info("Finalizer not found, skipping cleanup") + return ctrl.Result{}, nil + } + + logs.Info("Handling DynamoModel deletion", "modelType", model.Spec.ModelType) + + // Only perform cleanup for LoRA models + if model.Spec.ModelType == "lora" { + // Get endpoint candidates (reusing common logic) + candidates, _, err := r.getEndpointCandidates(ctx, model) + if err != nil { + logs.Error(err, "Failed to get endpoints during deletion") + r.Recorder.Event(model, corev1.EventTypeWarning, "CleanupFailed", err.Error()) + // Continue with deletion even if we can't get endpoints + } else if len(candidates) > 0 { + logs.Info("Unloading LoRA from endpoints", "endpointCount", len(candidates)) + + // Initialize prober if needed + if r.Prober == nil { + r.Prober = modelendpoint.NewProber() + } + + // Unload LoRA from all endpoints in parallel + if err := r.Prober.UnloadLoRA(ctx, candidates, model.Spec.ModelName); err != nil { + logs.Error(err, "Failed to unload LoRA from some endpoints") + r.Recorder.Event(model, corev1.EventTypeWarning, "LoRAUnloadFailed", + fmt.Sprintf("Failed to unload LoRA from some endpoints: %v", err)) + // Continue with deletion even if unload fails + } else { + logs.Info("Successfully unloaded LoRA from all endpoints") + r.Recorder.Event(model, corev1.EventTypeNormal, "LoRAUnloaded", + fmt.Sprintf("Unloaded LoRA from %d endpoint(s)", len(candidates))) + } + } else { + logs.Info("No endpoints found for cleanup") + } + } else { + logs.Info("Skipping cleanup for non-LoRA model") + } + + // Remove finalizer using controllerutil + controllerutil.RemoveFinalizer(model, dynamoModelFinalizer) + if err := r.Update(ctx, model); err != nil { + logs.Error(err, "Failed to remove finalizer") + return ctrl.Result{}, err + } + + logs.Info("Finalizer removed, DynamoModel will be deleted") + return ctrl.Result{}, nil +} + +// getEndpointCandidates fetches EndpointSlices and extracts endpoint candidates +// Returns candidates, service names, and error +func (r *DynamoModelReconciler) getEndpointCandidates( + ctx context.Context, + model *v1alpha1.DynamoModel, +) ([]modelendpoint.Candidate, map[string]bool, error) { + logs := log.FromContext(ctx) + + // Get EndpointSlices for the 
headless service + // Service name = base model name, so we can directly query by service label + endpointSlices := &discoveryv1.EndpointSliceList{} + if err := r.List(ctx, endpointSlices, + client.InNamespace(model.Namespace), + client.MatchingLabels{discoveryv1.LabelServiceName: model.Spec.BaseModelName}, + ); err != nil { + logs.Error(err, "Failed to list endpoint slices for model") + r.Recorder.Event(model, corev1.EventTypeWarning, "EndpointDiscoveryFailed", err.Error()) + return nil, nil, err + } + + if len(endpointSlices.Items) == 0 { + return nil, nil, nil + } + + logs.Info("Found endpoint slices for model", "count", len(endpointSlices.Items)) + + // Extract pod-ready endpoint candidates from all EndpointSlices + candidates, serviceNames := modelendpoint.ExtractCandidates(endpointSlices, int32(consts.DynamoSystemPort)) + + return candidates, serviceNames, nil +} diff --git a/deploy/cloud/operator/internal/controller/dynamocomponentdeployment_controller.go b/deploy/cloud/operator/internal/controller/dynamocomponentdeployment_controller.go index 17f2720b4a..1d772903df 100644 --- a/deploy/cloud/operator/internal/controller/dynamocomponentdeployment_controller.go +++ b/deploy/cloud/operator/internal/controller/dynamocomponentdeployment_controller.go @@ -327,6 +327,21 @@ func (r *DynamoComponentDeploymentReconciler) Reconcile(ctx context.Context, req modified = true } + // create or update headless service for model endpoint discovery + componentMap := map[string]*v1alpha1.DynamoComponentDeploymentSharedSpec{ + dynamoComponentDeployment.Name: &dynamoComponentDeployment.Spec.DynamoComponentDeploymentSharedSpec, + } + if err := dynamo.ReconcileModelServicesForComponents( + ctx, + r, + dynamoComponentDeployment, + componentMap, + dynamoComponentDeployment.Namespace, + ); err != nil { + logs.Error(err, "Failed to reconcile model service") + return ctrl.Result{}, err + } + // create or update api-server ingresses modified_, err = r.createOrUpdateOrDeleteIngress(ctx, generateResourceOption{ dynamoComponentDeployment: dynamoComponentDeployment, @@ -926,10 +941,17 @@ func (r *DynamoComponentDeploymentReconciler) getGenericServiceName(dynamoCompon } func (r *DynamoComponentDeploymentReconciler) getKubeLabels(dynamoComponentDeployment *v1alpha1.DynamoComponentDeployment) map[string]string { + labels := map[string]string{} if dynamoComponentDeployment != nil && dynamoComponentDeployment.Labels != nil { - return dynamoComponentDeployment.Labels + for k, v := range dynamoComponentDeployment.Labels { + labels[k] = v + } + } + // Add base model label if modelRef is specified + if dynamoComponentDeployment != nil { + dynamo.AddBaseModelLabel(labels, dynamoComponentDeployment.Spec.ModelRef) } - return map[string]string{} + return labels } func (r *DynamoComponentDeploymentReconciler) getKubeAnnotations(dynamoComponentDeployment *v1alpha1.DynamoComponentDeployment) map[string]string { diff --git a/deploy/cloud/operator/internal/controller/dynamographdeployment_controller.go b/deploy/cloud/operator/internal/controller/dynamographdeployment_controller.go index e0dabf614f..e366631906 100644 --- a/deploy/cloud/operator/internal/controller/dynamographdeployment_controller.go +++ b/deploy/cloud/operator/internal/controller/dynamographdeployment_controller.go @@ -341,6 +341,18 @@ func (r *DynamoGraphDeploymentReconciler) reconcileGroveResources(ctx context.Co return "", "", "", fmt.Errorf("failed to reconcile Grove scaling: %w", err) } + // Reconcile headless services for model endpoint discovery + if err := 
dynamo.ReconcileModelServicesForComponents( + ctx, + r, + dynamoDeployment, + dynamoDeployment.Spec.Services, + dynamoDeployment.Namespace, + ); err != nil { + logger.Error(err, "failed to reconcile model services") + return "", "", "", fmt.Errorf("failed to reconcile model services: %w", err) + } + resources := []Resource{groveGangSetAsResource} for componentName, component := range dynamoDeployment.Spec.Services { if component.ComponentType == consts.ComponentTypeFrontend { diff --git a/deploy/cloud/operator/internal/dynamo/graph.go b/deploy/cloud/operator/internal/dynamo/graph.go index e1cb39c396..a8b6469de0 100644 --- a/deploy/cloud/operator/internal/dynamo/graph.go +++ b/deploy/cloud/operator/internal/dynamo/graph.go @@ -1026,6 +1026,8 @@ func generateLabels(component *v1alpha1.DynamoComponentDeploymentSharedSpec, dyn if component.SubComponentType != "" { labels[commonconsts.KubeLabelDynamoSubComponentType] = component.SubComponentType } + // Add base model label if modelRef is specified + AddBaseModelLabel(labels, component.ModelRef) setMetricsLabels(labels, dynamoDeployment) if component.Labels != nil { err := mergo.Merge(&labels, component.Labels, mergo.WithOverride) diff --git a/deploy/cloud/operator/internal/dynamo/headless_service.go b/deploy/cloud/operator/internal/dynamo/headless_service.go new file mode 100644 index 0000000000..0b685314c6 --- /dev/null +++ b/deploy/cloud/operator/internal/dynamo/headless_service.go @@ -0,0 +1,147 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package dynamo + +import ( + "context" + "fmt" + + "github.com/ai-dynamo/dynamo/deploy/cloud/operator/api/v1alpha1" + commonconsts "github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/consts" + commonController "github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/controller_common" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/log" +) + +// ReconcileModelServicesForComponents creates headless services for components with modelRef +// This is common logic used by both DynamoGraphDeployment and DynamoComponentDeployment controllers +// reconciler must implement controller_common.Reconciler interface +func ReconcileModelServicesForComponents( + ctx context.Context, + reconciler commonController.Reconciler, + owner client.Object, + services map[string]*v1alpha1.DynamoComponentDeploymentSharedSpec, + namespace string, +) error { + logger := log.FromContext(ctx) + + // Track unique base models to avoid creating duplicate services + seenBaseModels := make(map[string]bool) + + for componentName, component := range services { + // Skip if no modelRef + if component.ModelRef == nil || component.ModelRef.Name == "" { + continue + } + + baseModelName := component.ModelRef.Name + + // Skip if we've already created service for this base model + if seenBaseModels[baseModelName] { + logger.V(1).Info("Skipping duplicate headless service for base model", + "componentName", componentName, + "baseModelName", baseModelName) + continue + } + seenBaseModels[baseModelName] = true + + // Generate headless service + headlessService := GenerateHeadlessServiceForModel( + ctx, + baseModelName, // Use base model name as service name + namespace, + baseModelName, + ) + + // Sync the service (create or update) + _, syncedService, err := commonController.SyncResource( + ctx, + reconciler, + owner, + func(ctx context.Context) (*corev1.Service, bool, error) { + return headlessService, false, nil + }, + ) + if err != nil { + logger.Error(err, "Failed to sync headless service for model", + "baseModelName", baseModelName, + "componentName", componentName) + return fmt.Errorf("failed to sync headless service for model %s: %w", baseModelName, err) + } + + logger.Info("Synced headless service for model", + "serviceName", syncedService.GetName(), + "baseModelName", baseModelName, + "namespace", namespace) + } + + return nil +} + +// GenerateHeadlessServiceForModel creates a headless service for model endpoint discovery +func GenerateHeadlessServiceForModel( + ctx context.Context, + serviceName string, + namespace string, + baseModelName string, +) *corev1.Service { + service := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: serviceName, + Namespace: namespace, + Labels: map[string]string{ + commonconsts.KubeLabelDynamoBaseModel: baseModelName, + "nvidia.com/managed-by": "dynamo-operator", + }, + }, + Spec: corev1.ServiceSpec{ + // Headless service - no ClusterIP, no load balancing + ClusterIP: corev1.ClusterIPNone, + + // Selector to match pods with the base model label + Selector: map[string]string{ + commonconsts.KubeLabelDynamoBaseModel: baseModelName, + }, + + // Don't publish not-ready addresses - only ready pods in EndpointSlices + PublishNotReadyAddresses: false, + + // System port for model HTTP APIs + Ports: []corev1.ServicePort{ + { + Name: commonconsts.DynamoSystemPortName, + Port: commonconsts.DynamoSystemPort, + TargetPort: 
intstr.FromInt32(commonconsts.DynamoSystemPort), + Protocol: corev1.ProtocolTCP, + }, + }, + }, + } + + return service +} + +// AddBaseModelLabel adds the base model label to a label map if modelRef is present +func AddBaseModelLabel(labels map[string]string, modelRef *v1alpha1.ModelReference) { + if modelRef != nil && modelRef.Name != "" { + labels[commonconsts.KubeLabelDynamoBaseModel] = modelRef.Name + } +} diff --git a/deploy/cloud/operator/internal/modelendpoint/discovery.go b/deploy/cloud/operator/internal/modelendpoint/discovery.go new file mode 100644 index 0000000000..cd404c8eac --- /dev/null +++ b/deploy/cloud/operator/internal/modelendpoint/discovery.go @@ -0,0 +1,112 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package modelendpoint + +import ( + "context" + "fmt" + + discoveryv1 "k8s.io/api/discovery/v1" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + "github.com/ai-dynamo/dynamo/deploy/cloud/operator/api/v1alpha1" +) + +// ExtractCandidates extracts endpoint candidates from EndpointSlices +// Only returns endpoints that are pod-ready +func ExtractCandidates(endpointSlices *discoveryv1.EndpointSliceList, port int32) ([]Candidate, map[string]bool) { + var candidates []Candidate + serviceNames := make(map[string]bool) + + for _, slice := range endpointSlices.Items { + serviceName := slice.Labels[discoveryv1.LabelServiceName] + if serviceName != "" { + serviceNames[serviceName] = true + } + + for _, ep := range slice.Endpoints { + if len(ep.Addresses) == 0 { + continue + } + + // Only consider endpoints that are ready at the pod level + podReady := ep.Conditions.Ready != nil && *ep.Conditions.Ready + if !podReady { + continue + } + + // Get pod name from TargetRef + podName := "" + if ep.TargetRef != nil && ep.TargetRef.Kind == "Pod" { + podName = ep.TargetRef.Name + } + + for _, addr := range ep.Addresses { + address := fmt.Sprintf("http://%s:%d", addr, port) + candidates = append(candidates, Candidate{ + Address: address, + PodName: podName, + }) + } + } + } + + return candidates, serviceNames +} + +// FindModelsForBaseModel finds all DynamoModels that reference a specific base model +// Uses field indexer for efficient O(1) lookup +func FindModelsForBaseModel( + ctx context.Context, + c client.Client, + namespace string, + baseModelName string, + indexField string, +) ([]reconcile.Request, error) { + logs := log.FromContext(ctx) + + models := &v1alpha1.DynamoModelList{} + if err := c.List(ctx, models, + client.InNamespace(namespace), + client.MatchingFields{indexField: baseModelName}, + ); err != nil { + logs.Error(err, "Failed to list DynamoModels", "baseModel", baseModelName) + return nil, err + } + + requests := make([]reconcile.Request, 0, len(models.Items)) + for _, model := range 
models.Items { + requests = append(requests, reconcile.Request{ + NamespacedName: types.NamespacedName{ + Name: model.Name, + Namespace: model.Namespace, + }, + }) + } + + if len(requests) > 0 { + logs.V(1).Info("Found DynamoModels for base model", + "baseModel", baseModelName, + "count", len(requests)) + } + + return requests, nil +} diff --git a/deploy/cloud/operator/internal/modelendpoint/lora.go b/deploy/cloud/operator/internal/modelendpoint/lora.go new file mode 100644 index 0000000000..5267e05d17 --- /dev/null +++ b/deploy/cloud/operator/internal/modelendpoint/lora.go @@ -0,0 +1,175 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package modelendpoint + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + + "sigs.k8s.io/controller-runtime/pkg/log" +) + +// LoadLoRA loads a LoRA model on the specified endpoint +func (p *Prober) LoadLoRA(ctx context.Context, address, modelName string) error { + logs := log.FromContext(ctx) + + loadReq := map[string]interface{}{ + "lora_name": modelName, + "lora_path": modelName, // May need to adjust based on actual API + } + + loadBody, err := json.Marshal(loadReq) + if err != nil { + return fmt.Errorf("failed to marshal load LoRA request: %w", err) + } + + req, err := http.NewRequestWithContext(ctx, "POST", address+"/v1/load_lora", bytes.NewBuffer(loadBody)) + if err != nil { + return fmt.Errorf("failed to create load LoRA request: %w", err) + } + req.Header.Set("Content-Type", "application/json") + + resp, err := p.httpClient.Do(req) + if err != nil { + return fmt.Errorf("failed to call load LoRA endpoint: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + body, _ := io.ReadAll(resp.Body) + logs.V(1).Info("Load LoRA failed", "address", address, "status", resp.StatusCode, "body", string(body)) + return fmt.Errorf("load LoRA failed with status %d: %s", resp.StatusCode, string(body)) + } + + logs.Info("Successfully loaded LoRA", "address", address, "modelName", modelName) + return nil +} + +// VerifyLoRALoaded checks if a LoRA model is present in the GET /loras response +func (p *Prober) VerifyLoRALoaded(ctx context.Context, address, modelName string) bool { + logs := log.FromContext(ctx) + + req, err := http.NewRequestWithContext(ctx, "GET", address+"/v1/loras", nil) + if err != nil { + logs.V(1).Info("Failed to create GET loras request", "error", err) + return false + } + + resp, err := p.httpClient.Do(req) + if err != nil { + logs.V(1).Info("Failed to call GET loras endpoint", "address", address, "error", err) + return false + } + defer resp.Body.Close() + + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + logs.V(1).Info("GET loras failed", "address", address, "status", resp.StatusCode) + return false + } + + body, err := io.ReadAll(resp.Body) + if err != nil { + logs.V(1).Info("Failed to read loras 
response", "error", err) + return false + } + + // Parse the response - expecting a JSON array or object with lora names + var lorasResp struct { + Loras []string `json:"loras"` + } + + if err := json.Unmarshal(body, &lorasResp); err != nil { + // Try parsing as a simple array + var lorasList []string + if err := json.Unmarshal(body, &lorasList); err != nil { + logs.V(1).Info("Failed to parse loras response", "error", err, "body", string(body)) + return false + } + lorasResp.Loras = lorasList + } + + // Check if our model is in the list + for _, lora := range lorasResp.Loras { + if lora == modelName { + logs.V(1).Info("LoRA model verified as loaded", "address", address, "modelName", modelName) + return true + } + } + + logs.V(1).Info("LoRA model not found in list", "address", address, "modelName", modelName, "availableLoras", lorasResp.Loras) + return false +} + +// probeLoRAEndpoint checks if LoRA is already loaded, and loads it if not +func (p *Prober) probeLoRAEndpoint(ctx context.Context, address, modelName string) bool { + logs := log.FromContext(ctx) + + // Step 1: Check if LoRA is already loaded + if p.VerifyLoRALoaded(ctx, address, modelName) { + logs.V(1).Info("LoRA already loaded", "address", address, "modelName", modelName) + return true + } + + // Step 2: Load the LoRA since it's not present + if err := p.LoadLoRA(ctx, address, modelName); err != nil { + logs.V(1).Info("Failed to load LoRA", "address", address, "error", err) + return false + } + + // Step 3: Verify the LoRA was loaded successfully + if p.VerifyLoRALoaded(ctx, address, modelName) { + return true + } + + logs.V(1).Info("LoRA load appeared successful but verification failed", "address", address) + return false +} + +// unloadLoRA unloads a LoRA model from a single endpoint +func (p *Prober) unloadLoRA(ctx context.Context, address, modelName string) error { + logs := log.FromContext(ctx) + + req, err := http.NewRequestWithContext(ctx, "DELETE", address+"/v1/loras/"+modelName, nil) + if err != nil { + logs.V(1).Info("Failed to create unload LoRA request", "error", err) + return fmt.Errorf("failed to create unload LoRA request: %w", err) + } + + resp, err := p.httpClient.Do(req) + if err != nil { + logs.V(1).Info("Failed to call unload LoRA endpoint", "address", address, "error", err) + return fmt.Errorf("failed to call unload LoRA endpoint: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + body, _ := io.ReadAll(resp.Body) + logs.V(1).Info("Unload LoRA endpoint returned error status", + "address", address, + "status", resp.StatusCode, + "body", string(body)) + return fmt.Errorf("unload LoRA failed with status %d: %s", resp.StatusCode, string(body)) + } + + logs.V(1).Info("Successfully unloaded LoRA", "address", address, "modelName", modelName) + return nil +} diff --git a/deploy/cloud/operator/internal/modelendpoint/prober.go b/deploy/cloud/operator/internal/modelendpoint/prober.go new file mode 100644 index 0000000000..215d140ac3 --- /dev/null +++ b/deploy/cloud/operator/internal/modelendpoint/prober.go @@ -0,0 +1,172 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package modelendpoint + +import ( + "context" + "net/http" + "strings" + "time" + + "sigs.k8s.io/controller-runtime/pkg/log" + + "github.com/ai-dynamo/dynamo/deploy/cloud/operator/api/v1alpha1" + "github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/workerpool" +) + +const ( + // MaxConcurrentProbes is the maximum number of concurrent endpoint probes + MaxConcurrentProbes = 10 + // ProbeTimeout is the timeout for individual HTTP requests + ProbeTimeout = 15 * time.Second + // TotalProbeTimeout is the timeout for all probes to complete + TotalProbeTimeout = 30 * time.Second +) + +// Prober handles HTTP-based endpoint probing +type Prober struct { + httpClient *http.Client +} + +// NewProber creates a new endpoint prober +func NewProber() *Prober { + return &Prober{ + httpClient: &http.Client{ + Timeout: ProbeTimeout, + }, + } +} + +// ProbeEndpoints probes all endpoints in parallel with bounded concurrency +// Returns partial results even if some endpoints fail +func (p *Prober) ProbeEndpoints( + ctx context.Context, + candidates []Candidate, + model *v1alpha1.DynamoModel, +) ([]v1alpha1.EndpointInfo, error) { + logs := log.FromContext(ctx) + + // Skip probing for non-LoRA models + if strings.ToLower(model.Spec.ModelType) != "lora" { + logs.V(1).Info("Skipping probe for non-LoRA model", "modelType", model.Spec.ModelType) + endpoints := make([]v1alpha1.EndpointInfo, len(candidates)) + for i, c := range candidates { + endpoints[i] = v1alpha1.EndpointInfo{ + Address: c.Address, + PodName: c.PodName, + Ready: false, + } + } + return endpoints, nil + } + + // Build tasks for the worker pool + tasks := make([]workerpool.Task[v1alpha1.EndpointInfo], len(candidates)) + for i, candidate := range candidates { + c := candidate // Capture loop variable + tasks[i] = workerpool.Task[v1alpha1.EndpointInfo]{ + Index: i, + Work: func(ctx context.Context) (v1alpha1.EndpointInfo, error) { + // Probe the endpoint + ready := p.probeLoRAEndpoint(ctx, c.Address, model.Spec.ModelName) + + logs.V(1).Info("Endpoint probe completed", "address", c.Address, "ready", ready) + + return v1alpha1.EndpointInfo{ + Address: c.Address, + PodName: c.PodName, + Ready: ready, + }, nil + }, + } + } + + // Execute all probes in parallel with bounded concurrency + results, err := workerpool.Execute(ctx, MaxConcurrentProbes, TotalProbeTimeout, tasks) + + // Extract endpoint info from results + endpoints := make([]v1alpha1.EndpointInfo, len(results)) + readyCount := 0 + for _, result := range results { + endpoints[result.Index] = result.Value + if result.Value.Ready { + readyCount++ + } + } + + logs.Info("Completed parallel endpoint probing", + "total", len(endpoints), + "ready", readyCount) + + return endpoints, err +} + +// UnloadLoRA unloads a LoRA model from all endpoints in parallel +func (p *Prober) UnloadLoRA(ctx context.Context, candidates []Candidate, modelName string) error { + logs := log.FromContext(ctx) + + if len(candidates) == 0 { + logs.Info("No candidates to unload LoRA from") + return nil + } + + logs.Info("Starting parallel LoRA unload", "endpointCount", len(candidates), "modelName", 
modelName) + + // Build tasks for the worker pool + tasks := make([]workerpool.Task[bool], len(candidates)) + for i, candidate := range candidates { + c := candidate // Capture loop variable + tasks[i] = workerpool.Task[bool]{ + Index: i, + Work: func(ctx context.Context) (bool, error) { + // Unload the LoRA from this endpoint (calls method in lora.go) + err := p.unloadLoRA(ctx, c.Address, modelName) + if err != nil { + logs.V(1).Info("Failed to unload LoRA from endpoint", + "address", c.Address, + "modelName", modelName, + "error", err) + return false, err + } + + logs.V(1).Info("Successfully unloaded LoRA from endpoint", + "address", c.Address, + "modelName", modelName) + return true, nil + }, + } + } + + // Execute all unload operations in parallel with bounded concurrency + results, err := workerpool.Execute(ctx, MaxConcurrentProbes, TotalProbeTimeout, tasks) + + // Count successes + successCount := 0 + for _, result := range results { + if result.Value { + successCount++ + } + } + + logs.Info("Completed parallel LoRA unload", + "total", len(candidates), + "successful", successCount, + "failed", len(candidates)-successCount) + + return err +} diff --git a/deploy/cloud/operator/internal/modelendpoint/types.go b/deploy/cloud/operator/internal/modelendpoint/types.go new file mode 100644 index 0000000000..879e2c578c --- /dev/null +++ b/deploy/cloud/operator/internal/modelendpoint/types.go @@ -0,0 +1,35 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package modelendpoint + +import ( + "github.com/ai-dynamo/dynamo/deploy/cloud/operator/api/v1alpha1" +) + +// Candidate represents an endpoint candidate to be probed +type Candidate struct { + Address string + PodName string +} + +// ProbeResult represents the result of probing a single endpoint +type ProbeResult struct { + Index int + Endpoint v1alpha1.EndpointInfo + Err error +} diff --git a/deploy/cloud/operator/internal/workerpool/pool.go b/deploy/cloud/operator/internal/workerpool/pool.go new file mode 100644 index 0000000000..c49abf3237 --- /dev/null +++ b/deploy/cloud/operator/internal/workerpool/pool.go @@ -0,0 +1,102 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package workerpool + +import ( + "context" + "fmt" + "sync" + "time" +) + +// Task represents a unit of work to be executed +type Task[T any] struct { + Index int + Work func(ctx context.Context) (T, error) +} + +// Result represents the outcome of executing a task +type Result[T any] struct { + Index int + Value T + Err error +} + +// Execute runs all tasks in parallel with bounded concurrency +// Returns results in the same order as input tasks, even if execution order differs +// Continues executing all tasks even if some fail +func Execute[T any](ctx context.Context, maxWorkers int, timeout time.Duration, tasks []Task[T]) ([]Result[T], error) { + if len(tasks) == 0 { + return nil, nil + } + + // Create context with timeout + execCtx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + // Create buffered channels + results := make(chan Result[T], len(tasks)) + semaphore := make(chan struct{}, maxWorkers) + + // Launch all task goroutines + var wg sync.WaitGroup + for _, task := range tasks { + wg.Add(1) + go func(t Task[T]) { + defer wg.Done() + + // Acquire semaphore slot (bounded concurrency) + semaphore <- struct{}{} + defer func() { <-semaphore }() + + // Execute the task + value, err := t.Work(execCtx) + + // Send result through channel + results <- Result[T]{ + Index: t.Index, + Value: value, + Err: err, + } + }(task) + } + + // Close results channel when all goroutines complete + go func() { + wg.Wait() + close(results) + }() + + // Collect results from channel + collectedResults := make([]Result[T], len(tasks)) + var errorCount int + + for result := range results { + collectedResults[result.Index] = result + if result.Err != nil { + errorCount++ + } + } + + // Return error if any tasks failed + if errorCount > 0 { + return collectedResults, fmt.Errorf("%d task(s) failed", errorCount) + } + + return collectedResults, nil +} From c103ff4620caab673d30746672754b38682838f3 Mon Sep 17 00:00:00 2001 From: Julien Mancuso Date: Thu, 6 Nov 2025 10:37:00 -0700 Subject: [PATCH 2/9] fix: add dynamoModel CRD Signed-off-by: Julien Mancuso --- .../operator/internal/modelendpoint/prober.go | 39 ++++++++++++------- 1 file changed, 25 insertions(+), 14 deletions(-) diff --git a/deploy/cloud/operator/internal/modelendpoint/prober.go b/deploy/cloud/operator/internal/modelendpoint/prober.go index 215d140ac3..eeaa961c92 100644 --- a/deploy/cloud/operator/internal/modelendpoint/prober.go +++ b/deploy/cloud/operator/internal/modelendpoint/prober.go @@ -85,8 +85,6 @@ func (p *Prober) ProbeEndpoints( // Probe the endpoint ready := p.probeLoRAEndpoint(ctx, c.Address, model.Spec.ModelName) - logs.V(1).Info("Endpoint probe completed", "address", c.Address, "ready", ready) - return v1alpha1.EndpointInfo{ Address: c.Address, PodName: c.PodName, @@ -99,19 +97,30 @@ func (p *Prober) ProbeEndpoints( // Execute all probes in parallel with bounded concurrency results, err := workerpool.Execute(ctx, MaxConcurrentProbes, TotalProbeTimeout, tasks) - // Extract endpoint info from results + // Extract endpoint info from results and collect failures endpoints := make([]v1alpha1.EndpointInfo, len(results)) readyCount := 0 + var notReadyEndpoints []string for _, result := range results { endpoints[result.Index] = result.Value if result.Value.Ready { readyCount++ + } else { + notReadyEndpoints = append(notReadyEndpoints, result.Value.Address) + if result.Err != nil { + logs.Info("Endpoint probe failed", + "address", result.Value.Address, + "podName", result.Value.PodName, + "error", result.Err) + } } } 
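// The Result.Index bookkeeping above is what lets this loop write into a fixed-size
// slice: workerpool.Execute (added earlier in this series) returns one Result per Task
// and preserves the caller's ordering even though tasks complete out of order. A minimal,
// self-contained usage sketch of that helper, with made-up task bodies:
//
//	tasks := []workerpool.Task[int]{
//		{Index: 0, Work: func(ctx context.Context) (int, error) { return 1, nil }},
//		{Index: 1, Work: func(ctx context.Context) (int, error) { return 2, nil }},
//	}
//	results, err := workerpool.Execute(ctx, 2, 5*time.Second, tasks)
//	// results[0].Value == 1 and results[1].Value == 2 regardless of completion order;
//	// err is non-nil if any task returned an error (per-task errors live in Result.Err).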
logs.Info("Completed parallel endpoint probing", "total", len(endpoints), - "ready", readyCount) + "ready", readyCount, + "notReady", len(notReadyEndpoints), + "notReadyEndpoints", notReadyEndpoints) return endpoints, err } @@ -137,16 +146,8 @@ func (p *Prober) UnloadLoRA(ctx context.Context, candidates []Candidate, modelNa // Unload the LoRA from this endpoint (calls method in lora.go) err := p.unloadLoRA(ctx, c.Address, modelName) if err != nil { - logs.V(1).Info("Failed to unload LoRA from endpoint", - "address", c.Address, - "modelName", modelName, - "error", err) return false, err } - - logs.V(1).Info("Successfully unloaded LoRA from endpoint", - "address", c.Address, - "modelName", modelName) return true, nil }, } @@ -155,18 +156,28 @@ func (p *Prober) UnloadLoRA(ctx context.Context, candidates []Candidate, modelNa // Execute all unload operations in parallel with bounded concurrency results, err := workerpool.Execute(ctx, MaxConcurrentProbes, TotalProbeTimeout, tasks) - // Count successes + // Collect successes and failures with details successCount := 0 + var failedEndpoints []string for _, result := range results { if result.Value { successCount++ + } else { + // Log failed endpoint with error details + endpoint := candidates[result.Index].Address + failedEndpoints = append(failedEndpoints, endpoint) + logs.Info("Failed to unload LoRA from endpoint", + "address", endpoint, + "podName", candidates[result.Index].PodName, + "error", result.Err) } } logs.Info("Completed parallel LoRA unload", "total", len(candidates), "successful", successCount, - "failed", len(candidates)-successCount) + "failed", len(failedEndpoints), + "failedEndpoints", failedEndpoints) return err } From 885d7926e28929cd71963f19ae7f968a69227f71 Mon Sep 17 00:00:00 2001 From: Julien Mancuso Date: Thu, 6 Nov 2025 14:18:48 -0700 Subject: [PATCH 3/9] fix: add dynamoModel CRD Signed-off-by: Julien Mancuso --- deploy/cloud/operator/PROJECT | 8 ++ .../api/v1alpha1/dynamo_model_types.go | 14 ++- .../api/v1alpha1/zz_generated.deepcopy.go | 22 ++++- deploy/cloud/operator/cmd/main.go | 6 +- .../operator/config/crd/kustomization.yaml | 1 + .../config/samples/kustomization.yaml | 1 + .../controller/dynamo_model_controller.go | 77 ++++++--------- .../modelendpoint/{prober.go => client.go} | 72 ++++++++------ .../operator/internal/modelendpoint/lora.go | 97 ++----------------- .../operator/internal/modelendpoint/types.go | 13 +-- 10 files changed, 128 insertions(+), 183 deletions(-) rename deploy/cloud/operator/internal/modelendpoint/{prober.go => client.go} (68%) diff --git a/deploy/cloud/operator/PROJECT b/deploy/cloud/operator/PROJECT index a86430a2c2..38caf65e60 100644 --- a/deploy/cloud/operator/PROJECT +++ b/deploy/cloud/operator/PROJECT @@ -24,4 +24,12 @@ resources: kind: DynamoGraphDeployment path: github.com/ai-dynamo/dynamo/deploy/cloud/operator/api/v1alpha1 version: v1alpha1 +- api: + crdVersion: v1 + namespaced: true + controller: true + domain: nvidia.com + kind: DynamoModel + path: github.com/ai-dynamo/dynamo/deploy/cloud/operator/api/v1alpha1 + version: v1alpha1 version: "3" diff --git a/deploy/cloud/operator/api/v1alpha1/dynamo_model_types.go b/deploy/cloud/operator/api/v1alpha1/dynamo_model_types.go index 19ec687b70..0c06cf5e60 100644 --- a/deploy/cloud/operator/api/v1alpha1/dynamo_model_types.go +++ b/deploy/cloud/operator/api/v1alpha1/dynamo_model_types.go @@ -38,9 +38,19 @@ type DynamoModelSpec struct { // +optional ModelType string `json:"modelType,omitempty"` - // LoraPath is the path to the LoRA adapter 
(only applicable for lora model type) + // Source specifies the model source location (only applicable for lora model type) // +optional - LoraPath string `json:"loraPath,omitempty"` + Source *ModelSource `json:"source,omitempty"` +} + +// ModelSource defines the source location of a model +type ModelSource struct { + // URI is the model source URI + // Supported formats: + // - S3: s3://bucket/path/to/model + // - HuggingFace: hf://org/model@revision_sha + // +kubebuilder:validation:Required + URI string `json:"uri"` } // EndpointInfo represents a single endpoint (pod) serving the model diff --git a/deploy/cloud/operator/api/v1alpha1/zz_generated.deepcopy.go b/deploy/cloud/operator/api/v1alpha1/zz_generated.deepcopy.go index 5cf40f4004..f5daa41282 100644 --- a/deploy/cloud/operator/api/v1alpha1/zz_generated.deepcopy.go +++ b/deploy/cloud/operator/api/v1alpha1/zz_generated.deepcopy.go @@ -627,7 +627,7 @@ func (in *DynamoModel) DeepCopyInto(out *DynamoModel) { *out = *in out.TypeMeta = in.TypeMeta in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) - out.Spec = in.Spec + in.Spec.DeepCopyInto(&out.Spec) in.Status.DeepCopyInto(&out.Status) } @@ -684,6 +684,11 @@ func (in *DynamoModelList) DeepCopyObject() runtime.Object { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *DynamoModelSpec) DeepCopyInto(out *DynamoModelSpec) { *out = *in + if in.Source != nil { + in, out := &in.Source, &out.Source + *out = new(ModelSource) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DynamoModelSpec. @@ -822,6 +827,21 @@ func (in *ModelReference) DeepCopy() *ModelReference { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ModelSource) DeepCopyInto(out *ModelSource) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ModelSource. +func (in *ModelSource) DeepCopy() *ModelSource { + if in == nil { + return nil + } + out := new(ModelSource) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
func (in *MultinodeSpec) DeepCopyInto(out *MultinodeSpec) { *out = *in diff --git a/deploy/cloud/operator/cmd/main.go b/deploy/cloud/operator/cmd/main.go index 9012120ee2..7047656ea3 100644 --- a/deploy/cloud/operator/cmd/main.go +++ b/deploy/cloud/operator/cmd/main.go @@ -562,9 +562,9 @@ func main() { } if err = (&controller.DynamoModelReconciler{ - Client: mgr.GetClient(), - Recorder: mgr.GetEventRecorderFor("dynamomodel"), - Prober: modelendpoint.NewProber(), + Client: mgr.GetClient(), + Recorder: mgr.GetEventRecorderFor("dynamomodel"), + EndpointClient: modelendpoint.NewClient(), }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "DynamoModel") os.Exit(1) diff --git a/deploy/cloud/operator/config/crd/kustomization.yaml b/deploy/cloud/operator/config/crd/kustomization.yaml index b90253b45b..d0c878ddb3 100644 --- a/deploy/cloud/operator/config/crd/kustomization.yaml +++ b/deploy/cloud/operator/config/crd/kustomization.yaml @@ -19,6 +19,7 @@ resources: - bases/nvidia.com_dynamocomponentdeployments.yaml - bases/nvidia.com_dynamographdeployments.yaml +- bases/nvidia.com_dynamomodels.yaml #+kubebuilder:scaffold:crdkustomizeresource patches: [] diff --git a/deploy/cloud/operator/config/samples/kustomization.yaml b/deploy/cloud/operator/config/samples/kustomization.yaml index 56fefe67d6..30494c9b89 100644 --- a/deploy/cloud/operator/config/samples/kustomization.yaml +++ b/deploy/cloud/operator/config/samples/kustomization.yaml @@ -19,4 +19,5 @@ resources: - nvidia.com_v1alpha1_dynamocomponent.yaml - nvidia.com_v1alpha1_dynamographdeployment.yaml - nvidia.com_v1alpha1_dynamographdeploymentrequest.yaml +- nvidia.com_v1alpha1_dynamomodel.yaml #+kubebuilder:scaffold:manifestskustomizesamples diff --git a/deploy/cloud/operator/internal/controller/dynamo_model_controller.go b/deploy/cloud/operator/internal/controller/dynamo_model_controller.go index e4ddea9de0..350d6cf455 100644 --- a/deploy/cloud/operator/internal/controller/dynamo_model_controller.go +++ b/deploy/cloud/operator/internal/controller/dynamo_model_controller.go @@ -31,7 +31,6 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/log" @@ -40,6 +39,7 @@ import ( "github.com/ai-dynamo/dynamo/deploy/cloud/operator/api/v1alpha1" "github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/consts" + commoncontroller "github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/controller_common" "github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/modelendpoint" ) @@ -57,16 +57,13 @@ const ( // Field index names dynamoModelBaseModelIndex = ".spec.baseModelName" - - // Finalizer - dynamoModelFinalizer = "nvidia.com/dynamo-model-finalizer" ) // DynamoModelReconciler reconciles a DynamoModel object type DynamoModelReconciler struct { client.Client - Recorder record.EventRecorder - Prober *modelendpoint.Prober + Recorder record.EventRecorder + EndpointClient *modelendpoint.Client } // +kubebuilder:rbac:groups=nvidia.com,resources=dynamomodels,verbs=get;list;watch;create;update;patch;delete @@ -93,19 +90,13 @@ func (r *DynamoModelReconciler) Reconcile(ctx context.Context, req ctrl.Request) logs = logs.WithValues("dynamoModel", model.Name, "namespace", model.Namespace, "baseModelName", model.Spec.BaseModelName) 
logs.Info("Reconciling DynamoModel") - // Handle deletion with finalizer - if !model.DeletionTimestamp.IsZero() { - return r.handleDeletion(ctx, model) + // Handle finalizer using common handler + finalized, err := commoncontroller.HandleFinalizer(ctx, model, r.Client, r) + if err != nil { + return ctrl.Result{}, err } - - // Add finalizer if not present - if !controllerutil.ContainsFinalizer(model, dynamoModelFinalizer) { - controllerutil.AddFinalizer(model, dynamoModelFinalizer) - if err := r.Update(ctx, model); err != nil { - logs.Error(err, "Failed to add finalizer") - return ctrl.Result{}, err - } - logs.Info("Added finalizer to DynamoModel") + if finalized { + // Object was being deleted and finalizer has been called return ctrl.Result{}, nil } @@ -131,13 +122,13 @@ func (r *DynamoModelReconciler) Reconcile(ctx context.Context, req ctrl.Request) return ctrl.Result{RequeueAfter: 30 * time.Second}, nil } - // Initialize prober if needed - if r.Prober == nil { - r.Prober = modelendpoint.NewProber() + // Initialize endpoint client if needed + if r.EndpointClient == nil { + r.EndpointClient = modelendpoint.NewClient() } - // Probe all endpoints in parallel with bounded concurrency - allEndpoints, probeErr := r.Prober.ProbeEndpoints(ctx, candidates, model) + // Load LoRA on all endpoints in parallel with bounded concurrency + allEndpoints, probeErr := r.EndpointClient.LoadLoRA(ctx, candidates, model) hasFailures := probeErr != nil || countReadyEndpoints(allEndpoints) < len(allEndpoints) if probeErr != nil { @@ -272,37 +263,36 @@ func (r *DynamoModelReconciler) findModelsForEndpointSlice(ctx context.Context, return requests } -// handleDeletion handles cleanup when a DynamoModel is being deleted -func (r *DynamoModelReconciler) handleDeletion(ctx context.Context, model *v1alpha1.DynamoModel) (ctrl.Result, error) { +// FinalizeResource implements the Finalizer interface +// Performs cleanup when a DynamoModel is being deleted +func (r *DynamoModelReconciler) FinalizeResource(ctx context.Context, model *v1alpha1.DynamoModel) error { logs := log.FromContext(ctx) - // Check if finalizer is present - if !controllerutil.ContainsFinalizer(model, dynamoModelFinalizer) { - logs.Info("Finalizer not found, skipping cleanup") - return ctrl.Result{}, nil - } - - logs.Info("Handling DynamoModel deletion", "modelType", model.Spec.ModelType) + logs.Info("Finalizing DynamoModel", "modelType", model.Spec.ModelType) // Only perform cleanup for LoRA models if model.Spec.ModelType == "lora" { // Get endpoint candidates (reusing common logic) candidates, _, err := r.getEndpointCandidates(ctx, model) if err != nil { - logs.Error(err, "Failed to get endpoints during deletion") + logs.Info("Failed to get endpoints during deletion, continuing with resource deletion", + "error", err.Error()) r.Recorder.Event(model, corev1.EventTypeWarning, "CleanupFailed", err.Error()) // Continue with deletion even if we can't get endpoints } else if len(candidates) > 0 { logs.Info("Unloading LoRA from endpoints", "endpointCount", len(candidates)) - // Initialize prober if needed - if r.Prober == nil { - r.Prober = modelendpoint.NewProber() + // Initialize endpoint client if needed + if r.EndpointClient == nil { + r.EndpointClient = modelendpoint.NewClient() } // Unload LoRA from all endpoints in parallel - if err := r.Prober.UnloadLoRA(ctx, candidates, model.Spec.ModelName); err != nil { - logs.Error(err, "Failed to unload LoRA from some endpoints") + if err := r.EndpointClient.UnloadLoRA(ctx, candidates, model.Spec.ModelName); err 
!= nil { + // Log as Info since we're continuing with deletion anyway (expected behavior) + // Detailed failure information is already logged by the prober + logs.Info("Some endpoints failed to unload LoRA, continuing with deletion", + "error", err.Error()) r.Recorder.Event(model, corev1.EventTypeWarning, "LoRAUnloadFailed", fmt.Sprintf("Failed to unload LoRA from some endpoints: %v", err)) // Continue with deletion even if unload fails @@ -318,15 +308,8 @@ func (r *DynamoModelReconciler) handleDeletion(ctx context.Context, model *v1alp logs.Info("Skipping cleanup for non-LoRA model") } - // Remove finalizer using controllerutil - controllerutil.RemoveFinalizer(model, dynamoModelFinalizer) - if err := r.Update(ctx, model); err != nil { - logs.Error(err, "Failed to remove finalizer") - return ctrl.Result{}, err - } - - logs.Info("Finalizer removed, DynamoModel will be deleted") - return ctrl.Result{}, nil + logs.Info("Finalization completed successfully") + return nil } // getEndpointCandidates fetches EndpointSlices and extracts endpoint candidates diff --git a/deploy/cloud/operator/internal/modelendpoint/prober.go b/deploy/cloud/operator/internal/modelendpoint/client.go similarity index 68% rename from deploy/cloud/operator/internal/modelendpoint/prober.go rename to deploy/cloud/operator/internal/modelendpoint/client.go index eeaa961c92..9a603665af 100644 --- a/deploy/cloud/operator/internal/modelendpoint/prober.go +++ b/deploy/cloud/operator/internal/modelendpoint/client.go @@ -19,6 +19,7 @@ package modelendpoint import ( "context" + "fmt" "net/http" "strings" "time" @@ -30,40 +31,40 @@ import ( ) const ( - // MaxConcurrentProbes is the maximum number of concurrent endpoint probes - MaxConcurrentProbes = 10 - // ProbeTimeout is the timeout for individual HTTP requests - ProbeTimeout = 15 * time.Second - // TotalProbeTimeout is the timeout for all probes to complete - TotalProbeTimeout = 30 * time.Second + // MaxConcurrentOperations is the maximum number of concurrent endpoint operations + MaxConcurrentOperations = 10 + // RequestTimeout is the timeout for individual HTTP requests + RequestTimeout = 15 * time.Second + // TotalTimeout is the timeout for all operations to complete + TotalTimeout = 30 * time.Second ) -// Prober handles HTTP-based endpoint probing -type Prober struct { +// Client handles HTTP communication with model endpoint control APIs +type Client struct { httpClient *http.Client } -// NewProber creates a new endpoint prober -func NewProber() *Prober { - return &Prober{ +// NewClient creates a new model endpoint client +func NewClient() *Client { + return &Client{ httpClient: &http.Client{ - Timeout: ProbeTimeout, + Timeout: RequestTimeout, }, } } -// ProbeEndpoints probes all endpoints in parallel with bounded concurrency -// Returns partial results even if some endpoints fail -func (p *Prober) ProbeEndpoints( +// LoadLoRA loads a LoRA model on all endpoints in parallel with bounded concurrency +// Returns endpoint info with ready status and partial results even if some endpoints fail +func (c *Client) LoadLoRA( ctx context.Context, candidates []Candidate, model *v1alpha1.DynamoModel, ) ([]v1alpha1.EndpointInfo, error) { logs := log.FromContext(ctx) - // Skip probing for non-LoRA models + // Skip loading for non-LoRA models if strings.ToLower(model.Spec.ModelType) != "lora" { - logs.V(1).Info("Skipping probe for non-LoRA model", "modelType", model.Spec.ModelType) + logs.V(1).Info("Skipping LoRA load for non-LoRA model", "modelType", model.Spec.ModelType) endpoints := 
make([]v1alpha1.EndpointInfo, len(candidates)) for i, c := range candidates { endpoints[i] = v1alpha1.EndpointInfo{ @@ -75,27 +76,37 @@ func (p *Prober) ProbeEndpoints( return endpoints, nil } + // Get source URI for LoRA loading + sourceURI := "" + if model.Spec.Source != nil { + sourceURI = model.Spec.Source.URI + } + if sourceURI == "" { + logs.Error(nil, "Source URI is required for LoRA models") + return nil, fmt.Errorf("source URI is required for LoRA models") + } + // Build tasks for the worker pool tasks := make([]workerpool.Task[v1alpha1.EndpointInfo], len(candidates)) for i, candidate := range candidates { - c := candidate // Capture loop variable tasks[i] = workerpool.Task[v1alpha1.EndpointInfo]{ Index: i, Work: func(ctx context.Context) (v1alpha1.EndpointInfo, error) { - // Probe the endpoint - ready := p.probeLoRAEndpoint(ctx, c.Address, model.Spec.ModelName) + // Load the LoRA on this endpoint (idempotent operation) + err := c.loadLoRA(ctx, candidate.Address, model.Spec.ModelName, sourceURI) + ready := err == nil return v1alpha1.EndpointInfo{ - Address: c.Address, - PodName: c.PodName, + Address: candidate.Address, + PodName: candidate.PodName, Ready: ready, - }, nil + }, err }, } } - // Execute all probes in parallel with bounded concurrency - results, err := workerpool.Execute(ctx, MaxConcurrentProbes, TotalProbeTimeout, tasks) + // Execute all load operations in parallel with bounded concurrency + results, err := workerpool.Execute(ctx, MaxConcurrentOperations, TotalTimeout, tasks) // Extract endpoint info from results and collect failures endpoints := make([]v1alpha1.EndpointInfo, len(results)) @@ -108,7 +119,7 @@ func (p *Prober) ProbeEndpoints( } else { notReadyEndpoints = append(notReadyEndpoints, result.Value.Address) if result.Err != nil { - logs.Info("Endpoint probe failed", + logs.Info("Endpoint load operation failed", "address", result.Value.Address, "podName", result.Value.PodName, "error", result.Err) @@ -116,7 +127,7 @@ func (p *Prober) ProbeEndpoints( } } - logs.Info("Completed parallel endpoint probing", + logs.Info("Completed parallel LoRA load operations", "total", len(endpoints), "ready", readyCount, "notReady", len(notReadyEndpoints), @@ -126,7 +137,7 @@ func (p *Prober) ProbeEndpoints( } // UnloadLoRA unloads a LoRA model from all endpoints in parallel -func (p *Prober) UnloadLoRA(ctx context.Context, candidates []Candidate, modelName string) error { +func (c *Client) UnloadLoRA(ctx context.Context, candidates []Candidate, modelName string) error { logs := log.FromContext(ctx) if len(candidates) == 0 { @@ -139,12 +150,11 @@ func (p *Prober) UnloadLoRA(ctx context.Context, candidates []Candidate, modelNa // Build tasks for the worker pool tasks := make([]workerpool.Task[bool], len(candidates)) for i, candidate := range candidates { - c := candidate // Capture loop variable tasks[i] = workerpool.Task[bool]{ Index: i, Work: func(ctx context.Context) (bool, error) { // Unload the LoRA from this endpoint (calls method in lora.go) - err := p.unloadLoRA(ctx, c.Address, modelName) + err := c.unloadLoRA(ctx, candidate.Address, modelName) if err != nil { return false, err } @@ -154,7 +164,7 @@ func (p *Prober) UnloadLoRA(ctx context.Context, candidates []Candidate, modelNa } // Execute all unload operations in parallel with bounded concurrency - results, err := workerpool.Execute(ctx, MaxConcurrentProbes, TotalProbeTimeout, tasks) + results, err := workerpool.Execute(ctx, MaxConcurrentOperations, TotalTimeout, tasks) // Collect successes and failures with details 
successCount := 0 diff --git a/deploy/cloud/operator/internal/modelendpoint/lora.go b/deploy/cloud/operator/internal/modelendpoint/lora.go index 5267e05d17..aeef45ef65 100644 --- a/deploy/cloud/operator/internal/modelendpoint/lora.go +++ b/deploy/cloud/operator/internal/modelendpoint/lora.go @@ -28,13 +28,16 @@ import ( "sigs.k8s.io/controller-runtime/pkg/log" ) -// LoadLoRA loads a LoRA model on the specified endpoint -func (p *Prober) LoadLoRA(ctx context.Context, address, modelName string) error { +// loadLoRA loads a LoRA model on a single endpoint +func (c *Client) loadLoRA(ctx context.Context, address, modelName, sourceURI string) error { logs := log.FromContext(ctx) + // Build request body with source object loadReq := map[string]interface{}{ "lora_name": modelName, - "lora_path": modelName, // May need to adjust based on actual API + "source": map[string]interface{}{ + "uri": sourceURI, + }, } loadBody, err := json.Marshal(loadReq) @@ -48,7 +51,7 @@ func (p *Prober) LoadLoRA(ctx context.Context, address, modelName string) error } req.Header.Set("Content-Type", "application/json") - resp, err := p.httpClient.Do(req) + resp, err := c.httpClient.Do(req) if err != nil { return fmt.Errorf("failed to call load LoRA endpoint: %w", err) } @@ -60,92 +63,12 @@ func (p *Prober) LoadLoRA(ctx context.Context, address, modelName string) error return fmt.Errorf("load LoRA failed with status %d: %s", resp.StatusCode, string(body)) } - logs.Info("Successfully loaded LoRA", "address", address, "modelName", modelName) + logs.Info("Successfully loaded LoRA", "address", address, "modelName", modelName, "sourceURI", sourceURI) return nil } -// VerifyLoRALoaded checks if a LoRA model is present in the GET /loras response -func (p *Prober) VerifyLoRALoaded(ctx context.Context, address, modelName string) bool { - logs := log.FromContext(ctx) - - req, err := http.NewRequestWithContext(ctx, "GET", address+"/v1/loras", nil) - if err != nil { - logs.V(1).Info("Failed to create GET loras request", "error", err) - return false - } - - resp, err := p.httpClient.Do(req) - if err != nil { - logs.V(1).Info("Failed to call GET loras endpoint", "address", address, "error", err) - return false - } - defer resp.Body.Close() - - if resp.StatusCode < 200 || resp.StatusCode >= 300 { - logs.V(1).Info("GET loras failed", "address", address, "status", resp.StatusCode) - return false - } - - body, err := io.ReadAll(resp.Body) - if err != nil { - logs.V(1).Info("Failed to read loras response", "error", err) - return false - } - - // Parse the response - expecting a JSON array or object with lora names - var lorasResp struct { - Loras []string `json:"loras"` - } - - if err := json.Unmarshal(body, &lorasResp); err != nil { - // Try parsing as a simple array - var lorasList []string - if err := json.Unmarshal(body, &lorasList); err != nil { - logs.V(1).Info("Failed to parse loras response", "error", err, "body", string(body)) - return false - } - lorasResp.Loras = lorasList - } - - // Check if our model is in the list - for _, lora := range lorasResp.Loras { - if lora == modelName { - logs.V(1).Info("LoRA model verified as loaded", "address", address, "modelName", modelName) - return true - } - } - - logs.V(1).Info("LoRA model not found in list", "address", address, "modelName", modelName, "availableLoras", lorasResp.Loras) - return false -} - -// probeLoRAEndpoint checks if LoRA is already loaded, and loads it if not -func (p *Prober) probeLoRAEndpoint(ctx context.Context, address, modelName string) bool { - logs := 
log.FromContext(ctx) - - // Step 1: Check if LoRA is already loaded - if p.VerifyLoRALoaded(ctx, address, modelName) { - logs.V(1).Info("LoRA already loaded", "address", address, "modelName", modelName) - return true - } - - // Step 2: Load the LoRA since it's not present - if err := p.LoadLoRA(ctx, address, modelName); err != nil { - logs.V(1).Info("Failed to load LoRA", "address", address, "error", err) - return false - } - - // Step 3: Verify the LoRA was loaded successfully - if p.VerifyLoRALoaded(ctx, address, modelName) { - return true - } - - logs.V(1).Info("LoRA load appeared successful but verification failed", "address", address) - return false -} - // unloadLoRA unloads a LoRA model from a single endpoint -func (p *Prober) unloadLoRA(ctx context.Context, address, modelName string) error { +func (c *Client) unloadLoRA(ctx context.Context, address, modelName string) error { logs := log.FromContext(ctx) req, err := http.NewRequestWithContext(ctx, "DELETE", address+"/v1/loras/"+modelName, nil) @@ -154,7 +77,7 @@ func (p *Prober) unloadLoRA(ctx context.Context, address, modelName string) erro return fmt.Errorf("failed to create unload LoRA request: %w", err) } - resp, err := p.httpClient.Do(req) + resp, err := c.httpClient.Do(req) if err != nil { logs.V(1).Info("Failed to call unload LoRA endpoint", "address", address, "error", err) return fmt.Errorf("failed to call unload LoRA endpoint: %w", err) diff --git a/deploy/cloud/operator/internal/modelendpoint/types.go b/deploy/cloud/operator/internal/modelendpoint/types.go index 879e2c578c..a69ffd93d9 100644 --- a/deploy/cloud/operator/internal/modelendpoint/types.go +++ b/deploy/cloud/operator/internal/modelendpoint/types.go @@ -17,19 +17,8 @@ package modelendpoint -import ( - "github.com/ai-dynamo/dynamo/deploy/cloud/operator/api/v1alpha1" -) - -// Candidate represents an endpoint candidate to be probed +// Candidate represents an endpoint candidate for operations type Candidate struct { Address string PodName string } - -// ProbeResult represents the result of probing a single endpoint -type ProbeResult struct { - Index int - Endpoint v1alpha1.EndpointInfo - Err error -} From a9f6f006fc11a49794155ba817ed0d79c7809bf5 Mon Sep 17 00:00:00 2001 From: Julien Mancuso Date: Thu, 6 Nov 2025 15:00:16 -0700 Subject: [PATCH 4/9] fix: add dynamoModel CRD Signed-off-by: Julien Mancuso --- .../cloud/operator/internal/consts/consts.go | 2 + .../controller/dynamo_model_controller.go | 11 ++-- .../dynamocomponentdeployment_controller.go | 4 ++ .../internal/dynamo/headless_service.go | 52 +++++++++++++++---- 4 files changed, 57 insertions(+), 12 deletions(-) diff --git a/deploy/cloud/operator/internal/consts/consts.go b/deploy/cloud/operator/internal/consts/consts.go index ca1c01b64d..2776df37d0 100644 --- a/deploy/cloud/operator/internal/consts/consts.go +++ b/deploy/cloud/operator/internal/consts/consts.go @@ -39,6 +39,8 @@ const ( KubeLabelDynamoComponentType = "nvidia.com/dynamo-component-type" KubeLabelDynamoSubComponentType = "nvidia.com/dynamo-sub-component-type" KubeLabelDynamoBaseModel = "nvidia.com/dynamo-base-model" + KubeLabelDynamoBaseModelHash = "nvidia.com/dynamo-base-model-hash" + KubeAnnotationDynamoBaseModel = "nvidia.com/dynamo-base-model" KubeLabelValueFalse = "false" KubeLabelValueTrue = "true" diff --git a/deploy/cloud/operator/internal/controller/dynamo_model_controller.go b/deploy/cloud/operator/internal/controller/dynamo_model_controller.go index 350d6cf455..bf54eb4ba8 100644 --- 
a/deploy/cloud/operator/internal/controller/dynamo_model_controller.go +++ b/deploy/cloud/operator/internal/controller/dynamo_model_controller.go @@ -40,6 +40,7 @@ import ( "github.com/ai-dynamo/dynamo/deploy/cloud/operator/api/v1alpha1" "github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/consts" commoncontroller "github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/controller_common" + "github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/dynamo" "github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/modelendpoint" ) @@ -320,12 +321,16 @@ func (r *DynamoModelReconciler) getEndpointCandidates( ) ([]modelendpoint.Candidate, map[string]bool, error) { logs := log.FromContext(ctx) - // Get EndpointSlices for the headless service - // Service name = base model name, so we can directly query by service label + // Generate the predictable service name from the base model name + // Service name is deterministic: dynamo-model-{8-char-hash} + serviceName := dynamo.GenerateServiceName(model.Spec.BaseModelName) + + // Query EndpointSlices directly by service name + // EndpointSlices are automatically labeled with kubernetes.io/service-name endpointSlices := &discoveryv1.EndpointSliceList{} if err := r.List(ctx, endpointSlices, client.InNamespace(model.Namespace), - client.MatchingLabels{discoveryv1.LabelServiceName: model.Spec.BaseModelName}, + client.MatchingLabels{discoveryv1.LabelServiceName: serviceName}, ); err != nil { logs.Error(err, "Failed to list endpoint slices for model") r.Recorder.Event(model, corev1.EventTypeWarning, "EndpointDiscoveryFailed", err.Error()) diff --git a/deploy/cloud/operator/internal/controller/dynamocomponentdeployment_controller.go b/deploy/cloud/operator/internal/controller/dynamocomponentdeployment_controller.go index 1d772903df..5135d56363 100644 --- a/deploy/cloud/operator/internal/controller/dynamocomponentdeployment_controller.go +++ b/deploy/cloud/operator/internal/controller/dynamocomponentdeployment_controller.go @@ -965,6 +965,10 @@ func (r *DynamoComponentDeploymentReconciler) getKubeAnnotations(dynamoComponent for k, v := range extraAnnotations { annotations[k] = v } + // Add base model annotation if modelRef is specified (for human readability) + if dynamoComponentDeployment != nil { + dynamo.AddBaseModelAnnotation(annotations, dynamoComponentDeployment.Spec.ModelRef) + } return annotations } diff --git a/deploy/cloud/operator/internal/dynamo/headless_service.go b/deploy/cloud/operator/internal/dynamo/headless_service.go index 0b685314c6..05c9b8cd6b 100644 --- a/deploy/cloud/operator/internal/dynamo/headless_service.go +++ b/deploy/cloud/operator/internal/dynamo/headless_service.go @@ -19,6 +19,8 @@ package dynamo import ( "context" + "crypto/sha256" + "encoding/hex" "fmt" "github.com/ai-dynamo/dynamo/deploy/cloud/operator/api/v1alpha1" @@ -63,10 +65,9 @@ func ReconcileModelServicesForComponents( } seenBaseModels[baseModelName] = true - // Generate headless service + // Generate headless service with deterministic name based on model name headlessService := GenerateHeadlessServiceForModel( ctx, - baseModelName, // Use base model name as service name namespace, baseModelName, ) @@ -97,28 +98,39 @@ func ReconcileModelServicesForComponents( } // GenerateHeadlessServiceForModel creates a headless service for model endpoint discovery +// Service name is generated deterministically from the base model name using a hash +// The base model name hash is stored as a label for efficient discovery +// The original base model name is stored in 
an annotation for human readability func GenerateHeadlessServiceForModel( ctx context.Context, - serviceName string, namespace string, baseModelName string, ) *corev1.Service { + // Generate deterministic service name from model name + serviceName := GenerateServiceName(baseModelName) + + // Hash the base model name for use in labels (no length or character restrictions) + modelHash := HashModelName(baseModelName) + service := &corev1.Service{ ObjectMeta: metav1.ObjectMeta{ Name: serviceName, Namespace: namespace, Labels: map[string]string{ - commonconsts.KubeLabelDynamoBaseModel: baseModelName, - "nvidia.com/managed-by": "dynamo-operator", + commonconsts.KubeLabelDynamoBaseModelHash: modelHash, + "nvidia.com/managed-by": "dynamo-operator", + }, + Annotations: map[string]string{ + commonconsts.KubeAnnotationDynamoBaseModel: baseModelName, // Original name for humans }, }, Spec: corev1.ServiceSpec{ // Headless service - no ClusterIP, no load balancing ClusterIP: corev1.ClusterIPNone, - // Selector to match pods with the base model label + // Selector to match pods with the base model hash label Selector: map[string]string{ - commonconsts.KubeLabelDynamoBaseModel: baseModelName, + commonconsts.KubeLabelDynamoBaseModelHash: modelHash, }, // Don't publish not-ready addresses - only ready pods in EndpointSlices @@ -139,9 +151,31 @@ func GenerateHeadlessServiceForModel( return service } -// AddBaseModelLabel adds the base model label to a label map if modelRef is present +// HashModelName creates a deterministic hash from a base model name for use in labels +// Returns an 8-character hex string (always valid as a Kubernetes label value) +func HashModelName(baseModelName string) string { + hash := sha256.Sum256([]byte(baseModelName)) + // Use 8 characters for brevity and consistency + return hex.EncodeToString(hash[:])[:8] +} + +// GenerateServiceName creates a deterministic, DNS-safe service name from a base model name +// Format: dynamo-model-{8-char-hash} +func GenerateServiceName(baseModelName string) string { + return fmt.Sprintf("dynamo-model-%s", HashModelName(baseModelName)) +} + +// AddBaseModelLabel adds the base model hash label to a label map if modelRef is present +// Uses a hash of the model name to avoid label length/character restrictions func AddBaseModelLabel(labels map[string]string, modelRef *v1alpha1.ModelReference) { if modelRef != nil && modelRef.Name != "" { - labels[commonconsts.KubeLabelDynamoBaseModel] = modelRef.Name + labels[commonconsts.KubeLabelDynamoBaseModelHash] = HashModelName(modelRef.Name) + } +} + +// AddBaseModelAnnotation adds the base model annotation to preserve the original model name +func AddBaseModelAnnotation(annotations map[string]string, modelRef *v1alpha1.ModelReference) { + if modelRef != nil && modelRef.Name != "" { + annotations[commonconsts.KubeAnnotationDynamoBaseModel] = modelRef.Name } } From 1fc50d65db0b1a9774ebb0db01f51b1915bdb208 Mon Sep 17 00:00:00 2001 From: Julien Mancuso Date: Thu, 6 Nov 2025 15:10:14 -0700 Subject: [PATCH 5/9] fix: add dynamoModel CRD Signed-off-by: Julien Mancuso --- .../internal/controller/dynamo_model_controller.go | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/deploy/cloud/operator/internal/controller/dynamo_model_controller.go b/deploy/cloud/operator/internal/controller/dynamo_model_controller.go index bf54eb4ba8..1f3c149c20 100644 --- a/deploy/cloud/operator/internal/controller/dynamo_model_controller.go +++ b/deploy/cloud/operator/internal/controller/dynamo_model_controller.go 
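The hunk below switches endpoint discovery from the kubernetes.io/service-name label to the base-model hash label. For orientation, a short sketch of how the naming helpers introduced in headless_service.go above compose (the hash value shown is illustrative, not computed):

	modelName := "llama-3-70b-instruct-v1"           // spec.baseModelName / modelRef.name
	hash := dynamo.HashModelName(modelName)          // first 8 hex chars of sha256, e.g. "1a2b3c4d"
	svcName := dynamo.GenerateServiceName(modelName) // "dynamo-model-1a2b3c4d"
	// The hash goes into the nvidia.com/dynamo-base-model-hash label on pods and on the
	// headless Service, while the readable model name is kept in the
	// nvidia.com/dynamo-base-model annotation.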
@@ -321,16 +321,15 @@ func (r *DynamoModelReconciler) getEndpointCandidates( ) ([]modelendpoint.Candidate, map[string]bool, error) { logs := log.FromContext(ctx) - // Generate the predictable service name from the base model name - // Service name is deterministic: dynamo-model-{8-char-hash} - serviceName := dynamo.GenerateServiceName(model.Spec.BaseModelName) + // Hash the base model name for label-based discovery + modelHash := dynamo.HashModelName(model.Spec.BaseModelName) - // Query EndpointSlices directly by service name - // EndpointSlices are automatically labeled with kubernetes.io/service-name + // Query EndpointSlices directly by base model hash label + // This label propagates from the Service to its EndpointSlices endpointSlices := &discoveryv1.EndpointSliceList{} if err := r.List(ctx, endpointSlices, client.InNamespace(model.Namespace), - client.MatchingLabels{discoveryv1.LabelServiceName: serviceName}, + client.MatchingLabels{consts.KubeLabelDynamoBaseModelHash: modelHash}, ); err != nil { logs.Error(err, "Failed to list endpoint slices for model") r.Recorder.Event(model, corev1.EventTypeWarning, "EndpointDiscoveryFailed", err.Error()) From 845867bf6031978aefb51c220bc713a4961113e1 Mon Sep 17 00:00:00 2001 From: Julien Mancuso Date: Thu, 6 Nov 2025 15:15:12 -0700 Subject: [PATCH 6/9] fix: add dynamoModel CRD Signed-off-by: Julien Mancuso --- .../templates/nvidia.com_dynamomodels.yaml | 16 ++- .../crd/bases/nvidia.com_dynamomodels.yaml | 16 ++- docs/kubernetes/api_reference.md | 112 ++++++++++++++++++ 3 files changed, 138 insertions(+), 6 deletions(-) diff --git a/deploy/cloud/helm/crds/templates/nvidia.com_dynamomodels.yaml b/deploy/cloud/helm/crds/templates/nvidia.com_dynamomodels.yaml index 473c20a004..5bed58928f 100644 --- a/deploy/cloud/helm/crds/templates/nvidia.com_dynamomodels.yaml +++ b/deploy/cloud/helm/crds/templates/nvidia.com_dynamomodels.yaml @@ -82,9 +82,6 @@ spec: BaseModelName is the base model identifier that matches the service label This is used to discover endpoints via headless services type: string - loraPath: - description: LoraPath is the path to the LoRA adapter (only applicable for lora model type) - type: string modelName: description: ModelName is the full model identifier (e.g., "meta-llama/Llama-3.3-70B-Instruct-lora") type: string @@ -96,6 +93,19 @@ spec: - lora - adapter type: string + source: + description: Source specifies the model source location (only applicable for lora model type) + properties: + uri: + description: |- + URI is the model source URI + Supported formats: + - S3: s3://bucket/path/to/model + - HuggingFace: hf://org/model@revision_sha + type: string + required: + - uri + type: object required: - baseModelName - modelName diff --git a/deploy/cloud/operator/config/crd/bases/nvidia.com_dynamomodels.yaml b/deploy/cloud/operator/config/crd/bases/nvidia.com_dynamomodels.yaml index 473c20a004..5bed58928f 100644 --- a/deploy/cloud/operator/config/crd/bases/nvidia.com_dynamomodels.yaml +++ b/deploy/cloud/operator/config/crd/bases/nvidia.com_dynamomodels.yaml @@ -82,9 +82,6 @@ spec: BaseModelName is the base model identifier that matches the service label This is used to discover endpoints via headless services type: string - loraPath: - description: LoraPath is the path to the LoRA adapter (only applicable for lora model type) - type: string modelName: description: ModelName is the full model identifier (e.g., "meta-llama/Llama-3.3-70B-Instruct-lora") type: string @@ -96,6 +93,19 @@ spec: - lora - adapter type: string + source: + 
description: Source specifies the model source location (only applicable for lora model type) + properties: + uri: + description: |- + URI is the model source URI + Supported formats: + - S3: s3://bucket/path/to/model + - HuggingFace: hf://org/model@revision_sha + type: string + required: + - uri + type: object required: - baseModelName - modelName diff --git a/docs/kubernetes/api_reference.md b/docs/kubernetes/api_reference.md index 00ac44dd93..83eb6650e2 100644 --- a/docs/kubernetes/api_reference.md +++ b/docs/kubernetes/api_reference.md @@ -37,6 +37,7 @@ Package v1alpha1 contains API Schema definitions for the nvidia.com v1alpha1 API - [DynamoComponentDeployment](#dynamocomponentdeployment) - [DynamoGraphDeployment](#dynamographdeployment) - [DynamoGraphDeploymentRequest](#dynamographdeploymentrequest) +- [DynamoModel](#dynamomodel) @@ -166,6 +167,7 @@ _Appears in:_ | `envFromSecret` _string_ | EnvFromSecret references a Secret whose key/value pairs will be exposed as
environment variables in the component containers. | | | | `volumeMounts` _[VolumeMount](#volumemount) array_ | VolumeMounts references PVCs defined at the top level for volumes to be mounted by the component. | | | | `ingress` _[IngressSpec](#ingressspec)_ | Ingress config to expose the component outside the cluster (or through a service mesh). | | | +| `modelRef` _[ModelReference](#modelreference)_ | ModelRef references a model that this component serves
When specified, a headless service will be created for endpoint discovery | | | | `sharedMemory` _[SharedMemorySpec](#sharedmemoryspec)_ | SharedMemory controls the tmpfs mounted at /dev/shm (enable/disable and size). | | | | `extraPodMetadata` _[ExtraPodMetadata](#extrapodmetadata)_ | ExtraPodMetadata adds labels/annotations to the created Pods. | | | | `extraPodSpec` _[ExtraPodSpec](#extrapodspec)_ | ExtraPodSpec allows to override the main pod spec configuration.
It is a k8s standard PodSpec. It also contains a MainContainer (standard k8s Container) field
that allows overriding the main container configuration. | | | @@ -201,6 +203,7 @@ _Appears in:_ | `envFromSecret` _string_ | EnvFromSecret references a Secret whose key/value pairs will be exposed as
environment variables in the component containers. | | | | `volumeMounts` _[VolumeMount](#volumemount) array_ | VolumeMounts references PVCs defined at the top level for volumes to be mounted by the component. | | | | `ingress` _[IngressSpec](#ingressspec)_ | Ingress config to expose the component outside the cluster (or through a service mesh). | | | +| `modelRef` _[ModelReference](#modelreference)_ | ModelRef references a model that this component serves
When specified, a headless service will be created for endpoint discovery | | | | `sharedMemory` _[SharedMemorySpec](#sharedmemoryspec)_ | SharedMemory controls the tmpfs mounted at /dev/shm (enable/disable and size). | | | | `extraPodMetadata` _[ExtraPodMetadata](#extrapodmetadata)_ | ExtraPodMetadata adds labels/annotations to the created Pods. | | | | `extraPodSpec` _[ExtraPodSpec](#extrapodspec)_ | ExtraPodSpec allows to override the main pod spec configuration.
It is a k8s standard PodSpec. It also contains a MainContainer (standard k8s Container) field
that allows overriding the main container configuration. | | | @@ -345,6 +348,81 @@ _Appears in:_ | `conditions` _[Condition](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.28/#condition-v1-meta) array_ | Conditions contains the latest observed conditions of the graph deployment.
The slice is merged by type on patch updates. | | | +#### DynamoModel + + + +DynamoModel is the Schema for the dynamo models API + + + + + +| Field | Description | Default | Validation | +| --- | --- | --- | --- | +| `apiVersion` _string_ | `nvidia.com/v1alpha1` | | | +| `kind` _string_ | `DynamoModel` | | | +| `metadata` _[ObjectMeta](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.28/#objectmeta-v1-meta)_ | Refer to Kubernetes API documentation for fields of `metadata`. | | | +| `spec` _[DynamoModelSpec](#dynamomodelspec)_ | | | | +| `status` _[DynamoModelStatus](#dynamomodelstatus)_ | | | | + + +#### DynamoModelSpec + + + +DynamoModelSpec defines the desired state of DynamoModel + + + +_Appears in:_ +- [DynamoModel](#dynamomodel) + +| Field | Description | Default | Validation | +| --- | --- | --- | --- | +| `modelName` _string_ | ModelName is the full model identifier (e.g., "meta-llama/Llama-3.3-70B-Instruct-lora") | | Required: {}
| +| `baseModelName` _string_ | BaseModelName is the base model identifier that matches the service label
This is used to discover endpoints via headless services | | Required: {}
| +| `modelType` _string_ | ModelType specifies the type of model (e.g., "base", "lora", "adapter") | base | Enum: [base lora adapter]
| +| `source` _[ModelSource](#modelsource)_ | Source specifies the model source location (only applicable for lora model type) | | | + + +#### DynamoModelStatus + + + +DynamoModelStatus defines the observed state of DynamoModel + + + +_Appears in:_ +- [DynamoModel](#dynamomodel) + +| Field | Description | Default | Validation | +| --- | --- | --- | --- | +| `endpoints` _[EndpointInfo](#endpointinfo) array_ | Endpoints is the current list of all endpoints for this model | | | +| `readyEndpoints` _integer_ | ReadyEndpoints is the count of endpoints that are ready | | | +| `totalEndpoints` _integer_ | TotalEndpoints is the total count of endpoints | | | +| `conditions` _[Condition](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.28/#condition-v1-meta) array_ | Conditions represents the latest available observations of the model's state | | | + + +#### EndpointInfo + + + +EndpointInfo represents a single endpoint (pod) serving the model + + + +_Appears in:_ +- [DynamoModelStatus](#dynamomodelstatus) + +| Field | Description | Default | Validation | +| --- | --- | --- | --- | +| `address` _string_ | Address is the full address of the endpoint (e.g., "http://10.0.1.5:9090") | | | +| `podName` _string_ | PodName is the name of the pod serving this endpoint | | | +| `ready` _boolean_ | Ready indicates whether the endpoint is ready to serve traffic
For LoRA models: true if the LoRA was successfully loaded and appears in GET /v1/loras
For base models: always false (no probing performed) | | | + + #### IngressSpec @@ -387,6 +465,40 @@ _Appears in:_ | `secretName` _string_ | SecretName is the name of a Kubernetes Secret containing the TLS certificate and key. | | | +#### ModelReference + + + +ModelReference identifies a model served by this component + + + +_Appears in:_ +- [DynamoComponentDeploymentSharedSpec](#dynamocomponentdeploymentsharedspec) +- [DynamoComponentDeploymentSpec](#dynamocomponentdeploymentspec) + +| Field | Description | Default | Validation | +| --- | --- | --- | --- | +| `name` _string_ | Name is the base model identifier (e.g., "llama-3-70b-instruct-v1") | | Required: {}
| +| `revision` _string_ | Revision is the model revision/version (optional) | | | + + +#### ModelSource + + + +ModelSource defines the source location of a model + + + +_Appears in:_ +- [DynamoModelSpec](#dynamomodelspec) + +| Field | Description | Default | Validation | +| --- | --- | --- | --- | +| `uri` _string_ | URI is the model source URI
Supported formats:
- S3: s3://bucket/path/to/model
- HuggingFace: hf://org/model@revision_sha | | Required: {}
| + + #### MultinodeSpec From 49991d6b84fae6e4b5cedee427da99f7e4bdbadc Mon Sep 17 00:00:00 2001 From: Julien Mancuso Date: Thu, 6 Nov 2025 15:17:54 -0700 Subject: [PATCH 7/9] fix: add dynamoModel CRD Signed-off-by: Julien Mancuso --- .../controller/dynamo_model_controller.go | 46 ++++++++++++++----- 1 file changed, 34 insertions(+), 12 deletions(-) diff --git a/deploy/cloud/operator/internal/controller/dynamo_model_controller.go b/deploy/cloud/operator/internal/controller/dynamo_model_controller.go index 1f3c149c20..35fe65a674 100644 --- a/deploy/cloud/operator/internal/controller/dynamo_model_controller.go +++ b/deploy/cloud/operator/internal/controller/dynamo_model_controller.go @@ -20,6 +20,7 @@ package controller import ( "context" "fmt" + "strings" "time" corev1 "k8s.io/api/core/v1" @@ -130,7 +131,15 @@ func (r *DynamoModelReconciler) Reconcile(ctx context.Context, req ctrl.Request) // Load LoRA on all endpoints in parallel with bounded concurrency allEndpoints, probeErr := r.EndpointClient.LoadLoRA(ctx, candidates, model) - hasFailures := probeErr != nil || countReadyEndpoints(allEndpoints) < len(allEndpoints) + + // Determine if we need to requeue based on model type + // For LoRA models: requeue if there were probe errors OR if not all endpoints are ready + // For base models: only requeue if there were probe errors (Ready is expected to be false) + isLoRA := strings.ToLower(model.Spec.ModelType) == "lora" + hasFailures := probeErr != nil + if isLoRA { + hasFailures = hasFailures || countReadyEndpoints(allEndpoints) < len(allEndpoints) + } if probeErr != nil { logs.Error(probeErr, "Some endpoints failed during probing") @@ -152,18 +161,31 @@ func (r *DynamoModelReconciler) Reconcile(ctx context.Context, req ctrl.Request) model.Status.TotalEndpoints = len(allEndpoints) model.Status.ReadyEndpoints = countReadyEndpoints(allEndpoints) - // Update conditions - if model.Status.ReadyEndpoints > 0 { - r.updateCondition(model, ConditionTypeEndpointsReady, metav1.ConditionTrue, ReasonEndpointsDiscovered, - fmt.Sprintf("Found %d ready endpoint(s) out of %d total", model.Status.ReadyEndpoints, model.Status.TotalEndpoints)) - r.Recorder.Eventf(model, corev1.EventTypeNormal, "EndpointsReady", - "Discovered %d ready endpoints for base model %s", model.Status.ReadyEndpoints, model.Spec.BaseModelName) - } else if model.Status.TotalEndpoints > 0 { - r.updateCondition(model, ConditionTypeEndpointsReady, metav1.ConditionFalse, ReasonNoReadyEndpoints, - fmt.Sprintf("Found %d endpoint(s) but none are ready", model.Status.TotalEndpoints)) - r.Recorder.Event(model, corev1.EventTypeWarning, "NoReadyEndpoints", "Endpoints exist but none are ready") + // Update conditions based on model type + if isLoRA { + // For LoRA models, check readiness + if model.Status.ReadyEndpoints > 0 { + r.updateCondition(model, ConditionTypeEndpointsReady, metav1.ConditionTrue, ReasonEndpointsDiscovered, + fmt.Sprintf("Found %d ready endpoint(s) out of %d total", model.Status.ReadyEndpoints, model.Status.TotalEndpoints)) + r.Recorder.Eventf(model, corev1.EventTypeNormal, "EndpointsReady", + "Discovered %d ready endpoints for base model %s", model.Status.ReadyEndpoints, model.Spec.BaseModelName) + } else if model.Status.TotalEndpoints > 0 { + r.updateCondition(model, ConditionTypeEndpointsReady, metav1.ConditionFalse, ReasonNoReadyEndpoints, + fmt.Sprintf("Found %d endpoint(s) but none are ready", model.Status.TotalEndpoints)) + r.Recorder.Event(model, corev1.EventTypeWarning, "NoReadyEndpoints", "Endpoints exist but none are 
ready") + } else { + r.updateCondition(model, ConditionTypeEndpointsReady, metav1.ConditionFalse, ReasonNoEndpoints, "No endpoints found") + } } else { - r.updateCondition(model, ConditionTypeEndpointsReady, metav1.ConditionFalse, ReasonNoEndpoints, "No endpoints found") + // For base models, just check that endpoints exist (readiness doesn't apply) + if model.Status.TotalEndpoints > 0 { + r.updateCondition(model, ConditionTypeEndpointsReady, metav1.ConditionTrue, ReasonEndpointsDiscovered, + fmt.Sprintf("Found %d endpoint(s) for base model", model.Status.TotalEndpoints)) + r.Recorder.Eventf(model, corev1.EventTypeNormal, "EndpointsDiscovered", + "Discovered %d endpoints for base model %s", model.Status.TotalEndpoints, model.Spec.BaseModelName) + } else { + r.updateCondition(model, ConditionTypeEndpointsReady, metav1.ConditionFalse, ReasonNoEndpoints, "No endpoints found") + } } if err := r.Status().Update(ctx, model); err != nil { From cacf4d45279bc2457e2bcd8e2ed8d15c84c0d87a Mon Sep 17 00:00:00 2001 From: Julien Mancuso Date: Thu, 6 Nov 2025 15:33:53 -0700 Subject: [PATCH 8/9] fix: add dynamoModel CRD Signed-off-by: Julien Mancuso --- .../internal/modelendpoint/discovery.go | 5 +- .../operator/internal/modelendpoint/lora.go | 30 +++++++++-- .../operator/internal/workerpool/pool.go | 54 +++++++++++-------- 3 files changed, 62 insertions(+), 27 deletions(-) diff --git a/deploy/cloud/operator/internal/modelendpoint/discovery.go b/deploy/cloud/operator/internal/modelendpoint/discovery.go index cd404c8eac..ef8bd40a87 100644 --- a/deploy/cloud/operator/internal/modelendpoint/discovery.go +++ b/deploy/cloud/operator/internal/modelendpoint/discovery.go @@ -19,7 +19,8 @@ package modelendpoint import ( "context" - "fmt" + "net" + "strconv" discoveryv1 "k8s.io/api/discovery/v1" "k8s.io/apimachinery/pkg/types" @@ -60,7 +61,7 @@ func ExtractCandidates(endpointSlices *discoveryv1.EndpointSliceList, port int32 } for _, addr := range ep.Addresses { - address := fmt.Sprintf("http://%s:%d", addr, port) + address := "http://" + net.JoinHostPort(addr, strconv.Itoa(int(port))) candidates = append(candidates, Candidate{ Address: address, PodName: podName, diff --git a/deploy/cloud/operator/internal/modelendpoint/lora.go b/deploy/cloud/operator/internal/modelendpoint/lora.go index aeef45ef65..70b50f41a2 100644 --- a/deploy/cloud/operator/internal/modelendpoint/lora.go +++ b/deploy/cloud/operator/internal/modelendpoint/lora.go @@ -24,6 +24,7 @@ import ( "fmt" "io" "net/http" + "net/url" "sigs.k8s.io/controller-runtime/pkg/log" ) @@ -45,7 +46,13 @@ func (c *Client) loadLoRA(ctx context.Context, address, modelName, sourceURI str return fmt.Errorf("failed to marshal load LoRA request: %w", err) } - req, err := http.NewRequestWithContext(ctx, "POST", address+"/v1/load_lora", bytes.NewBuffer(loadBody)) + // Build URL robustly using url.JoinPath to handle trailing slashes + apiURL, err := url.JoinPath(address, "/loras") + if err != nil { + return fmt.Errorf("failed to construct load LoRA URL: %w", err) + } + + req, err := http.NewRequestWithContext(ctx, "POST", apiURL, bytes.NewBuffer(loadBody)) if err != nil { return fmt.Errorf("failed to create load LoRA request: %w", err) } @@ -55,7 +62,11 @@ func (c *Client) loadLoRA(ctx context.Context, address, modelName, sourceURI str if err != nil { return fmt.Errorf("failed to call load LoRA endpoint: %w", err) } - defer resp.Body.Close() + defer func() { + if closeErr := resp.Body.Close(); closeErr != nil { + logs.V(1).Info("Failed to close response body", "error", 
closeErr) + } + }() if resp.StatusCode < 200 || resp.StatusCode >= 300 { body, _ := io.ReadAll(resp.Body) @@ -71,7 +82,14 @@ func (c *Client) loadLoRA(ctx context.Context, address, modelName, sourceURI str func (c *Client) unloadLoRA(ctx context.Context, address, modelName string) error { logs := log.FromContext(ctx) - req, err := http.NewRequestWithContext(ctx, "DELETE", address+"/v1/loras/"+modelName, nil) + // Build URL robustly using url.JoinPath to handle trailing slashes and encode modelName + apiURL, err := url.JoinPath(address, "/loras", modelName) + if err != nil { + logs.V(1).Info("Failed to construct unload LoRA URL", "error", err) + return fmt.Errorf("failed to construct unload LoRA URL: %w", err) + } + + req, err := http.NewRequestWithContext(ctx, "DELETE", apiURL, nil) if err != nil { logs.V(1).Info("Failed to create unload LoRA request", "error", err) return fmt.Errorf("failed to create unload LoRA request: %w", err) @@ -82,7 +100,11 @@ func (c *Client) unloadLoRA(ctx context.Context, address, modelName string) erro logs.V(1).Info("Failed to call unload LoRA endpoint", "address", address, "error", err) return fmt.Errorf("failed to call unload LoRA endpoint: %w", err) } - defer resp.Body.Close() + defer func() { + if closeErr := resp.Body.Close(); closeErr != nil { + logs.V(1).Info("Failed to close response body", "error", closeErr) + } + }() if resp.StatusCode < 200 || resp.StatusCode >= 300 { body, _ := io.ReadAll(resp.Body) diff --git a/deploy/cloud/operator/internal/workerpool/pool.go b/deploy/cloud/operator/internal/workerpool/pool.go index c49abf3237..d279615215 100644 --- a/deploy/cloud/operator/internal/workerpool/pool.go +++ b/deploy/cloud/operator/internal/workerpool/pool.go @@ -37,10 +37,16 @@ type Result[T any] struct { Err error } -// Execute runs all tasks in parallel with bounded concurrency +// Execute runs all tasks in parallel with bounded concurrency using a worker pool // Returns results in the same order as input tasks, even if execution order differs // Continues executing all tasks even if some fail +// Spawns exactly maxWorkers goroutines regardless of task count func Execute[T any](ctx context.Context, maxWorkers int, timeout time.Duration, tasks []Task[T]) ([]Result[T], error) { + // Validate maxWorkers to prevent panics or hangs + if maxWorkers < 1 { + return nil, fmt.Errorf("maxWorkers must be at least 1, got %d", maxWorkers) + } + if len(tasks) == 0 { return nil, nil } @@ -49,34 +55,40 @@ func Execute[T any](ctx context.Context, maxWorkers int, timeout time.Duration, execCtx, cancel := context.WithTimeout(ctx, timeout) defer cancel() - // Create buffered channels + // Create channels + taskChan := make(chan Task[T]) results := make(chan Result[T], len(tasks)) - semaphore := make(chan struct{}, maxWorkers) - // Launch all task goroutines + // Start exactly maxWorkers worker goroutines var wg sync.WaitGroup - for _, task := range tasks { + for range maxWorkers { wg.Add(1) - go func(t Task[T]) { + go func() { defer wg.Done() - - // Acquire semaphore slot (bounded concurrency) - semaphore <- struct{}{} - defer func() { <-semaphore }() - - // Execute the task - value, err := t.Work(execCtx) - - // Send result through channel - results <- Result[T]{ - Index: t.Index, - Value: value, - Err: err, + // Each worker pulls tasks from the channel until it's closed + for task := range taskChan { + // Execute the task + value, err := task.Work(execCtx) + + // Send result through channel + results <- Result[T]{ + Index: task.Index, + Value: value, + Err: err, + } 
} - }(task) + }() } - // Close results channel when all goroutines complete + // Feed tasks to workers in a separate goroutine to avoid blocking + go func() { + for _, task := range tasks { + taskChan <- task + } + close(taskChan) // Signal workers that no more tasks are coming + }() + + // Close results channel when all workers complete go func() { wg.Wait() close(results) From 2b9d6c2b8a795729215ba7482ca62d9fbbeb1aaf Mon Sep 17 00:00:00 2001 From: Julien Mancuso Date: Thu, 6 Nov 2025 16:42:48 -0700 Subject: [PATCH 9/9] fix: taking Anna's comments into account Signed-off-by: Julien Mancuso --- .../api/v1alpha1/dynamo_model_types.go | 7 +++++ .../controller/dynamo_model_controller.go | 27 +++++++------------ .../{headless_service.go => model_service.go} | 22 +++++++-------- .../operator/internal/modelendpoint/client.go | 3 +-- 4 files changed, 28 insertions(+), 31 deletions(-) rename deploy/cloud/operator/internal/dynamo/{headless_service.go => model_service.go} (90%) diff --git a/deploy/cloud/operator/api/v1alpha1/dynamo_model_types.go b/deploy/cloud/operator/api/v1alpha1/dynamo_model_types.go index 0c06cf5e60..5a3a5b2a86 100644 --- a/deploy/cloud/operator/api/v1alpha1/dynamo_model_types.go +++ b/deploy/cloud/operator/api/v1alpha1/dynamo_model_types.go @@ -18,6 +18,8 @@ package v1alpha1 import ( + "strings" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -117,6 +119,11 @@ func init() { SchemeBuilder.Register(&DynamoModel{}, &DynamoModelList{}) } +// IsLoRA returns true if this is a LoRA model (case-insensitive) +func (m *DynamoModel) IsLoRA() bool { + return strings.EqualFold(m.Spec.ModelType, "lora") +} + // GetReadyEndpoints returns only the endpoints that are ready func (m *DynamoModel) GetReadyEndpoints() []EndpointInfo { var ready []EndpointInfo diff --git a/deploy/cloud/operator/internal/controller/dynamo_model_controller.go b/deploy/cloud/operator/internal/controller/dynamo_model_controller.go index 35fe65a674..c9f8bf1d19 100644 --- a/deploy/cloud/operator/internal/controller/dynamo_model_controller.go +++ b/deploy/cloud/operator/internal/controller/dynamo_model_controller.go @@ -20,7 +20,6 @@ package controller import ( "context" "fmt" - "strings" "time" corev1 "k8s.io/api/core/v1" @@ -59,6 +58,9 @@ const ( // Field index names dynamoModelBaseModelIndex = ".spec.baseModelName" + + // Requeue duration for retries when endpoints are not ready + requeueAfterDuration = 30 * time.Second ) // DynamoModelReconciler reconciles a DynamoModel object @@ -106,7 +108,7 @@ func (r *DynamoModelReconciler) Reconcile(ctx context.Context, req ctrl.Request) candidates, serviceNames, err := r.getEndpointCandidates(ctx, model) if err != nil { // Error already logged and status updated in helper - return ctrl.Result{RequeueAfter: 30 * time.Second}, err + return ctrl.Result{RequeueAfter: requeueAfterDuration}, err } if len(candidates) == 0 { @@ -121,12 +123,7 @@ func (r *DynamoModelReconciler) Reconcile(ctx context.Context, req ctrl.Request) if err := r.Status().Update(ctx, model); err != nil { return ctrl.Result{}, err } - return ctrl.Result{RequeueAfter: 30 * time.Second}, nil - } - - // Initialize endpoint client if needed - if r.EndpointClient == nil { - r.EndpointClient = modelendpoint.NewClient() + return ctrl.Result{RequeueAfter: requeueAfterDuration}, nil } // Load LoRA on all endpoints in parallel with bounded concurrency @@ -135,9 +132,8 @@ func (r *DynamoModelReconciler) Reconcile(ctx context.Context, req ctrl.Request) // Determine if we need to requeue based on model type // For LoRA 
models: requeue if there were probe errors OR if not all endpoints are ready // For base models: only requeue if there were probe errors (Ready is expected to be false) - isLoRA := strings.ToLower(model.Spec.ModelType) == "lora" hasFailures := probeErr != nil - if isLoRA { + if model.IsLoRA() { hasFailures = hasFailures || countReadyEndpoints(allEndpoints) < len(allEndpoints) } @@ -162,7 +158,7 @@ func (r *DynamoModelReconciler) Reconcile(ctx context.Context, req ctrl.Request) model.Status.ReadyEndpoints = countReadyEndpoints(allEndpoints) // Update conditions based on model type - if isLoRA { + if model.IsLoRA() { // For LoRA models, check readiness if model.Status.ReadyEndpoints > 0 { r.updateCondition(model, ConditionTypeEndpointsReady, metav1.ConditionTrue, ReasonEndpointsDiscovered, @@ -202,7 +198,7 @@ func (r *DynamoModelReconciler) Reconcile(ctx context.Context, req ctrl.Request) logs.Info("Requeuing due to endpoint probe failures", "ready", model.Status.ReadyEndpoints, "total", model.Status.TotalEndpoints) - return ctrl.Result{RequeueAfter: 30 * time.Second}, nil + return ctrl.Result{RequeueAfter: requeueAfterDuration}, nil } return ctrl.Result{}, nil @@ -294,7 +290,7 @@ func (r *DynamoModelReconciler) FinalizeResource(ctx context.Context, model *v1a logs.Info("Finalizing DynamoModel", "modelType", model.Spec.ModelType) // Only perform cleanup for LoRA models - if model.Spec.ModelType == "lora" { + if model.IsLoRA() { // Get endpoint candidates (reusing common logic) candidates, _, err := r.getEndpointCandidates(ctx, model) if err != nil { @@ -305,11 +301,6 @@ func (r *DynamoModelReconciler) FinalizeResource(ctx context.Context, model *v1a } else if len(candidates) > 0 { logs.Info("Unloading LoRA from endpoints", "endpointCount", len(candidates)) - // Initialize endpoint client if needed - if r.EndpointClient == nil { - r.EndpointClient = modelendpoint.NewClient() - } - // Unload LoRA from all endpoints in parallel if err := r.EndpointClient.UnloadLoRA(ctx, candidates, model.Spec.ModelName); err != nil { // Log as Info since we're continuing with deletion anyway (expected behavior) diff --git a/deploy/cloud/operator/internal/dynamo/headless_service.go b/deploy/cloud/operator/internal/dynamo/model_service.go similarity index 90% rename from deploy/cloud/operator/internal/dynamo/headless_service.go rename to deploy/cloud/operator/internal/dynamo/model_service.go index 05c9b8cd6b..5857b41ad4 100644 --- a/deploy/cloud/operator/internal/dynamo/headless_service.go +++ b/deploy/cloud/operator/internal/dynamo/model_service.go @@ -33,14 +33,14 @@ import ( "sigs.k8s.io/controller-runtime/pkg/log" ) -// ReconcileModelServicesForComponents creates headless services for components with modelRef +// ReconcileModelServicesForComponents creates services for components with modelRef // This is common logic used by both DynamoGraphDeployment and DynamoComponentDeployment controllers // reconciler must implement controller_common.Reconciler interface func ReconcileModelServicesForComponents( ctx context.Context, reconciler commonController.Reconciler, owner client.Object, - services map[string]*v1alpha1.DynamoComponentDeploymentSharedSpec, + components map[string]*v1alpha1.DynamoComponentDeploymentSharedSpec, namespace string, ) error { logger := log.FromContext(ctx) @@ -48,7 +48,7 @@ func ReconcileModelServicesForComponents( // Track unique base models to avoid creating duplicate services seenBaseModels := make(map[string]bool) - for componentName, component := range services { + for componentName, 
component := range components { // Skip if no modelRef if component.ModelRef == nil || component.ModelRef.Name == "" { continue @@ -66,8 +66,7 @@ func ReconcileModelServicesForComponents( seenBaseModels[baseModelName] = true // Generate headless service with deterministic name based on model name - headlessService := GenerateHeadlessServiceForModel( - ctx, + headlessService := generateHeadlessServiceForModel( namespace, baseModelName, ) @@ -101,8 +100,7 @@ func ReconcileModelServicesForComponents( // Service name is generated deterministically from the base model name using a hash // The base model name hash is stored as a label for efficient discovery // The original base model name is stored in an annotation for human readability -func GenerateHeadlessServiceForModel( - ctx context.Context, +func generateHeadlessServiceForModel( namespace string, baseModelName string, ) *corev1.Service { @@ -168,14 +166,16 @@ func GenerateServiceName(baseModelName string) string { // AddBaseModelLabel adds the base model hash label to a label map if modelRef is present // Uses a hash of the model name to avoid label length/character restrictions func AddBaseModelLabel(labels map[string]string, modelRef *v1alpha1.ModelReference) { - if modelRef != nil && modelRef.Name != "" { - labels[commonconsts.KubeLabelDynamoBaseModelHash] = HashModelName(modelRef.Name) + if labels == nil || modelRef == nil || modelRef.Name == "" { + return } + labels[commonconsts.KubeLabelDynamoBaseModelHash] = HashModelName(modelRef.Name) } // AddBaseModelAnnotation adds the base model annotation to preserve the original model name func AddBaseModelAnnotation(annotations map[string]string, modelRef *v1alpha1.ModelReference) { - if modelRef != nil && modelRef.Name != "" { - annotations[commonconsts.KubeAnnotationDynamoBaseModel] = modelRef.Name + if annotations == nil || modelRef == nil || modelRef.Name == "" { + return } + annotations[commonconsts.KubeAnnotationDynamoBaseModel] = modelRef.Name } diff --git a/deploy/cloud/operator/internal/modelendpoint/client.go b/deploy/cloud/operator/internal/modelendpoint/client.go index 9a603665af..90251b823a 100644 --- a/deploy/cloud/operator/internal/modelendpoint/client.go +++ b/deploy/cloud/operator/internal/modelendpoint/client.go @@ -21,7 +21,6 @@ import ( "context" "fmt" "net/http" - "strings" "time" "sigs.k8s.io/controller-runtime/pkg/log" @@ -63,7 +62,7 @@ func (c *Client) LoadLoRA( logs := log.FromContext(ctx) // Skip loading for non-LoRA models - if strings.ToLower(model.Spec.ModelType) != "lora" { + if !model.IsLoRA() { logs.V(1).Info("Skipping LoRA load for non-LoRA model", "modelType", model.Spec.ModelType) endpoints := make([]v1alpha1.EndpointInfo, len(candidates)) for i, c := range candidates {