From 22bae41474784e157f0d0c1221fd9adc73182621 Mon Sep 17 00:00:00 2001 From: Bartosz Majsak Date: Thu, 19 Mar 2026 16:38:14 +0100 Subject: [PATCH] fix(llmisvc): defer storage version migration until webhooks are serving The controller was crashing on startup because storage version migration ran before the webhook server had registered Service endpoints. The Knative migrator patches each resource with an empty merge patch, which triggers validating webhooks - but with no endpoints available, the API server rejects every patch and the controller exits. Moves migration into a LeaderElectionRunnable so it executes only after webhooks and caches are fully ready. Replaces the short DefaultBackoff retry (~1.5s) with a generous exponential backoff (~2min window) to handle Service endpoint propagation delays on platforms like OpenShift. Fixes [RHOAIENG-54344](https://redhat.atlassian.net/browse/RHOAIENG-54344) --- cmd/llmisvc/main.go | 110 +++++--- go.mod | 2 +- .../llmisvc/test_storage_version_migration.py | 244 ++++++++++++++++++ 3 files changed, 314 insertions(+), 42 deletions(-) create mode 100644 test/e2e/llmisvc/test_storage_version_migration.py diff --git a/cmd/llmisvc/main.go b/cmd/llmisvc/main.go index 72b9428d227..c5fcc592271 100644 --- a/cmd/llmisvc/main.go +++ b/cmd/llmisvc/main.go @@ -22,17 +22,19 @@ import ( "flag" "fmt" "os" + "time" - "golang.org/x/sync/errgroup" appsv1 "k8s.io/api/apps/v1" autoscalingv2 "k8s.io/api/autoscaling/v2" corev1 "k8s.io/api/core/v1" apixclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/dynamic" "k8s.io/client-go/kubernetes" clientgoscheme "k8s.io/client-go/kubernetes/scheme" @@ -61,6 +63,14 @@ var ( setupLog = ctrl.Log.WithName("setup") ) +// leaderRunnable is a function that implements both Runnable and +// LeaderElectionRunnable, ensuring it only runs on the elected leader +// and starts after webhooks and caches are ready. +type leaderRunnable func(context.Context) error + +func (r leaderRunnable) Start(ctx context.Context) error { return r(ctx) } +func (r leaderRunnable) NeedLeaderElection() bool { return true } + func init() { utilruntime.Must(clientgoscheme.AddToScheme(scheme)) utilruntime.Must(kservescheme.AddLLMISVCAPIs(scheme)) @@ -199,37 +209,17 @@ func main() { os.Exit(1) } - // Register v1alpha2 validators + // Register webhooks: validation (v1alpha1, v1alpha2) and conversion v1alpha2LLMValidator := &v1alpha2.LLMInferenceServiceValidator{} if err = v1alpha2LLMValidator.SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create webhook", "webhook", "llminferenceservice-v1alpha2") os.Exit(1) } - - // Register v1alpha1 validators v1alpha1LLMValidator := &v1alpha1.LLMInferenceServiceValidator{} if err = v1alpha1LLMValidator.SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create webhook", "webhook", "llminferenceservice-v1alpha1") os.Exit(1) } - - setupLog.Info("Setting up LLMInferenceService controller") - llmEventBroadcaster := record.NewBroadcaster() - llmEventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: clientSet.CoreV1().Events("")}) - if err = (&llmisvc.LLMISVCReconciler{ - Client: mgr.GetClient(), - Config: mgr.GetConfig(), - Clientset: clientSet, - EventRecorder: llmEventBroadcaster.NewRecorder(scheme, corev1.EventSource{Component: "LLMInferenceServiceController"}), - Validator: func(ctx context.Context, llmSvc *v1alpha2.LLMInferenceService) error { - _, err := v1alpha2LLMValidator.ValidateCreate(ctx, llmSvc) - return err - }, - }).SetupWithManager(mgr); err != nil { - setupLog.Error(err, "unable to create controller", "controller", "LLMInferenceService") - os.Exit(1) - } - v1alpha1ConfigValidator := &v1alpha1.LLMInferenceServiceConfigValidator{ ConfigValidationFunc: createV1Alpha1ConfigValidationFunc(clientSet), WellKnownConfigChecker: wellKnownConfigChecker, @@ -238,7 +228,6 @@ func main() { setupLog.Error(err, "unable to create webhook", "webhook", "llminferenceserviceconfig-v1alpha1") os.Exit(1) } - v1alpha2ConfigValidator := &v1alpha2.LLMInferenceServiceConfigValidator{ ConfigValidationFunc: createV1Alpha2ConfigValidationFunc(clientSet), WellKnownConfigChecker: wellKnownConfigChecker, @@ -247,16 +236,12 @@ func main() { setupLog.Error(err, "unable to create webhook", "webhook", "llminferenceserviceconfig-v1alpha2") os.Exit(1) } - - // Register conversion webhooks for Hub types (v1alpha2) - // This enables automatic API version conversion between v1alpha1 and v1alpha2 if err = ctrl.NewWebhookManagedBy(mgr). For(&v1alpha2.LLMInferenceService{}). Complete(); err != nil { setupLog.Error(err, "unable to create conversion webhook", "webhook", "llminferenceservice") os.Exit(1) } - if err = ctrl.NewWebhookManagedBy(mgr). For(&v1alpha2.LLMInferenceServiceConfig{}). Complete(); err != nil { @@ -264,6 +249,23 @@ func main() { os.Exit(1) } + setupLog.Info("Setting up LLMInferenceService controller") + llmEventBroadcaster := record.NewBroadcaster() + llmEventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: clientSet.CoreV1().Events("")}) + if err = (&llmisvc.LLMISVCReconciler{ + Client: mgr.GetClient(), + Config: mgr.GetConfig(), + Clientset: clientSet, + EventRecorder: llmEventBroadcaster.NewRecorder(scheme, corev1.EventSource{Component: "LLMInferenceServiceController"}), + Validator: func(ctx context.Context, llmSvc *v1alpha2.LLMInferenceService) error { + _, err := v1alpha2LLMValidator.ValidateCreate(ctx, llmSvc) + return err + }, + }).SetupWithManager(mgr); err != nil { + setupLog.Error(err, "unable to create controller", "controller", "LLMInferenceService") + os.Exit(1) + } + if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil { setupLog.Error(err, "unable to set up health check") os.Exit(1) @@ -273,21 +275,47 @@ func main() { os.Exit(1) } - eg := errgroup.Group{} - migrator := storageversion.NewMigrator(dynamic.NewForConfigOrDie(cfg), apixclient.NewForConfigOrDie(cfg)) - for _, gr := range []schema.GroupResource{ - {Group: v1alpha2.SchemeGroupVersion.Group, Resource: "llminferenceservices"}, - {Group: v1alpha2.SchemeGroupVersion.Group, Resource: "llminferenceserviceconfigs"}, - } { - eg.Go(func() error { - if err := migrator.Migrate(ctx, gr); err != nil { - return fmt.Errorf("failed to migrate %q: %w", gr, err) - } - return nil - }) + // Storage version migration runs as a LeaderElection runnable, which starts + // after the webhook server and cache sync are ready. This avoids the + // chicken-and-egg problem where migration patches trigger validating webhooks + // that aren't serving yet. + // migrationBackoff allows enough time for Service endpoints to propagate + // after the webhook server starts. + migrationBackoff := wait.Backoff{ + Duration: 2 * time.Second, + Factor: 1.5, + Jitter: 0.1, + Steps: 10, } - if err := eg.Wait(); err != nil { - setupLog.Error(err, "unable to migrate resources") + if err := mgr.Add(leaderRunnable(func(ctx context.Context) error { + setupLog.Info("running storage version migration") + migrator := storageversion.NewMigrator(dynamic.NewForConfigOrDie(cfg), apixclient.NewForConfigOrDie(cfg)) + for _, gr := range []schema.GroupResource{ + {Group: v1alpha2.SchemeGroupVersion.Group, Resource: "llminferenceservices"}, + {Group: v1alpha2.SchemeGroupVersion.Group, Resource: "llminferenceserviceconfigs"}, + } { + var lastErr error + if err := wait.ExponentialBackoffWithContext(ctx, migrationBackoff, func(ctx context.Context) (bool, error) { + if err := migrator.Migrate(ctx, gr); err != nil { + lastErr = err + if apierrors.IsForbidden(err) || apierrors.IsUnauthorized(err) || apierrors.IsNotFound(err) { + return false, err + } + setupLog.Error(err, "storage version migration attempt failed, retrying", "resource", gr) + return false, nil + } + return true, nil + }); err != nil { + if lastErr != nil && wait.Interrupted(err) { + return fmt.Errorf("storage version migration for %s timed out: %w", gr, lastErr) + } + return fmt.Errorf("storage version migration for %s failed: %w", gr, err) + } + } + setupLog.Info("storage version migration completed") + return nil + })); err != nil { + setupLog.Error(err, "unable to register storage version migration") os.Exit(1) } diff --git a/go.mod b/go.mod index 844e14e565a..e551febc267 100644 --- a/go.mod +++ b/go.mod @@ -36,7 +36,6 @@ require ( github.com/tidwall/gjson v1.18.0 go.opentelemetry.io/otel/trace v1.39.0 go.uber.org/zap v1.27.1 - golang.org/x/sync v0.19.0 gomodules.xyz/jsonpatch/v2 v2.5.0 google.golang.org/api v0.250.0 google.golang.org/protobuf v1.36.11 @@ -194,6 +193,7 @@ require ( golang.org/x/mod v0.32.0 // indirect golang.org/x/net v0.49.0 // indirect golang.org/x/oauth2 v0.34.0 // indirect + golang.org/x/sync v0.19.0 // indirect golang.org/x/sys v0.40.0 // indirect golang.org/x/term v0.39.0 // indirect golang.org/x/text v0.33.0 // indirect diff --git a/test/e2e/llmisvc/test_storage_version_migration.py b/test/e2e/llmisvc/test_storage_version_migration.py new file mode 100644 index 00000000000..d0aa3df14af --- /dev/null +++ b/test/e2e/llmisvc/test_storage_version_migration.py @@ -0,0 +1,244 @@ +# Copyright 2025 The KServe Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +E2E test for storage version migration. + +Verifies that the LLMInferenceService controller correctly runs storage version +migration after the webhook server is ready. The migration patches all resources +to re-encode them in the current storage version (v1alpha2) and then updates the +CRD status.storedVersions to drop stale versions. + +To trigger an actual migration, we simulate an upgrade scenario by patching the +CRD status to include a stale stored version, then restarting the controller. +""" + +import os +import time +import subprocess +import pytest +from kserve import KServeClient, constants +from kubernetes import client + +from .fixtures import ( + inject_k8s_proxy, + KSERVE_TEST_NAMESPACE, + KSERVE_PLURAL_LLMINFERENCESERVICECONFIG, +) +from .logging import logger + +LLMISVC_CRD_NAME = "llminferenceservices.serving.kserve.io" +LLMISVC_CONFIG_CRD_NAME = "llminferenceserviceconfigs.serving.kserve.io" +CONTROLLER_NAMESPACE = os.environ.get("KSERVE_NAMESPACE", "opendatahub") +CONTROLLER_DEPLOYMENT = "llmisvc-controller-manager" + + +def wait_for(assertion_fn, timeout: float = 60.0, interval: float = 1.0): + """Wait for the assertion to succeed within timeout.""" + deadline = time.time() + timeout + last_error = None + while True: + try: + return assertion_fn() + except (AssertionError, Exception) as e: + last_error = e + if time.time() >= deadline: + raise AssertionError( + f"Timed out after {timeout}s waiting for assertion. Last error: {last_error}" + ) from e + time.sleep(interval) + + +@pytest.mark.llminferenceservice +@pytest.mark.conversion +class TestStorageVersionMigration: + """Test storage version migration runs correctly during controller startup.""" + + @pytest.fixture(autouse=True) + def setup(self): + """Setup test fixtures.""" + inject_k8s_proxy() + self.kserve_client = KServeClient( + config_file=os.environ.get("KUBECONFIG", "~/.kube/config"), + client_configuration=client.Configuration(), + ) + self.apix_client = client.ApiextensionsV1Api() + self.namespace = KSERVE_TEST_NAMESPACE + self.created_resources = [] + yield + self._cleanup_resources() + + def _cleanup_resources(self): + """Clean up created resources and restore CRD status.""" + # Always restore CRD storedVersions to prevent dirty state + for crd_name in [LLMISVC_CONFIG_CRD_NAME, LLMISVC_CRD_NAME]: + try: + self.apix_client.patch_custom_resource_definition_status( + crd_name, + body={"status": {"storedVersions": ["v1alpha2"]}}, + ) + except Exception as e: + logger.warning(f"Failed to restore storedVersions for {crd_name}: {e}") + + if os.getenv("SKIP_RESOURCE_DELETION", "False").lower() in ("true", "1", "t"): + logger.info("Skipping resource deletion after test execution.") + return + + for resource_type, name, version in self.created_resources: + try: + if resource_type == "config": + self.kserve_client.api_instance.delete_namespaced_custom_object( + constants.KSERVE_GROUP, + version, + self.namespace, + KSERVE_PLURAL_LLMINFERENCESERVICECONFIG, + name, + ) + except Exception as e: + logger.warning(f"Failed to cleanup {resource_type} {name}: {e}") + + @pytest.mark.cluster_cpu + @pytest.mark.cluster_single_node + def test_storage_version_migration_after_simulated_upgrade(self): + """Test that storage version migration runs successfully after controller restart. + + Simulates an upgrade by: + 1. Creating a resource via v1alpha1 API + 2. Patching CRD storedVersions to include the stale v1alpha1 version + 3. Restarting the controller (which triggers migration on startup) + 4. Verifying storedVersions is cleaned up to only contain v1alpha2 + """ + # 1. Create a config resource via v1alpha1 so we have something to migrate + config_name = "migration-test-config" + config = { + "apiVersion": f"{constants.KSERVE_GROUP}/{constants.KSERVE_V1ALPHA1_VERSION}", + "kind": "LLMInferenceServiceConfig", + "metadata": { + "name": config_name, + "namespace": self.namespace, + }, + "spec": { + "model": {"uri": "hf://facebook/opt-125m", "name": "facebook/opt-125m"}, + "router": {"route": {}}, + "template": { + "containers": [ + { + "name": "main", + "image": "public.ecr.aws/q9t5s3a7/vllm-cpu-release-repo:v0.17.1", + "resources": { + "limits": {"cpu": "1", "memory": "2Gi"}, + "requests": {"cpu": "100m", "memory": "512Mi"}, + }, + } + ] + }, + }, + } + self.kserve_client.api_instance.create_namespaced_custom_object( + constants.KSERVE_GROUP, + constants.KSERVE_V1ALPHA1_VERSION, + self.namespace, + KSERVE_PLURAL_LLMINFERENCESERVICECONFIG, + config, + ) + self.created_resources.append( + ("config", config_name, constants.KSERVE_V1ALPHA2_VERSION) + ) + logger.info(f"Created LLMInferenceServiceConfig {config_name} via v1alpha1") + + # 2. Patch CRD storedVersions to simulate upgrade state. + # After a real upgrade from v1alpha1-only to v1alpha1+v1alpha2, + # storedVersions would be ["v1alpha1", "v1alpha2"]. This triggers + # the migrator to re-encode all resources in the current storage version. + for crd_name in [LLMISVC_CONFIG_CRD_NAME, LLMISVC_CRD_NAME]: + self.apix_client.patch_custom_resource_definition_status( + crd_name, + body={"status": {"storedVersions": ["v1alpha1", "v1alpha2"]}}, + ) + logger.info(f"Patched {crd_name} storedVersions to [v1alpha1, v1alpha2]") + + # Verify the patch took effect + for crd_name in [LLMISVC_CONFIG_CRD_NAME, LLMISVC_CRD_NAME]: + crd = self.apix_client.read_custom_resource_definition(crd_name) + assert set(crd.status.stored_versions) == {"v1alpha1", "v1alpha2"}, ( + f"Expected storedVersions to contain both versions, got {crd.status.stored_versions}" + ) + + # 3. Restart the controller to trigger migration on startup. + # The controller runs migration as a manager Runnable that executes + # after the webhook server is ready. + logger.info(f"Restarting {CONTROLLER_DEPLOYMENT} in {CONTROLLER_NAMESPACE}") + subprocess.run( + [ + "kubectl", + "rollout", + "restart", + f"deployment/{CONTROLLER_DEPLOYMENT}", + "-n", + CONTROLLER_NAMESPACE, + ], + check=True, + ) + subprocess.run( + [ + "kubectl", + "rollout", + "status", + f"deployment/{CONTROLLER_DEPLOYMENT}", + "-n", + CONTROLLER_NAMESPACE, + "--timeout=120s", + ], + check=True, + ) + logger.info("Controller restarted successfully") + + # 4. Verify storedVersions has been cleaned up by the migrator. + # The migrator patches all resources with an empty merge patch to + # re-encode them in v1alpha2, then drops v1alpha1 from storedVersions. + def assert_stored_versions_migrated(): + for crd_name in [LLMISVC_CONFIG_CRD_NAME, LLMISVC_CRD_NAME]: + crd = self.apix_client.read_custom_resource_definition(crd_name) + assert crd.status.stored_versions == ["v1alpha2"], ( + f"Expected storedVersions=['v1alpha2'] after migration, " + f"got {crd.status.stored_versions} for {crd_name}" + ) + + wait_for(assert_stored_versions_migrated, timeout=180.0, interval=5.0) + logger.info("Storage version migration completed - storedVersions cleaned up") + + # 5. Verify the resource is still accessible via both API versions + v1 = self.kserve_client.api_instance.get_namespaced_custom_object( + constants.KSERVE_GROUP, + constants.KSERVE_V1ALPHA1_VERSION, + self.namespace, + KSERVE_PLURAL_LLMINFERENCESERVICECONFIG, + config_name, + ) + assert v1 is not None + assert v1["metadata"]["name"] == config_name + + v2 = self.kserve_client.api_instance.get_namespaced_custom_object( + constants.KSERVE_GROUP, + constants.KSERVE_V1ALPHA2_VERSION, + self.namespace, + KSERVE_PLURAL_LLMINFERENCESERVICECONFIG, + config_name, + ) + assert v2 is not None + assert v2["metadata"]["name"] == config_name + + logger.info( + "Resource accessible via both v1alpha1 and v1alpha2 after migration" + )