Skip to content

Commit 5ffbd26

Browse files
committed
feat: support for customizing minicluster
Signed-off-by: vsoch <[email protected]>
1 parent 5bed5ab commit 5ffbd26

File tree

7 files changed

+223
-40
lines changed

7 files changed

+223
-40
lines changed

cmd/ocifit/main.go

Lines changed: 126 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,11 @@ import (
1818
"time"
1919

2020
"ghcr.io/compspec/ocifit-k8s/pkg/artifact"
21+
"ghcr.io/compspec/ocifit-k8s/pkg/flux"
2122
"ghcr.io/compspec/ocifit-k8s/pkg/types"
2223
"ghcr.io/compspec/ocifit-k8s/pkg/validator"
24+
25+
miniclusterv1alpha2 "github.com/flux-framework/flux-operator/api/v1alpha2"
2326
admissionv1 "k8s.io/api/admission/v1"
2427
corev1 "k8s.io/api/core/v1"
2528
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -50,13 +53,6 @@ var (
5053
universalDeserializer = serializer.NewCodecFactory(runtime.NewScheme()).UniversalDeserializer()
5154
)
5255

53-
// JSONPatch represents a single JSON patch operation
54-
type JSONPatch struct {
55-
Op string `json:"op"`
56-
Path string `json:"path"`
57-
Value interface{} `json:"value,omitempty"`
58-
}
59-
6056
// PredictionRequest is the payload sent to the Python model server.
6157
type PredictionRequest struct {
6258
MetricName string `json:"metric_name"`
@@ -113,13 +109,26 @@ func deny(ar *admissionv1.AdmissionReview, message string) *admissionv1.Admissio
113109
}
114110
}
115111

116-
// mutate is the core logic to look for compatibility labels and select a new image
112+
// mutate routes the admission request to a kind-specific handler (Pod or MiniCluster); any other kind is logged and admitted unchanged
117113
func (ws *WebhookServer) mutate(ar *admissionv1.AdmissionReview) *admissionv1.AdmissionResponse {
114+
// Use the Kind from the AdmissionReview request to route to the correct handler
115+
switch ar.Request.Kind.Kind {
116+
case "Pod":
117+
return ws.mutatePod(ar)
118+
case "MiniCluster":
119+
return ws.mutateMiniCluster(ar)
120+
default:
121+
log.Printf("Webhook received unhandled kind '%s', allowing.", ar.Request.Kind.Kind)
122+
return admit(ar)
123+
}
124+
}
125+
126+
// mutatePod is the core logic to look for compatibility labels and select a new image for a Pod
127+
func (ws *WebhookServer) mutatePod(ar *admissionv1.AdmissionReview) *admissionv1.AdmissionResponse {
118128
pod := &corev1.Pod{}
119129
if err := json.Unmarshal(ar.Request.Object.Raw, pod); err != nil {
120130
return deny(ar, fmt.Sprintf("could not decode pod object: %v", err))
121131
}
122-
123132
if val, ok := pod.Labels[enabledLabel]; !ok || val != "true" {
124133
return admit(ar)
125134
}
@@ -129,10 +138,6 @@ func (ws *WebhookServer) mutate(ar *admissionv1.AdmissionReview) *admissionv1.Ad
129138
if !ok {
130139
return deny(ar, fmt.Sprintf("missing required annotation: %s", imageRefAnnotation))
131140
}
132-
targetRef, ok := pod.Annotations[targetImage]
133-
if !ok {
134-
targetRef = targetRefDefault
135-
}
136141

137142
ctx := context.Background()
138143
genericSpec, err := artifact.DownloadCompatibilityArtifact(ctx, imageRef)
@@ -142,12 +147,12 @@ func (ws *WebhookServer) mutate(ar *admissionv1.AdmissionReview) *admissionv1.Ad
142147

143148
var finalImage, finalInstance string
144149
var prediction *PredictionResponse
145-
var patches []JSONPatch
150+
var patches []types.JSONPatch
146151

147152
switch spec := genericSpec.(type) {
148153
case *types.ModelCompatibilitySpec:
149154
log.Println("Handling artifact as ModelCompatibilitySpec")
150-
finalImage, prediction, err = ws.selectImageWithModel(ctx, pod, spec)
155+
finalImage, prediction, err = ws.selectImageWithModel(ctx, pod.Annotations, spec)
151156

152157
// For the model spec, we make a request to the server here
153158
if err != nil {
@@ -156,7 +161,7 @@ func (ws *WebhookServer) mutate(ar *admissionv1.AdmissionReview) *admissionv1.Ad
156161

157162
// If the pod's nodeSelector is nil, we need to create it with an "add" operation.
158163
if pod.Spec.NodeSelector == nil {
159-
patches = append(patches, JSONPatch{
164+
patches = append(patches, types.JSONPatch{
160165
Op: "add",
161166
Path: "/spec/nodeSelector",
162167
Value: map[string]string{
@@ -168,8 +173,8 @@ func (ws *WebhookServer) mutate(ar *admissionv1.AdmissionReview) *admissionv1.Ad
168173
// If nodeSelector already exists, we add/replace the specific key.
169174
// JSON Patch requires escaping '/' with '~1' for keys in a path.
170175
escapedKey := strings.ReplaceAll(instanceTypeLabel, "/", "~1")
171-
patches = append(patches, JSONPatch{
172-
Op: "add", // "add" on an existing map key works like "replace"
176+
patches = append(patches, types.JSONPatch{
177+
Op: "add",
173178
Path: fmt.Sprintf("/spec/nodeSelector/%s", escapedKey),
174179
Value: finalInstance,
175180
})
@@ -186,28 +191,114 @@ func (ws *WebhookServer) mutate(ar *admissionv1.AdmissionReview) *admissionv1.Ad
186191
return deny(ar, "downloaded artifact has an unknown or unsupported compatibility spec format")
187192
}
188193

189-
// For the compatibility spec, we patch the container image (this is the selection)
190-
// For the model spec, the model provides a matching arch image.
194+
patches, err = ws.replaceContainers(patches, pod.Annotations, pod.Spec.Containers, finalImage)
195+
if err != nil {
196+
return deny(ar, fmt.Sprintf("%s", err))
197+
}
198+
199+
patchBytes, err := json.Marshal(patches)
200+
if err != nil {
201+
return deny(ar, fmt.Sprintf("failed to marshal patch: %v", err))
202+
}
203+
204+
patchType := admissionv1.PatchTypeJSONPatch
205+
return &admissionv1.AdmissionResponse{
206+
Allowed: true,
207+
UID: ar.Request.UID,
208+
Patch: patchBytes,
209+
PatchType: &patchType,
210+
}
211+
}
212+
213+
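// replaceContainers appends a "replace" patch for the image of every container whose image matches the target placeholder, and returns an error if none match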
214+
func (ws *WebhookServer) replaceContainers(
215+
patches []types.JSONPatch,
216+
annotations map[string]string,
217+
containers []corev1.Container,
218+
finalImage string,
219+
) ([]types.JSONPatch, error) {
220+
221+
targetRef, ok := annotations[targetImage]
222+
if !ok {
223+
targetRef = targetRefDefault
224+
}
225+
226+
// Replace any placeholder images with the final. We need at least one.
227+
// Don't break on the first match - more than one container may use the placeholder (pods go through this helper too)
191228
containerFound := false
192-
for i, c := range pod.Spec.Containers {
229+
for i, c := range containers {
193230
if c.Image == targetRef {
194-
patches = append(patches, JSONPatch{
231+
patches = append(patches, types.JSONPatch{
195232
Op: "replace",
196233
Path: fmt.Sprintf("/spec/containers/%d/image", i),
197234
Value: finalImage,
198235
})
199236
containerFound = true
200-
break
201237
}
202238
}
203239
if !containerFound {
204-
return deny(ar, fmt.Sprintf("container %s not found", targetRef))
240+
return patches, fmt.Errorf("container %s not found", targetRef)
205241
}
242+
return patches, nil
243+
}
206244

245+
// mutateMiniCluster still selects instance type and pods, but also exposes customization of the entire setup
246+
func (ws *WebhookServer) mutateMiniCluster(ar *admissionv1.AdmissionReview) *admissionv1.AdmissionResponse {
247+
mc := &miniclusterv1alpha2.MiniCluster{}
248+
var rawObject map[string]interface{}
249+
err := json.Unmarshal(ar.Request.Object.Raw, mc)
207250
if err != nil {
208-
return deny(ar, fmt.Sprintf("failed to find compatible image: %v", err))
251+
return deny(ar, fmt.Sprintf("could not decode MiniCluster object: %v", err))
209252
}
210253

254+
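// Decode into a raw map for checking additional fields. FIXME(review): json.Unmarshal needs a non-nil pointer - pass &rawObject, otherwise this always returns an InvalidUnmarshalError and every MiniCluster is denied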
255+
err = json.Unmarshal(ar.Request.Object.Raw, rawObject)
256+
if err != nil {
257+
return deny(ar, fmt.Sprintf("could not decode MiniCluster into raw object: %v", err))
258+
}
259+
260+
// We use the MiniCluster's own labels and annotations
261+
if val, ok := mc.Labels[enabledLabel]; !ok || val != "true" {
262+
return admit(ar)
263+
}
264+
log.Printf("Mutating MiniCluster %s/%s", mc.Namespace, mc.Name)
265+
imageRef, ok := mc.Annotations[imageRefAnnotation]
266+
if !ok {
267+
return deny(ar, fmt.Sprintf("missing required annotation on MiniCluster: %s", imageRefAnnotation))
268+
}
269+
270+
ctx := context.Background()
271+
genericSpec, err := artifact.DownloadCompatibilityArtifact(ctx, imageRef)
272+
if err != nil {
273+
return deny(ar, fmt.Sprintf("compatibility spec %s issue: %v", imageRef, err))
274+
}
275+
276+
// For now, I'm assuming MiniClusters will primarily use Model specs.
277+
// You could expand this to use feature-based selection as well.
278+
spec, ok := genericSpec.(*types.ModelCompatibilitySpec)
279+
if !ok {
280+
return deny(ar, "received artifact for MiniCluster that was not a ModelCompatibilitySpec")
281+
}
282+
283+
// For a MiniCluster, we assume we should evaluate against ALL nodes in the cluster
284+
// as the scheduler will decide where to place the pods later.
285+
// selectImageWithModel takes only the annotations (there is no nodeSelector argument), so we pass the MiniCluster's annotations.
286+
finalImage, prediction, err := ws.selectImageWithModel(ctx, mc.Annotations, spec)
287+
if err != nil {
288+
return deny(ar, fmt.Sprintf("Issue with Model Compatibility Spec: %s", err))
289+
}
290+
291+
// Replace all referenced containers based on target name
292+
containers := []corev1.Container{}
293+
for _, container := range mc.Spec.Containers {
294+
containers = append(containers, corev1.Container{Image: container.Image})
295+
}
296+
297+
patches := flux.GetPatches(rawObject, prediction.Arch)
298+
patches, err = ws.replaceContainers(patches, mc.Annotations, containers, finalImage)
299+
if err != nil {
300+
return deny(ar, fmt.Sprintf("%s", err))
301+
}
211302
patchBytes, err := json.Marshal(patches)
212303
if err != nil {
213304
return deny(ar, fmt.Sprintf("failed to marshal patch: %v", err))
@@ -245,18 +336,19 @@ func (ws *WebhookServer) selectImageWithFeatures(pod *corev1.Pod, spec *types.Co
245336

246337
// selectImageWithModel uses the model compatibility spec to ping the sidecar server, get the best instance,
247338
// and then look up the correct container image for the instance's architecture.
339+
// We can accept annotations from a pod or MiniCluster (or other future abstraction)
248340
func (ws *WebhookServer) selectImageWithModel(
249341
ctx context.Context,
250-
pod *corev1.Pod,
342+
annotations map[string]string,
251343
spec *types.ModelCompatibilitySpec,
252344
) (string, *PredictionResponse, error) {
253-
modelHint, ok := pod.Annotations[modelSelectionAnnotation]
345+
346+
modelHint, ok := annotations[modelSelectionAnnotation]
254347
if !ok {
255-
return "", nil, fmt.Errorf("model-based spec was provided, but pod is missing annotation: %s", modelSelectionAnnotation)
348+
return "", nil, fmt.Errorf("model-based spec was provided, but object is missing annotation: %s", modelSelectionAnnotation)
256349
}
257350

258351
var selectedRule *types.ModelSpec
259-
fmt.Println(spec.Compatibilities)
260352
for _, comp := range spec.Compatibilities {
261353
if comp.Tag == modelHint && len(comp.Rules) > 0 {
262354
selectedRule = &comp.Rules[0].MatchModel.Model
@@ -273,31 +365,29 @@ func (ws *WebhookServer) selectImageWithModel(
273365
if err != nil {
274366
return "", nil, fmt.Errorf("failed to get combined node features: %w", err)
275367
}
368+
276369
if len(nodeFeatures) == 0 {
277-
return "", nil, fmt.Errorf("no schedulable nodes found in the cluster or static cache to evaluate")
370+
return "", nil, fmt.Errorf("no nodes found to evaluate for model selection")
278371
}
279372

280-
// Make a request to the ML server to get the max/min for the given metric
281373
requestPayload := PredictionRequest{
282374
MetricName: selectedRule.Name,
283375
Directionality: selectedRule.Direction,
284376
Features: nodeFeatures,
285377
}
378+
286379
log.Printf("Sending prediction request for model '%s' with %d nodes", requestPayload.MetricName, len(requestPayload.Features))
287380
prediction, err := callPredictEndpoint(ctx, requestPayload)
288-
fmt.Println(prediction)
289381
if err != nil {
290382
return "", nil, fmt.Errorf("model prediction failed: %w", err)
291383
}
292-
log.Printf("Model server selected instance with features: %+v", prediction.Instance)
384+
log.Printf("Model server selected instance with features: %+v", prediction.SelectedInstance)
293385

294-
// Get the final container image URI for a matching arch.
295386
finalImage, ok := selectedRule.Platforms[prediction.Arch]
296387
if !ok {
297388
return "", nil, fmt.Errorf("model spec's 'platforms' map does not have an entry for the chosen architecture '%s'", prediction.Arch)
298389
}
299-
300-
log.Printf("Model selected %s '%s' and arch '%s', resulting in final image: '%s'", prediction.InstanceSelector, prediction.Instance, prediction.Arch, finalImage)
390+
log.Printf("Model selected instance '%s' and arch '%s', resulting in final image: '%s'", prediction.Instance, prediction.Arch, finalImage)
301391

302392
// Return the final image for the container patch and the full prediction (which carries the selected instance and arch) for the caller.
303393
return finalImage, prediction, nil
@@ -405,7 +495,6 @@ func (ws *WebhookServer) getCombinedNodeFeatures(ctx context.Context, staticFeat
405495
continue
406496
}
407497
var featureList []map[string]interface{}
408-
fmt.Println(content)
409498
if err := json.Unmarshal(content, &featureList); err != nil {
410499
log.Printf("Warning: Failed to parse static feature file %s as a list of features: %v", filePath, err)
411500
continue

deploy/webhook.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,5 +129,9 @@ webhooks:
129129
apiGroups: [""]
130130
apiVersions: ["v1"]
131131
resources: ["pods"]
132+
- operations: [ "CREATE" ]
133+
apiGroups: ["flux-framework.org"]
134+
apiVersions: ["v1alpha2"]
135+
resources: ["miniclusters"]
132136
sideEffects: None
133137
admissionReviewVersions: ["v1"]

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ go 1.24.0
55
toolchain go1.24.4
66

77
require (
8+
github.com/flux-framework/flux-operator v0.0.0-20250720220112-117c17c5fde1
89
github.com/opencontainers/image-spec v1.1.1
910
k8s.io/api v0.33.1
1011
k8s.io/apimachinery v0.33.1

go.sum

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ github.com/emicklei/go-restful/v3 v3.11.0 h1:rAQeMHw1c7zTmncogyy8VvRZwtkmkZ4FxER
99
github.com/emicklei/go-restful/v3 v3.11.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc=
1010
github.com/evanphx/json-patch/v5 v5.9.11 h1:/8HVnzMq13/3x9TPvjG08wUGqBTmZBsCWzjTM0wiaDU=
1111
github.com/evanphx/json-patch/v5 v5.9.11/go.mod h1:3j+LviiESTElxA4p3EMKAB9HXj3/XEtnUf6OZxqIQTM=
12+
github.com/flux-framework/flux-operator v0.0.0-20250720220112-117c17c5fde1 h1:c7re11vfsEDcIORM3XwDfVdReqzkWoFzlFfNs5r1+Io=
13+
github.com/flux-framework/flux-operator v0.0.0-20250720220112-117c17c5fde1/go.mod h1:ynusi4Hu8LMlpu74Xzr/EL2YCqP87hecBKs1/CVXur0=
1214
github.com/fxamacker/cbor/v2 v2.7.0 h1:iM5WgngdRBanHcxugY4JySA0nk1wZorNOpTgCMedv5E=
1315
github.com/fxamacker/cbor/v2 v2.7.0/go.mod h1:pxXPTn3joSm21Gbwsv0w9OSA2y1HFR9qXEeXQVeNoDQ=
1416
github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
@@ -59,6 +61,7 @@ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G
5961
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
6062
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
6163
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
64+
github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE=
6265
github.com/onsi/ginkgo/v2 v2.22.0 h1:Yed107/8DjTr0lKCNt7Dn8yQ6ybuDRQoMGrNFKzMfHg=
6366
github.com/onsi/ginkgo/v2 v2.22.0/go.mod h1:7Du3c42kxCUegi0IImZ1wUQzMBVecgIHjR1C+NkhLQo=
6467
github.com/onsi/gomega v1.36.1 h1:bJDPBO7ibjxcbHMgSCoo4Yj18UWbKDlLwX1x9sybDcw=

mlserver/app.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -175,9 +175,9 @@ def _choose_randomly(feature_matrix):
175175
"instance_index": chosen_index,
176176
"instance": identifiers[chosen_index],
177177
"instance-selector": (
178-
HOSTNAME_LABEL
179-
if HOSTNAME_LABEL in selected_node_features
180-
else INSTANCE_TYPE_LABEL
178+
INSTANCE_TYPE_LABEL
179+
if INSTANCE_TYPE_LABEL in selected_node_features
180+
else HOSTNAME_LABEL
181181
),
182182
"arch": selected_node_features.get(ARCH_LABEL, "unknown"),
183183
"score": None, # No score for random selection

0 commit comments

Comments
 (0)