Skip to content

Commit 81121b0

Browse files
fix(llmisvc): defer storage version migration until webhooks are serving (#1251)
The controller was crashing on startup because storage version migration ran before the webhook Service had any ready endpoints. The Knative migrator patches each resource with an empty merge patch, which triggers validating webhooks — but with no endpoints available, the API server rejects every patch and the controller exits. Moves migration into a LeaderElectionRunnable so it executes only after webhooks and caches are fully ready. Replaces the short DefaultBackoff retry (~1.5s) with a generous exponential backoff (multi-minute window) to handle Service endpoint propagation delays on platforms like OpenShift. Fixes [RHOAIENG-54344](https://redhat.atlassian.net/browse/RHOAIENG-54344)
1 parent f5b52dd commit 81121b0

File tree

3 files changed

+314
-42
lines changed

3 files changed

+314
-42
lines changed

cmd/llmisvc/main.go

Lines changed: 69 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -22,17 +22,19 @@ import (
2222
"flag"
2323
"fmt"
2424
"os"
25+
"time"
2526

26-
"golang.org/x/sync/errgroup"
2727
appsv1 "k8s.io/api/apps/v1"
2828
autoscalingv2 "k8s.io/api/autoscaling/v2"
2929
corev1 "k8s.io/api/core/v1"
3030
apixclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
31+
apierrors "k8s.io/apimachinery/pkg/api/errors"
3132
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3233
"k8s.io/apimachinery/pkg/fields"
3334
"k8s.io/apimachinery/pkg/runtime"
3435
"k8s.io/apimachinery/pkg/runtime/schema"
3536
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
37+
"k8s.io/apimachinery/pkg/util/wait"
3638
"k8s.io/client-go/dynamic"
3739
"k8s.io/client-go/kubernetes"
3840
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
@@ -61,6 +63,14 @@ var (
6163
setupLog = ctrl.Log.WithName("setup")
6264
)
6365

66+
// leaderRunnable is a function that implements both Runnable and
67+
// LeaderElectionRunnable, ensuring it only runs on the elected leader
68+
// and starts after webhooks and caches are ready.
69+
type leaderRunnable func(context.Context) error
70+
71+
func (r leaderRunnable) Start(ctx context.Context) error { return r(ctx) }
72+
func (r leaderRunnable) NeedLeaderElection() bool { return true }
73+
6474
func init() {
6575
utilruntime.Must(clientgoscheme.AddToScheme(scheme))
6676
utilruntime.Must(kservescheme.AddLLMISVCAPIs(scheme))
@@ -199,37 +209,17 @@ func main() {
199209
os.Exit(1)
200210
}
201211

202-
// Register v1alpha2 validators
212+
// Register webhooks: validation (v1alpha1, v1alpha2) and conversion
203213
v1alpha2LLMValidator := &v1alpha2.LLMInferenceServiceValidator{}
204214
if err = v1alpha2LLMValidator.SetupWithManager(mgr); err != nil {
205215
setupLog.Error(err, "unable to create webhook", "webhook", "llminferenceservice-v1alpha2")
206216
os.Exit(1)
207217
}
208-
209-
// Register v1alpha1 validators
210218
v1alpha1LLMValidator := &v1alpha1.LLMInferenceServiceValidator{}
211219
if err = v1alpha1LLMValidator.SetupWithManager(mgr); err != nil {
212220
setupLog.Error(err, "unable to create webhook", "webhook", "llminferenceservice-v1alpha1")
213221
os.Exit(1)
214222
}
215-
216-
setupLog.Info("Setting up LLMInferenceService controller")
217-
llmEventBroadcaster := record.NewBroadcaster()
218-
llmEventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: clientSet.CoreV1().Events("")})
219-
if err = (&llmisvc.LLMISVCReconciler{
220-
Client: mgr.GetClient(),
221-
Config: mgr.GetConfig(),
222-
Clientset: clientSet,
223-
EventRecorder: llmEventBroadcaster.NewRecorder(scheme, corev1.EventSource{Component: "LLMInferenceServiceController"}),
224-
Validator: func(ctx context.Context, llmSvc *v1alpha2.LLMInferenceService) error {
225-
_, err := v1alpha2LLMValidator.ValidateCreate(ctx, llmSvc)
226-
return err
227-
},
228-
}).SetupWithManager(mgr); err != nil {
229-
setupLog.Error(err, "unable to create controller", "controller", "LLMInferenceService")
230-
os.Exit(1)
231-
}
232-
233223
v1alpha1ConfigValidator := &v1alpha1.LLMInferenceServiceConfigValidator{
234224
ConfigValidationFunc: createV1Alpha1ConfigValidationFunc(clientSet),
235225
WellKnownConfigChecker: wellKnownConfigChecker,
@@ -238,7 +228,6 @@ func main() {
238228
setupLog.Error(err, "unable to create webhook", "webhook", "llminferenceserviceconfig-v1alpha1")
239229
os.Exit(1)
240230
}
241-
242231
v1alpha2ConfigValidator := &v1alpha2.LLMInferenceServiceConfigValidator{
243232
ConfigValidationFunc: createV1Alpha2ConfigValidationFunc(clientSet),
244233
WellKnownConfigChecker: wellKnownConfigChecker,
@@ -247,23 +236,36 @@ func main() {
247236
setupLog.Error(err, "unable to create webhook", "webhook", "llminferenceserviceconfig-v1alpha2")
248237
os.Exit(1)
249238
}
250-
251-
// Register conversion webhooks for Hub types (v1alpha2)
252-
// This enables automatic API version conversion between v1alpha1 and v1alpha2
253239
if err = ctrl.NewWebhookManagedBy(mgr).
254240
For(&v1alpha2.LLMInferenceService{}).
255241
Complete(); err != nil {
256242
setupLog.Error(err, "unable to create conversion webhook", "webhook", "llminferenceservice")
257243
os.Exit(1)
258244
}
259-
260245
if err = ctrl.NewWebhookManagedBy(mgr).
261246
For(&v1alpha2.LLMInferenceServiceConfig{}).
262247
Complete(); err != nil {
263248
setupLog.Error(err, "unable to create conversion webhook", "webhook", "llminferenceserviceconfig")
264249
os.Exit(1)
265250
}
266251

252+
setupLog.Info("Setting up LLMInferenceService controller")
253+
llmEventBroadcaster := record.NewBroadcaster()
254+
llmEventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: clientSet.CoreV1().Events("")})
255+
if err = (&llmisvc.LLMISVCReconciler{
256+
Client: mgr.GetClient(),
257+
Config: mgr.GetConfig(),
258+
Clientset: clientSet,
259+
EventRecorder: llmEventBroadcaster.NewRecorder(scheme, corev1.EventSource{Component: "LLMInferenceServiceController"}),
260+
Validator: func(ctx context.Context, llmSvc *v1alpha2.LLMInferenceService) error {
261+
_, err := v1alpha2LLMValidator.ValidateCreate(ctx, llmSvc)
262+
return err
263+
},
264+
}).SetupWithManager(mgr); err != nil {
265+
setupLog.Error(err, "unable to create controller", "controller", "LLMInferenceService")
266+
os.Exit(1)
267+
}
268+
267269
if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {
268270
setupLog.Error(err, "unable to set up health check")
269271
os.Exit(1)
@@ -273,21 +275,47 @@ func main() {
273275
os.Exit(1)
274276
}
275277

276-
eg := errgroup.Group{}
277-
migrator := storageversion.NewMigrator(dynamic.NewForConfigOrDie(cfg), apixclient.NewForConfigOrDie(cfg))
278-
for _, gr := range []schema.GroupResource{
279-
{Group: v1alpha2.SchemeGroupVersion.Group, Resource: "llminferenceservices"},
280-
{Group: v1alpha2.SchemeGroupVersion.Group, Resource: "llminferenceserviceconfigs"},
281-
} {
282-
eg.Go(func() error {
283-
if err := migrator.Migrate(ctx, gr); err != nil {
284-
return fmt.Errorf("failed to migrate %q: %w", gr, err)
285-
}
286-
return nil
287-
})
278+
// Storage version migration runs as a LeaderElection runnable, which starts
279+
// after the webhook server and cache sync are ready. This avoids the
280+
// chicken-and-egg problem where migration patches trigger validating webhooks
281+
// that aren't serving yet.
282+
// migrationBackoff allows enough time for Service endpoints to propagate
283+
// after the webhook server starts.
284+
migrationBackoff := wait.Backoff{
285+
Duration: 2 * time.Second,
286+
Factor: 1.5,
287+
Jitter: 0.1,
288+
Steps: 10,
288289
}
289-
if err := eg.Wait(); err != nil {
290-
setupLog.Error(err, "unable to migrate resources")
290+
if err := mgr.Add(leaderRunnable(func(ctx context.Context) error {
291+
setupLog.Info("running storage version migration")
292+
migrator := storageversion.NewMigrator(dynamic.NewForConfigOrDie(cfg), apixclient.NewForConfigOrDie(cfg))
293+
for _, gr := range []schema.GroupResource{
294+
{Group: v1alpha2.SchemeGroupVersion.Group, Resource: "llminferenceservices"},
295+
{Group: v1alpha2.SchemeGroupVersion.Group, Resource: "llminferenceserviceconfigs"},
296+
} {
297+
var lastErr error
298+
if err := wait.ExponentialBackoffWithContext(ctx, migrationBackoff, func(ctx context.Context) (bool, error) {
299+
if err := migrator.Migrate(ctx, gr); err != nil {
300+
lastErr = err
301+
if apierrors.IsForbidden(err) || apierrors.IsUnauthorized(err) || apierrors.IsNotFound(err) {
302+
return false, err
303+
}
304+
setupLog.Error(err, "storage version migration attempt failed, retrying", "resource", gr)
305+
return false, nil
306+
}
307+
return true, nil
308+
}); err != nil {
309+
if lastErr != nil && wait.Interrupted(err) {
310+
return fmt.Errorf("storage version migration for %s timed out: %w", gr, lastErr)
311+
}
312+
return fmt.Errorf("storage version migration for %s failed: %w", gr, err)
313+
}
314+
}
315+
setupLog.Info("storage version migration completed")
316+
return nil
317+
})); err != nil {
318+
setupLog.Error(err, "unable to register storage version migration")
291319
os.Exit(1)
292320
}
293321

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@ require (
3636
github.com/tidwall/gjson v1.18.0
3737
go.opentelemetry.io/otel/trace v1.39.0
3838
go.uber.org/zap v1.27.1
39-
golang.org/x/sync v0.19.0
4039
gomodules.xyz/jsonpatch/v2 v2.5.0
4140
google.golang.org/api v0.250.0
4241
google.golang.org/protobuf v1.36.11
@@ -194,6 +193,7 @@ require (
194193
golang.org/x/mod v0.32.0 // indirect
195194
golang.org/x/net v0.49.0 // indirect
196195
golang.org/x/oauth2 v0.34.0 // indirect
196+
golang.org/x/sync v0.19.0 // indirect
197197
golang.org/x/sys v0.40.0 // indirect
198198
golang.org/x/term v0.39.0 // indirect
199199
golang.org/x/text v0.33.0 // indirect

0 commit comments

Comments
 (0)