diff --git a/cmd/llmisvc/main.go b/cmd/llmisvc/main.go index 72b9428d227..c5fcc592271 100644 --- a/cmd/llmisvc/main.go +++ b/cmd/llmisvc/main.go @@ -22,17 +22,19 @@ import ( "flag" "fmt" "os" + "time" - "golang.org/x/sync/errgroup" appsv1 "k8s.io/api/apps/v1" autoscalingv2 "k8s.io/api/autoscaling/v2" corev1 "k8s.io/api/core/v1" apixclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/dynamic" "k8s.io/client-go/kubernetes" clientgoscheme "k8s.io/client-go/kubernetes/scheme" @@ -61,6 +63,14 @@ var ( setupLog = ctrl.Log.WithName("setup") ) +// leaderRunnable is a function that implements both Runnable and +// LeaderElectionRunnable, ensuring it only runs on the elected leader +// and starts after webhooks and caches are ready. +type leaderRunnable func(context.Context) error + +func (r leaderRunnable) Start(ctx context.Context) error { return r(ctx) } +func (r leaderRunnable) NeedLeaderElection() bool { return true } + func init() { utilruntime.Must(clientgoscheme.AddToScheme(scheme)) utilruntime.Must(kservescheme.AddLLMISVCAPIs(scheme)) @@ -199,37 +209,17 @@ func main() { os.Exit(1) } - // Register v1alpha2 validators + // Register webhooks: validation (v1alpha1, v1alpha2) and conversion v1alpha2LLMValidator := &v1alpha2.LLMInferenceServiceValidator{} if err = v1alpha2LLMValidator.SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create webhook", "webhook", "llminferenceservice-v1alpha2") os.Exit(1) } - - // Register v1alpha1 validators v1alpha1LLMValidator := &v1alpha1.LLMInferenceServiceValidator{} if err = v1alpha1LLMValidator.SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create webhook", "webhook", "llminferenceservice-v1alpha1") os.Exit(1) } - - setupLog.Info("Setting up LLMInferenceService controller") - llmEventBroadcaster := record.NewBroadcaster() - llmEventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: clientSet.CoreV1().Events("")}) - if err = (&llmisvc.LLMISVCReconciler{ - Client: mgr.GetClient(), - Config: mgr.GetConfig(), - Clientset: clientSet, - EventRecorder: llmEventBroadcaster.NewRecorder(scheme, corev1.EventSource{Component: "LLMInferenceServiceController"}), - Validator: func(ctx context.Context, llmSvc *v1alpha2.LLMInferenceService) error { - _, err := v1alpha2LLMValidator.ValidateCreate(ctx, llmSvc) - return err - }, - }).SetupWithManager(mgr); err != nil { - setupLog.Error(err, "unable to create controller", "controller", "LLMInferenceService") - os.Exit(1) - } - v1alpha1ConfigValidator := &v1alpha1.LLMInferenceServiceConfigValidator{ ConfigValidationFunc: createV1Alpha1ConfigValidationFunc(clientSet), WellKnownConfigChecker: wellKnownConfigChecker, @@ -238,7 +228,6 @@ func main() { setupLog.Error(err, "unable to create webhook", "webhook", "llminferenceserviceconfig-v1alpha1") os.Exit(1) } - v1alpha2ConfigValidator := &v1alpha2.LLMInferenceServiceConfigValidator{ ConfigValidationFunc: createV1Alpha2ConfigValidationFunc(clientSet), WellKnownConfigChecker: wellKnownConfigChecker, @@ -247,16 +236,12 @@ func main() { setupLog.Error(err, "unable to create webhook", "webhook", "llminferenceserviceconfig-v1alpha2") os.Exit(1) } - - // Register conversion webhooks for Hub types (v1alpha2) - // This enables automatic API version conversion between v1alpha1 and v1alpha2 if err = ctrl.NewWebhookManagedBy(mgr). For(&v1alpha2.LLMInferenceService{}). Complete(); err != nil { setupLog.Error(err, "unable to create conversion webhook", "webhook", "llminferenceservice") os.Exit(1) } - if err = ctrl.NewWebhookManagedBy(mgr). For(&v1alpha2.LLMInferenceServiceConfig{}). Complete(); err != nil { @@ -264,6 +249,23 @@ func main() { os.Exit(1) } + setupLog.Info("Setting up LLMInferenceService controller") + llmEventBroadcaster := record.NewBroadcaster() + llmEventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: clientSet.CoreV1().Events("")}) + if err = (&llmisvc.LLMISVCReconciler{ + Client: mgr.GetClient(), + Config: mgr.GetConfig(), + Clientset: clientSet, + EventRecorder: llmEventBroadcaster.NewRecorder(scheme, corev1.EventSource{Component: "LLMInferenceServiceController"}), + Validator: func(ctx context.Context, llmSvc *v1alpha2.LLMInferenceService) error { + _, err := v1alpha2LLMValidator.ValidateCreate(ctx, llmSvc) + return err + }, + }).SetupWithManager(mgr); err != nil { + setupLog.Error(err, "unable to create controller", "controller", "LLMInferenceService") + os.Exit(1) + } + if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil { setupLog.Error(err, "unable to set up health check") os.Exit(1) @@ -273,21 +275,47 @@ func main() { os.Exit(1) } - eg := errgroup.Group{} - migrator := storageversion.NewMigrator(dynamic.NewForConfigOrDie(cfg), apixclient.NewForConfigOrDie(cfg)) - for _, gr := range []schema.GroupResource{ - {Group: v1alpha2.SchemeGroupVersion.Group, Resource: "llminferenceservices"}, - {Group: v1alpha2.SchemeGroupVersion.Group, Resource: "llminferenceserviceconfigs"}, - } { - eg.Go(func() error { - if err := migrator.Migrate(ctx, gr); err != nil { - return fmt.Errorf("failed to migrate %q: %w", gr, err) - } - return nil - }) + // Storage version migration runs as a LeaderElection runnable, which starts + // after the webhook server and cache sync are ready. This avoids the + // chicken-and-egg problem where migration patches trigger validating webhooks + // that aren't serving yet. + // migrationBackoff allows enough time for Service endpoints to propagate + // after the webhook server starts. + migrationBackoff := wait.Backoff{ + Duration: 2 * time.Second, + Factor: 1.5, + Jitter: 0.1, + Steps: 10, } - if err := eg.Wait(); err != nil { - setupLog.Error(err, "unable to migrate resources") + if err := mgr.Add(leaderRunnable(func(ctx context.Context) error { + setupLog.Info("running storage version migration") + migrator := storageversion.NewMigrator(dynamic.NewForConfigOrDie(cfg), apixclient.NewForConfigOrDie(cfg)) + for _, gr := range []schema.GroupResource{ + {Group: v1alpha2.SchemeGroupVersion.Group, Resource: "llminferenceservices"}, + {Group: v1alpha2.SchemeGroupVersion.Group, Resource: "llminferenceserviceconfigs"}, + } { + var lastErr error + if err := wait.ExponentialBackoffWithContext(ctx, migrationBackoff, func(ctx context.Context) (bool, error) { + if err := migrator.Migrate(ctx, gr); err != nil { + lastErr = err + if apierrors.IsForbidden(err) || apierrors.IsUnauthorized(err) || apierrors.IsNotFound(err) { + return false, err + } + setupLog.Error(err, "storage version migration attempt failed, retrying", "resource", gr) + return false, nil + } + return true, nil + }); err != nil { + if lastErr != nil && wait.Interrupted(err) { + return fmt.Errorf("storage version migration for %s timed out: %w", gr, lastErr) + } + return fmt.Errorf("storage version migration for %s failed: %w", gr, err) + } + } + setupLog.Info("storage version migration completed") + return nil + })); err != nil { + setupLog.Error(err, "unable to register storage version migration") os.Exit(1) } diff --git a/go.mod b/go.mod index 844e14e565a..e551febc267 100644 --- a/go.mod +++ b/go.mod @@ -36,7 +36,6 @@ require ( github.com/tidwall/gjson v1.18.0 go.opentelemetry.io/otel/trace v1.39.0 go.uber.org/zap v1.27.1 - golang.org/x/sync v0.19.0 gomodules.xyz/jsonpatch/v2 v2.5.0 google.golang.org/api v0.250.0 google.golang.org/protobuf v1.36.11 @@ -194,6 +193,7 @@ require ( golang.org/x/mod v0.32.0 // indirect golang.org/x/net v0.49.0 // indirect golang.org/x/oauth2 v0.34.0 // indirect + golang.org/x/sync v0.19.0 // indirect golang.org/x/sys v0.40.0 // indirect golang.org/x/term v0.39.0 // indirect golang.org/x/text v0.33.0 // indirect diff --git a/test/e2e/llmisvc/test_storage_version_migration.py b/test/e2e/llmisvc/test_storage_version_migration.py new file mode 100644 index 00000000000..d0aa3df14af --- /dev/null +++ b/test/e2e/llmisvc/test_storage_version_migration.py @@ -0,0 +1,244 @@ +# Copyright 2025 The KServe Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +E2E test for storage version migration. + +Verifies that the LLMInferenceService controller correctly runs storage version +migration after the webhook server is ready. The migration patches all resources +to re-encode them in the current storage version (v1alpha2) and then updates the +CRD status.storedVersions to drop stale versions. + +To trigger an actual migration, we simulate an upgrade scenario by patching the +CRD status to include a stale stored version, then restarting the controller. +""" + +import os +import time +import subprocess +import pytest +from kserve import KServeClient, constants +from kubernetes import client + +from .fixtures import ( + inject_k8s_proxy, + KSERVE_TEST_NAMESPACE, + KSERVE_PLURAL_LLMINFERENCESERVICECONFIG, +) +from .logging import logger + +LLMISVC_CRD_NAME = "llminferenceservices.serving.kserve.io" +LLMISVC_CONFIG_CRD_NAME = "llminferenceserviceconfigs.serving.kserve.io" +CONTROLLER_NAMESPACE = os.environ.get("KSERVE_NAMESPACE", "opendatahub") +CONTROLLER_DEPLOYMENT = "llmisvc-controller-manager" + + +def wait_for(assertion_fn, timeout: float = 60.0, interval: float = 1.0): + """Wait for the assertion to succeed within timeout.""" + deadline = time.time() + timeout + last_error = None + while True: + try: + return assertion_fn() + except (AssertionError, Exception) as e: + last_error = e + if time.time() >= deadline: + raise AssertionError( + f"Timed out after {timeout}s waiting for assertion. Last error: {last_error}" + ) from e + time.sleep(interval) + + +@pytest.mark.llminferenceservice +@pytest.mark.conversion +class TestStorageVersionMigration: + """Test storage version migration runs correctly during controller startup.""" + + @pytest.fixture(autouse=True) + def setup(self): + """Setup test fixtures.""" + inject_k8s_proxy() + self.kserve_client = KServeClient( + config_file=os.environ.get("KUBECONFIG", "~/.kube/config"), + client_configuration=client.Configuration(), + ) + self.apix_client = client.ApiextensionsV1Api() + self.namespace = KSERVE_TEST_NAMESPACE + self.created_resources = [] + yield + self._cleanup_resources() + + def _cleanup_resources(self): + """Clean up created resources and restore CRD status.""" + # Always restore CRD storedVersions to prevent dirty state + for crd_name in [LLMISVC_CONFIG_CRD_NAME, LLMISVC_CRD_NAME]: + try: + self.apix_client.patch_custom_resource_definition_status( + crd_name, + body={"status": {"storedVersions": ["v1alpha2"]}}, + ) + except Exception as e: + logger.warning(f"Failed to restore storedVersions for {crd_name}: {e}") + + if os.getenv("SKIP_RESOURCE_DELETION", "False").lower() in ("true", "1", "t"): + logger.info("Skipping resource deletion after test execution.") + return + + for resource_type, name, version in self.created_resources: + try: + if resource_type == "config": + self.kserve_client.api_instance.delete_namespaced_custom_object( + constants.KSERVE_GROUP, + version, + self.namespace, + KSERVE_PLURAL_LLMINFERENCESERVICECONFIG, + name, + ) + except Exception as e: + logger.warning(f"Failed to cleanup {resource_type} {name}: {e}") + + @pytest.mark.cluster_cpu + @pytest.mark.cluster_single_node + def test_storage_version_migration_after_simulated_upgrade(self): + """Test that storage version migration runs successfully after controller restart. + + Simulates an upgrade by: + 1. Creating a resource via v1alpha1 API + 2. Patching CRD storedVersions to include the stale v1alpha1 version + 3. Restarting the controller (which triggers migration on startup) + 4. Verifying storedVersions is cleaned up to only contain v1alpha2 + """ + # 1. Create a config resource via v1alpha1 so we have something to migrate + config_name = "migration-test-config" + config = { + "apiVersion": f"{constants.KSERVE_GROUP}/{constants.KSERVE_V1ALPHA1_VERSION}", + "kind": "LLMInferenceServiceConfig", + "metadata": { + "name": config_name, + "namespace": self.namespace, + }, + "spec": { + "model": {"uri": "hf://facebook/opt-125m", "name": "facebook/opt-125m"}, + "router": {"route": {}}, + "template": { + "containers": [ + { + "name": "main", + "image": "public.ecr.aws/q9t5s3a7/vllm-cpu-release-repo:v0.17.1", + "resources": { + "limits": {"cpu": "1", "memory": "2Gi"}, + "requests": {"cpu": "100m", "memory": "512Mi"}, + }, + } + ] + }, + }, + } + self.kserve_client.api_instance.create_namespaced_custom_object( + constants.KSERVE_GROUP, + constants.KSERVE_V1ALPHA1_VERSION, + self.namespace, + KSERVE_PLURAL_LLMINFERENCESERVICECONFIG, + config, + ) + self.created_resources.append( + ("config", config_name, constants.KSERVE_V1ALPHA2_VERSION) + ) + logger.info(f"Created LLMInferenceServiceConfig {config_name} via v1alpha1") + + # 2. Patch CRD storedVersions to simulate upgrade state. + # After a real upgrade from v1alpha1-only to v1alpha1+v1alpha2, + # storedVersions would be ["v1alpha1", "v1alpha2"]. This triggers + # the migrator to re-encode all resources in the current storage version. + for crd_name in [LLMISVC_CONFIG_CRD_NAME, LLMISVC_CRD_NAME]: + self.apix_client.patch_custom_resource_definition_status( + crd_name, + body={"status": {"storedVersions": ["v1alpha1", "v1alpha2"]}}, + ) + logger.info(f"Patched {crd_name} storedVersions to [v1alpha1, v1alpha2]") + + # Verify the patch took effect + for crd_name in [LLMISVC_CONFIG_CRD_NAME, LLMISVC_CRD_NAME]: + crd = self.apix_client.read_custom_resource_definition(crd_name) + assert set(crd.status.stored_versions) == {"v1alpha1", "v1alpha2"}, ( + f"Expected storedVersions to contain both versions, got {crd.status.stored_versions}" + ) + + # 3. Restart the controller to trigger migration on startup. + # The controller runs migration as a manager Runnable that executes + # after the webhook server is ready. + logger.info(f"Restarting {CONTROLLER_DEPLOYMENT} in {CONTROLLER_NAMESPACE}") + subprocess.run( + [ + "kubectl", + "rollout", + "restart", + f"deployment/{CONTROLLER_DEPLOYMENT}", + "-n", + CONTROLLER_NAMESPACE, + ], + check=True, + ) + subprocess.run( + [ + "kubectl", + "rollout", + "status", + f"deployment/{CONTROLLER_DEPLOYMENT}", + "-n", + CONTROLLER_NAMESPACE, + "--timeout=120s", + ], + check=True, + ) + logger.info("Controller restarted successfully") + + # 4. Verify storedVersions has been cleaned up by the migrator. + # The migrator patches all resources with an empty merge patch to + # re-encode them in v1alpha2, then drops v1alpha1 from storedVersions. + def assert_stored_versions_migrated(): + for crd_name in [LLMISVC_CONFIG_CRD_NAME, LLMISVC_CRD_NAME]: + crd = self.apix_client.read_custom_resource_definition(crd_name) + assert crd.status.stored_versions == ["v1alpha2"], ( + f"Expected storedVersions=['v1alpha2'] after migration, " + f"got {crd.status.stored_versions} for {crd_name}" + ) + + wait_for(assert_stored_versions_migrated, timeout=180.0, interval=5.0) + logger.info("Storage version migration completed - storedVersions cleaned up") + + # 5. Verify the resource is still accessible via both API versions + v1 = self.kserve_client.api_instance.get_namespaced_custom_object( + constants.KSERVE_GROUP, + constants.KSERVE_V1ALPHA1_VERSION, + self.namespace, + KSERVE_PLURAL_LLMINFERENCESERVICECONFIG, + config_name, + ) + assert v1 is not None + assert v1["metadata"]["name"] == config_name + + v2 = self.kserve_client.api_instance.get_namespaced_custom_object( + constants.KSERVE_GROUP, + constants.KSERVE_V1ALPHA2_VERSION, + self.namespace, + KSERVE_PLURAL_LLMINFERENCESERVICECONFIG, + config_name, + ) + assert v2 is not None + assert v2["metadata"]["name"] == config_name + + logger.info( + "Resource accessible via both v1alpha1 and v1alpha2 after migration" + )