@@ -22,16 +22,18 @@ import (
2222 "flag"
2323 "fmt"
2424 "os"
25+ "time"
2526
26- "golang.org/x/sync/errgroup"
2727 appsv1 "k8s.io/api/apps/v1"
2828 autoscalingv2 "k8s.io/api/autoscaling/v2"
2929 corev1 "k8s.io/api/core/v1"
3030 apixclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
31+ apierrors "k8s.io/apimachinery/pkg/api/errors"
3132 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3233 "k8s.io/apimachinery/pkg/runtime"
3334 "k8s.io/apimachinery/pkg/runtime/schema"
3435 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
36+ "k8s.io/apimachinery/pkg/util/wait"
3537 "k8s.io/client-go/dynamic"
3638 "k8s.io/client-go/kubernetes"
3739 clientgoscheme "k8s.io/client-go/kubernetes/scheme"
6062 setupLog = ctrl .Log .WithName ("setup" )
6163)
6264
65+ // leaderRunnable is a function that implements both Runnable and
66+ // LeaderElectionRunnable, ensuring it only runs on the elected leader
67+ // and starts after webhooks and caches are ready.
68+ type leaderRunnable func (context.Context ) error
69+
70+ func (r leaderRunnable ) Start (ctx context.Context ) error { return r (ctx ) }
71+ func (r leaderRunnable ) NeedLeaderElection () bool { return true }
72+
6373func init () {
6474 utilruntime .Must (clientgoscheme .AddToScheme (scheme ))
6575 utilruntime .Must (kservescheme .AddLLMISVCAPIs (scheme ))
@@ -189,37 +199,17 @@ func main() {
189199 os .Exit (1 )
190200 }
191201
192- // Register v1alpha2 validators
202+ // Register webhooks: validation (v1alpha1, v1alpha2) and conversion
193203 v1alpha2LLMValidator := & v1alpha2.LLMInferenceServiceValidator {}
194204 if err = v1alpha2LLMValidator .SetupWithManager (mgr ); err != nil {
195205 setupLog .Error (err , "unable to create webhook" , "webhook" , "llminferenceservice-v1alpha2" )
196206 os .Exit (1 )
197207 }
198-
199- // Register v1alpha1 validators
200208 v1alpha1LLMValidator := & v1alpha1.LLMInferenceServiceValidator {}
201209 if err = v1alpha1LLMValidator .SetupWithManager (mgr ); err != nil {
202210 setupLog .Error (err , "unable to create webhook" , "webhook" , "llminferenceservice-v1alpha1" )
203211 os .Exit (1 )
204212 }
205-
206- setupLog .Info ("Setting up LLMInferenceService controller" )
207- llmEventBroadcaster := record .NewBroadcaster ()
208- llmEventBroadcaster .StartRecordingToSink (& typedcorev1.EventSinkImpl {Interface : clientSet .CoreV1 ().Events ("" )})
209- if err = (& llmisvc.LLMISVCReconciler {
210- Client : mgr .GetClient (),
211- Config : mgr .GetConfig (),
212- Clientset : clientSet ,
213- EventRecorder : llmEventBroadcaster .NewRecorder (scheme , corev1.EventSource {Component : "LLMInferenceServiceController" }),
214- Validator : func (ctx context.Context , llmSvc * v1alpha2.LLMInferenceService ) error {
215- _ , err := v1alpha2LLMValidator .ValidateCreate (ctx , llmSvc )
216- return err
217- },
218- }).SetupWithManager (mgr ); err != nil {
219- setupLog .Error (err , "unable to create controller" , "controller" , "LLMInferenceService" )
220- os .Exit (1 )
221- }
222-
223213 v1alpha1ConfigValidator := & v1alpha1.LLMInferenceServiceConfigValidator {
224214 ConfigValidationFunc : createV1Alpha1ConfigValidationFunc (clientSet ),
225215 WellKnownConfigChecker : wellKnownConfigChecker ,
@@ -228,7 +218,6 @@ func main() {
228218 setupLog .Error (err , "unable to create webhook" , "webhook" , "llminferenceserviceconfig-v1alpha1" )
229219 os .Exit (1 )
230220 }
231-
232221 v1alpha2ConfigValidator := & v1alpha2.LLMInferenceServiceConfigValidator {
233222 ConfigValidationFunc : createV1Alpha2ConfigValidationFunc (clientSet ),
234223 WellKnownConfigChecker : wellKnownConfigChecker ,
@@ -237,23 +226,36 @@ func main() {
237226 setupLog .Error (err , "unable to create webhook" , "webhook" , "llminferenceserviceconfig-v1alpha2" )
238227 os .Exit (1 )
239228 }
240-
241- // Register conversion webhooks for Hub types (v1alpha2)
242- // This enables automatic API version conversion between v1alpha1 and v1alpha2
243229 if err = ctrl .NewWebhookManagedBy (mgr ).
244230 For (& v1alpha2.LLMInferenceService {}).
245231 Complete (); err != nil {
246232 setupLog .Error (err , "unable to create conversion webhook" , "webhook" , "llminferenceservice" )
247233 os .Exit (1 )
248234 }
249-
250235 if err = ctrl .NewWebhookManagedBy (mgr ).
251236 For (& v1alpha2.LLMInferenceServiceConfig {}).
252237 Complete (); err != nil {
253238 setupLog .Error (err , "unable to create conversion webhook" , "webhook" , "llminferenceserviceconfig" )
254239 os .Exit (1 )
255240 }
256241
242+ setupLog .Info ("Setting up LLMInferenceService controller" )
243+ llmEventBroadcaster := record .NewBroadcaster ()
244+ llmEventBroadcaster .StartRecordingToSink (& typedcorev1.EventSinkImpl {Interface : clientSet .CoreV1 ().Events ("" )})
245+ if err = (& llmisvc.LLMISVCReconciler {
246+ Client : mgr .GetClient (),
247+ Config : mgr .GetConfig (),
248+ Clientset : clientSet ,
249+ EventRecorder : llmEventBroadcaster .NewRecorder (scheme , corev1.EventSource {Component : "LLMInferenceServiceController" }),
250+ Validator : func (ctx context.Context , llmSvc * v1alpha2.LLMInferenceService ) error {
251+ _ , err := v1alpha2LLMValidator .ValidateCreate (ctx , llmSvc )
252+ return err
253+ },
254+ }).SetupWithManager (mgr ); err != nil {
255+ setupLog .Error (err , "unable to create controller" , "controller" , "LLMInferenceService" )
256+ os .Exit (1 )
257+ }
258+
257259 if err := mgr .AddHealthzCheck ("healthz" , healthz .Ping ); err != nil {
258260 setupLog .Error (err , "unable to set up health check" )
259261 os .Exit (1 )
@@ -263,21 +265,47 @@ func main() {
263265 os .Exit (1 )
264266 }
265267
266- eg := errgroup.Group {}
267- migrator := storageversion .NewMigrator (dynamic .NewForConfigOrDie (cfg ), apixclient .NewForConfigOrDie (cfg ))
268- for _ , gr := range []schema.GroupResource {
269- {Group : v1alpha2 .SchemeGroupVersion .Group , Resource : "llminferenceservices" },
270- {Group : v1alpha2 .SchemeGroupVersion .Group , Resource : "llminferenceserviceconfigs" },
271- } {
272- eg .Go (func () error {
273- if err := migrator .Migrate (ctx , gr ); err != nil {
274- return fmt .Errorf ("failed to migrate %q: %w" , gr , err )
275- }
276- return nil
277- })
268+ // Storage version migration runs as a LeaderElection runnable, which starts
269+ // after the webhook server and cache sync are ready. This avoids the
270+ // chicken-and-egg problem where migration patches trigger validating webhooks
271+ // that aren't serving yet.
272+ // migrationBackoff allows enough time for Service endpoints to propagate
273+ // after the webhook server starts.
274+ migrationBackoff := wait.Backoff {
275+ Duration : 2 * time .Second ,
276+ Factor : 1.5 ,
277+ Jitter : 0.1 ,
278+ Steps : 10 ,
278279 }
279- if err := eg .Wait (); err != nil {
280- setupLog .Error (err , "unable to migrate resources" )
280+ if err := mgr .Add (leaderRunnable (func (ctx context.Context ) error {
281+ setupLog .Info ("running storage version migration" )
282+ migrator := storageversion .NewMigrator (dynamic .NewForConfigOrDie (cfg ), apixclient .NewForConfigOrDie (cfg ))
283+ for _ , gr := range []schema.GroupResource {
284+ {Group : v1alpha2 .SchemeGroupVersion .Group , Resource : "llminferenceservices" },
285+ {Group : v1alpha2 .SchemeGroupVersion .Group , Resource : "llminferenceserviceconfigs" },
286+ } {
287+ var lastErr error
288+ if err := wait .ExponentialBackoffWithContext (ctx , migrationBackoff , func (ctx context.Context ) (bool , error ) {
289+ if err := migrator .Migrate (ctx , gr ); err != nil {
290+ lastErr = err
291+ if apierrors .IsForbidden (err ) || apierrors .IsUnauthorized (err ) || apierrors .IsNotFound (err ) {
292+ return false , err
293+ }
294+ setupLog .Error (err , "storage version migration attempt failed, retrying" , "resource" , gr )
295+ return false , nil
296+ }
297+ return true , nil
298+ }); err != nil {
299+ if lastErr != nil && wait .Interrupted (err ) {
300+ return fmt .Errorf ("storage version migration for %s timed out: %w" , gr , lastErr )
301+ }
302+ return fmt .Errorf ("storage version migration for %s failed: %w" , gr , err )
303+ }
304+ }
305+ setupLog .Info ("storage version migration completed" )
306+ return nil
307+ })); err != nil {
308+ setupLog .Error (err , "unable to register storage version migration" )
281309 os .Exit (1 )
282310 }
283311
0 commit comments