Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 69 additions & 41 deletions cmd/llmisvc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,19 @@ import (
"flag"
"fmt"
"os"
"time"

"golang.org/x/sync/errgroup"
appsv1 "k8s.io/api/apps/v1"
autoscalingv2 "k8s.io/api/autoscaling/v2"
corev1 "k8s.io/api/core/v1"
apixclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
Expand Down Expand Up @@ -61,6 +63,14 @@ var (
setupLog = ctrl.Log.WithName("setup")
)

// leaderRunnable is a function that implements both Runnable and
// LeaderElectionRunnable, ensuring it only runs on the elected leader
// and starts after webhooks and caches are ready.
type leaderRunnable func(context.Context) error

func (r leaderRunnable) Start(ctx context.Context) error { return r(ctx) }
func (r leaderRunnable) NeedLeaderElection() bool { return true }

func init() {
utilruntime.Must(clientgoscheme.AddToScheme(scheme))
utilruntime.Must(kservescheme.AddLLMISVCAPIs(scheme))
Expand Down Expand Up @@ -199,37 +209,17 @@ func main() {
os.Exit(1)
}

// Register v1alpha2 validators
// Register webhooks: validation (v1alpha1, v1alpha2) and conversion
v1alpha2LLMValidator := &v1alpha2.LLMInferenceServiceValidator{}
if err = v1alpha2LLMValidator.SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create webhook", "webhook", "llminferenceservice-v1alpha2")
os.Exit(1)
}

// Register v1alpha1 validators
v1alpha1LLMValidator := &v1alpha1.LLMInferenceServiceValidator{}
if err = v1alpha1LLMValidator.SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create webhook", "webhook", "llminferenceservice-v1alpha1")
os.Exit(1)
}

setupLog.Info("Setting up LLMInferenceService controller")
llmEventBroadcaster := record.NewBroadcaster()
llmEventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: clientSet.CoreV1().Events("")})
if err = (&llmisvc.LLMISVCReconciler{
Client: mgr.GetClient(),
Config: mgr.GetConfig(),
Clientset: clientSet,
EventRecorder: llmEventBroadcaster.NewRecorder(scheme, corev1.EventSource{Component: "LLMInferenceServiceController"}),
Validator: func(ctx context.Context, llmSvc *v1alpha2.LLMInferenceService) error {
_, err := v1alpha2LLMValidator.ValidateCreate(ctx, llmSvc)
return err
},
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "LLMInferenceService")
os.Exit(1)
}

v1alpha1ConfigValidator := &v1alpha1.LLMInferenceServiceConfigValidator{
ConfigValidationFunc: createV1Alpha1ConfigValidationFunc(clientSet),
WellKnownConfigChecker: wellKnownConfigChecker,
Expand All @@ -238,7 +228,6 @@ func main() {
setupLog.Error(err, "unable to create webhook", "webhook", "llminferenceserviceconfig-v1alpha1")
os.Exit(1)
}

v1alpha2ConfigValidator := &v1alpha2.LLMInferenceServiceConfigValidator{
ConfigValidationFunc: createV1Alpha2ConfigValidationFunc(clientSet),
WellKnownConfigChecker: wellKnownConfigChecker,
Expand All @@ -247,23 +236,36 @@ func main() {
setupLog.Error(err, "unable to create webhook", "webhook", "llminferenceserviceconfig-v1alpha2")
os.Exit(1)
}

// Register conversion webhooks for Hub types (v1alpha2)
// This enables automatic API version conversion between v1alpha1 and v1alpha2
if err = ctrl.NewWebhookManagedBy(mgr).
For(&v1alpha2.LLMInferenceService{}).
Complete(); err != nil {
setupLog.Error(err, "unable to create conversion webhook", "webhook", "llminferenceservice")
os.Exit(1)
}

if err = ctrl.NewWebhookManagedBy(mgr).
For(&v1alpha2.LLMInferenceServiceConfig{}).
Complete(); err != nil {
setupLog.Error(err, "unable to create conversion webhook", "webhook", "llminferenceserviceconfig")
os.Exit(1)
}

setupLog.Info("Setting up LLMInferenceService controller")
llmEventBroadcaster := record.NewBroadcaster()
llmEventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: clientSet.CoreV1().Events("")})
if err = (&llmisvc.LLMISVCReconciler{
Client: mgr.GetClient(),
Config: mgr.GetConfig(),
Clientset: clientSet,
EventRecorder: llmEventBroadcaster.NewRecorder(scheme, corev1.EventSource{Component: "LLMInferenceServiceController"}),
Validator: func(ctx context.Context, llmSvc *v1alpha2.LLMInferenceService) error {
_, err := v1alpha2LLMValidator.ValidateCreate(ctx, llmSvc)
return err
},
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "LLMInferenceService")
os.Exit(1)
}

if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {
setupLog.Error(err, "unable to set up health check")
os.Exit(1)
Expand All @@ -273,21 +275,47 @@ func main() {
os.Exit(1)
}

eg := errgroup.Group{}
migrator := storageversion.NewMigrator(dynamic.NewForConfigOrDie(cfg), apixclient.NewForConfigOrDie(cfg))
for _, gr := range []schema.GroupResource{
{Group: v1alpha2.SchemeGroupVersion.Group, Resource: "llminferenceservices"},
{Group: v1alpha2.SchemeGroupVersion.Group, Resource: "llminferenceserviceconfigs"},
} {
eg.Go(func() error {
if err := migrator.Migrate(ctx, gr); err != nil {
return fmt.Errorf("failed to migrate %q: %w", gr, err)
}
return nil
})
// Storage version migration runs as a LeaderElection runnable, which starts
// after the webhook server and cache sync are ready. This avoids the
// chicken-and-egg problem where migration patches trigger validating webhooks
// that aren't serving yet.
// migrationBackoff allows enough time for Service endpoints to propagate
// after the webhook server starts.
migrationBackoff := wait.Backoff{
Duration: 2 * time.Second,
Factor: 1.5,
Jitter: 0.1,
Steps: 10,
}
if err := eg.Wait(); err != nil {
setupLog.Error(err, "unable to migrate resources")
if err := mgr.Add(leaderRunnable(func(ctx context.Context) error {
setupLog.Info("running storage version migration")
migrator := storageversion.NewMigrator(dynamic.NewForConfigOrDie(cfg), apixclient.NewForConfigOrDie(cfg))
for _, gr := range []schema.GroupResource{
{Group: v1alpha2.SchemeGroupVersion.Group, Resource: "llminferenceservices"},
{Group: v1alpha2.SchemeGroupVersion.Group, Resource: "llminferenceserviceconfigs"},
} {
var lastErr error
if err := wait.ExponentialBackoffWithContext(ctx, migrationBackoff, func(ctx context.Context) (bool, error) {
if err := migrator.Migrate(ctx, gr); err != nil {
lastErr = err
if apierrors.IsForbidden(err) || apierrors.IsUnauthorized(err) || apierrors.IsNotFound(err) {
return false, err
}
setupLog.Error(err, "storage version migration attempt failed, retrying", "resource", gr)
return false, nil
}
return true, nil
}); err != nil {
if lastErr != nil && wait.Interrupted(err) {
return fmt.Errorf("storage version migration for %s timed out: %w", gr, lastErr)
}
return fmt.Errorf("storage version migration for %s failed: %w", gr, err)
}
}
setupLog.Info("storage version migration completed")
return nil
})); err != nil {
setupLog.Error(err, "unable to register storage version migration")
os.Exit(1)
}

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ require (
github.com/tidwall/gjson v1.18.0
go.opentelemetry.io/otel/trace v1.39.0
go.uber.org/zap v1.27.1
golang.org/x/sync v0.19.0
gomodules.xyz/jsonpatch/v2 v2.5.0
google.golang.org/api v0.250.0
google.golang.org/protobuf v1.36.11
Expand Down Expand Up @@ -194,6 +193,7 @@ require (
golang.org/x/mod v0.32.0 // indirect
golang.org/x/net v0.49.0 // indirect
golang.org/x/oauth2 v0.34.0 // indirect
golang.org/x/sync v0.19.0 // indirect
golang.org/x/sys v0.40.0 // indirect
golang.org/x/term v0.39.0 // indirect
golang.org/x/text v0.33.0 // indirect
Expand Down
Loading
Loading