Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions cmd/ocifit/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ const (

var (
universalDeserializer = serializer.NewCodecFactory(runtime.NewScheme()).UniversalDeserializer()
skipInstanceTypes = make(map[string]bool)
)

// WebhookServer with Node Cache and a direct k8s client
Expand Down Expand Up @@ -408,12 +409,29 @@ func (ws *WebhookServer) getAllNodeFeatures(ctx context.Context) ([]map[string]i
if err := ws.k8sClient.List(ctx, &nodeList, client.MatchingFields{"spec.unschedulable": "false"}); err != nil {
return nil, err
}

// We will do a filtering here for types we don't want to include
identityLabelKey := os.Getenv("NODE_IDENTITY_LABEL")
if identityLabelKey == "" {
identityLabelKey = instanceTypeLabel
}

var featureMatrix []map[string]interface{}
for _, node := range nodeList.Items {
features := make(map[string]interface{})
for key, val := range node.Labels {
features[key] = val
}

// Be conservative - don't include nodes we cannot identify
identityValue, ok := features[identityLabelKey].(string)
if !ok {
continue
}
if skipInstanceTypes[identityValue] {
log.Printf("Skipping dynamically discovered instance '%s' as it is in the skip list.", identityValue)
continue
}
featureMatrix = append(featureMatrix, features)
}
return featureMatrix, nil
Expand All @@ -422,6 +440,7 @@ func (ws *WebhookServer) getAllNodeFeatures(ctx context.Context) ([]map[string]i
// getCombinedNodeFeatures merges dynamically discovered nodes with a static catalog from a directory.
// It correctly handles static files that contain a JSON array (list) of node feature objects.
func (ws *WebhookServer) getCombinedNodeFeatures(ctx context.Context, staticFeaturesDir string) ([]map[string]interface{}, error) {

// Get all currently running schedulable nodes.
dynamicallyDiscoveredFeatures, err := ws.getAllNodeFeatures(ctx)
if err != nil {
Expand Down Expand Up @@ -484,6 +503,14 @@ func (ws *WebhookServer) getCombinedNodeFeatures(ctx context.Context, staticFeat
}

for _, staticFeatures := range featureList {
identityValue, ok := staticFeatures[identityLabelKey].(string)
if !ok {
continue
}
if skipInstanceTypes[identityValue] {
log.Printf("Skipping static instance '%s' from file '%s' as it is in the skip list.", identityValue, file.Name())
continue
}

// De-duplication check...
if identityValue, ok := staticFeatures[identityLabelKey].(string); ok {
Expand Down Expand Up @@ -606,7 +633,25 @@ func (ws *WebhookServer) handleMutate(w http.ResponseWriter, r *http.Request) {
w.Write(respBody)
}

func discoverSkipInstances() {
skipList := os.Getenv("SKIP_INSTANCE_TYPES")
if skipList != "" {
instances := strings.Split(skipList, ",")
for _, instance := range instances {
trimmed := strings.TrimSpace(instance)
if trimmed != "" {
log.Printf("Configuration: Will skip instance type '%s'", trimmed)
skipInstanceTypes[trimmed] = true
}
}
}
}

func main() {

// Ensure we parse skip instances
discoverSkipInstances()

config, err := clientcmd.BuildConfigFromFlags("", os.Getenv("KUBECONFIG"))
if err != nil {
log.Fatalf("Error building kubeconfig: %s", err.Error())
Expand Down
Binary file modified mlserver/models/lasso_model_fom.joblib
Binary file not shown.
Binary file modified mlserver/models/lasso_model_fom_per_dollar.joblib
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.