Skip to content

Commit 0b49b41

Browse files
committed
fix(llmisvc): defer storage version migration until webhooks are serving
Cherry-pick of opendatahub-io#1251 Fixes [RHOAIENG-54344](https://redhat.atlassian.net/browse/RHOAIENG-54344) Signed-off-by: Bartosz Majsak <bartosz.majsak@gmail.com>
1 parent b14d4f1 commit 0b49b41

File tree

3 files changed

+314
-42
lines changed

3 files changed

+314
-42
lines changed

cmd/llmisvc/main.go

Lines changed: 69 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -22,17 +22,19 @@ import (
2222
"flag"
2323
"fmt"
2424
"os"
25+
"time"
2526

26-
"golang.org/x/sync/errgroup"
2727
appsv1 "k8s.io/api/apps/v1"
2828
autoscalingv2 "k8s.io/api/autoscaling/v2"
2929
corev1 "k8s.io/api/core/v1"
3030
apixclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
31+
apierrors "k8s.io/apimachinery/pkg/api/errors"
3132
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3233
"k8s.io/apimachinery/pkg/fields"
3334
"k8s.io/apimachinery/pkg/runtime"
3435
"k8s.io/apimachinery/pkg/runtime/schema"
3536
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
37+
"k8s.io/apimachinery/pkg/util/wait"
3638
"k8s.io/client-go/dynamic"
3739
"k8s.io/client-go/kubernetes"
3840
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
@@ -61,6 +63,14 @@ var (
6163
setupLog = ctrl.Log.WithName("setup")
6264
)
6365

66+
// leaderRunnable is a function that implements both Runnable and
67+
// LeaderElectionRunnable, ensuring it only runs on the elected leader
68+
// and starts after webhooks and caches are ready.
69+
type leaderRunnable func(context.Context) error
70+
71+
func (r leaderRunnable) Start(ctx context.Context) error { return r(ctx) }
72+
func (r leaderRunnable) NeedLeaderElection() bool { return true }
73+
6474
func init() {
6575
utilruntime.Must(clientgoscheme.AddToScheme(scheme))
6676
utilruntime.Must(kservescheme.AddLLMISVCAPIs(scheme))
@@ -199,37 +209,17 @@ func main() {
199209
os.Exit(1)
200210
}
201211

202-
// Register v1alpha2 validators
212+
// Register webhooks: validation (v1alpha1, v1alpha2) and conversion
203213
v1alpha2LLMValidator := &v1alpha2.LLMInferenceServiceValidator{}
204214
if err = v1alpha2LLMValidator.SetupWithManager(mgr); err != nil {
205215
setupLog.Error(err, "unable to create webhook", "webhook", "llminferenceservice-v1alpha2")
206216
os.Exit(1)
207217
}
208-
209-
// Register v1alpha1 validators
210218
v1alpha1LLMValidator := &v1alpha1.LLMInferenceServiceValidator{}
211219
if err = v1alpha1LLMValidator.SetupWithManager(mgr); err != nil {
212220
setupLog.Error(err, "unable to create webhook", "webhook", "llminferenceservice-v1alpha1")
213221
os.Exit(1)
214222
}
215-
216-
setupLog.Info("Setting up LLMInferenceService controller")
217-
llmEventBroadcaster := record.NewBroadcaster()
218-
llmEventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: clientSet.CoreV1().Events("")})
219-
if err = (&llmisvc.LLMISVCReconciler{
220-
Client: mgr.GetClient(),
221-
Config: mgr.GetConfig(),
222-
Clientset: clientSet,
223-
EventRecorder: llmEventBroadcaster.NewRecorder(scheme, corev1.EventSource{Component: "LLMInferenceServiceController"}),
224-
Validator: func(ctx context.Context, llmSvc *v1alpha2.LLMInferenceService) error {
225-
_, err := v1alpha2LLMValidator.ValidateCreate(ctx, llmSvc)
226-
return err
227-
},
228-
}).SetupWithManager(mgr); err != nil {
229-
setupLog.Error(err, "unable to create controller", "controller", "LLMInferenceService")
230-
os.Exit(1)
231-
}
232-
233223
v1alpha1ConfigValidator := &v1alpha1.LLMInferenceServiceConfigValidator{
234224
ConfigValidationFunc: createV1Alpha1ConfigValidationFunc(clientSet),
235225
WellKnownConfigChecker: wellKnownConfigChecker,
@@ -238,7 +228,6 @@ func main() {
238228
setupLog.Error(err, "unable to create webhook", "webhook", "llminferenceserviceconfig-v1alpha1")
239229
os.Exit(1)
240230
}
241-
242231
v1alpha2ConfigValidator := &v1alpha2.LLMInferenceServiceConfigValidator{
243232
ConfigValidationFunc: createV1Alpha2ConfigValidationFunc(clientSet),
244233
WellKnownConfigChecker: wellKnownConfigChecker,
@@ -247,23 +236,36 @@ func main() {
247236
setupLog.Error(err, "unable to create webhook", "webhook", "llminferenceserviceconfig-v1alpha2")
248237
os.Exit(1)
249238
}
250-
251-
// Register conversion webhooks for Hub types (v1alpha2)
252-
// This enables automatic API version conversion between v1alpha1 and v1alpha2
253239
if err = ctrl.NewWebhookManagedBy(mgr).
254240
For(&v1alpha2.LLMInferenceService{}).
255241
Complete(); err != nil {
256242
setupLog.Error(err, "unable to create conversion webhook", "webhook", "llminferenceservice")
257243
os.Exit(1)
258244
}
259-
260245
if err = ctrl.NewWebhookManagedBy(mgr).
261246
For(&v1alpha2.LLMInferenceServiceConfig{}).
262247
Complete(); err != nil {
263248
setupLog.Error(err, "unable to create conversion webhook", "webhook", "llminferenceserviceconfig")
264249
os.Exit(1)
265250
}
266251

252+
setupLog.Info("Setting up LLMInferenceService controller")
253+
llmEventBroadcaster := record.NewBroadcaster()
254+
llmEventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: clientSet.CoreV1().Events("")})
255+
if err = (&llmisvc.LLMISVCReconciler{
256+
Client: mgr.GetClient(),
257+
Config: mgr.GetConfig(),
258+
Clientset: clientSet,
259+
EventRecorder: llmEventBroadcaster.NewRecorder(scheme, corev1.EventSource{Component: "LLMInferenceServiceController"}),
260+
Validator: func(ctx context.Context, llmSvc *v1alpha2.LLMInferenceService) error {
261+
_, err := v1alpha2LLMValidator.ValidateCreate(ctx, llmSvc)
262+
return err
263+
},
264+
}).SetupWithManager(mgr); err != nil {
265+
setupLog.Error(err, "unable to create controller", "controller", "LLMInferenceService")
266+
os.Exit(1)
267+
}
268+
267269
if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {
268270
setupLog.Error(err, "unable to set up health check")
269271
os.Exit(1)
@@ -273,21 +275,47 @@ func main() {
273275
os.Exit(1)
274276
}
275277

276-
eg := errgroup.Group{}
277-
migrator := storageversion.NewMigrator(dynamic.NewForConfigOrDie(cfg), apixclient.NewForConfigOrDie(cfg))
278-
for _, gr := range []schema.GroupResource{
279-
{Group: v1alpha2.SchemeGroupVersion.Group, Resource: "llminferenceservices"},
280-
{Group: v1alpha2.SchemeGroupVersion.Group, Resource: "llminferenceserviceconfigs"},
281-
} {
282-
eg.Go(func() error {
283-
if err := migrator.Migrate(ctx, gr); err != nil {
284-
return fmt.Errorf("failed to migrate %q: %w", gr, err)
285-
}
286-
return nil
287-
})
278+
// Storage version migration runs as a LeaderElection runnable, which starts
279+
// after the webhook server and cache sync are ready. This avoids the
280+
// chicken-and-egg problem where migration patches trigger validating webhooks
281+
// that aren't serving yet.
282+
// migrationBackoff allows enough time for Service endpoints to propagate
283+
// after the webhook server starts.
284+
migrationBackoff := wait.Backoff{
285+
Duration: 2 * time.Second,
286+
Factor: 1.5,
287+
Jitter: 0.1,
288+
Steps: 10,
288289
}
289-
if err := eg.Wait(); err != nil {
290-
setupLog.Error(err, "unable to migrate resources")
290+
if err := mgr.Add(leaderRunnable(func(ctx context.Context) error {
291+
setupLog.Info("running storage version migration")
292+
migrator := storageversion.NewMigrator(dynamic.NewForConfigOrDie(cfg), apixclient.NewForConfigOrDie(cfg))
293+
for _, gr := range []schema.GroupResource{
294+
{Group: v1alpha2.SchemeGroupVersion.Group, Resource: "llminferenceservices"},
295+
{Group: v1alpha2.SchemeGroupVersion.Group, Resource: "llminferenceserviceconfigs"},
296+
} {
297+
var lastErr error
298+
if err := wait.ExponentialBackoffWithContext(ctx, migrationBackoff, func(ctx context.Context) (bool, error) {
299+
if err := migrator.Migrate(ctx, gr); err != nil {
300+
lastErr = err
301+
if apierrors.IsForbidden(err) || apierrors.IsUnauthorized(err) || apierrors.IsNotFound(err) {
302+
return false, err
303+
}
304+
setupLog.Error(err, "storage version migration attempt failed, retrying", "resource", gr)
305+
return false, nil
306+
}
307+
return true, nil
308+
}); err != nil {
309+
if lastErr != nil && wait.Interrupted(err) {
310+
return fmt.Errorf("storage version migration for %s timed out: %w", gr, lastErr)
311+
}
312+
return fmt.Errorf("storage version migration for %s failed: %w", gr, err)
313+
}
314+
}
315+
setupLog.Info("storage version migration completed")
316+
return nil
317+
})); err != nil {
318+
setupLog.Error(err, "unable to register storage version migration")
291319
os.Exit(1)
292320
}
293321

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@ require (
3636
github.com/tidwall/gjson v1.18.0
3737
go.opentelemetry.io/otel/trace v1.39.0
3838
go.uber.org/zap v1.27.1
39-
golang.org/x/sync v0.19.0
4039
gomodules.xyz/jsonpatch/v2 v2.5.0
4140
google.golang.org/api v0.250.0
4241
google.golang.org/protobuf v1.36.11
@@ -194,6 +193,7 @@ require (
194193
golang.org/x/mod v0.32.0 // indirect
195194
golang.org/x/net v0.49.0 // indirect
196195
golang.org/x/oauth2 v0.34.0 // indirect
196+
golang.org/x/sync v0.19.0 // indirect
197197
golang.org/x/sys v0.40.0 // indirect
198198
golang.org/x/term v0.39.0 // indirect
199199
golang.org/x/text v0.33.0 // indirect

0 commit comments

Comments
 (0)