From 78ae99611511cf19d0f28de6d485bf54ef9fd287 Mon Sep 17 00:00:00 2001 From: alinsran Date: Wed, 23 Jul 2025 08:20:31 +0800 Subject: [PATCH 1/5] fix: full sync during restart results in loss of dataplane traffic --- .../controller/apisixconsumer_controller.go | 3 + .../controller/apisixglobalrule_controller.go | 4 + internal/controller/apisixroute_controller.go | 3 + internal/controller/apisixtls_controller.go | 3 + internal/controller/consumer_controller.go | 3 + internal/controller/httproute_controller.go | 3 + internal/controller/indexer/indexer.go | 22 ++- internal/controller/ingress_controller.go | 3 + internal/controller/status/updater.go | 4 + internal/manager/controllers.go | 103 +++++++++- internal/manager/readiness/manager.go | 177 ++++++++++++++++++ internal/manager/run.go | 12 +- internal/provider/adc/adc.go | 14 +- internal/provider/provider.go | 1 + internal/types/k8s.go | 76 ++++++++ 15 files changed, 421 insertions(+), 10 deletions(-) create mode 100644 internal/manager/readiness/manager.go diff --git a/internal/controller/apisixconsumer_controller.go b/internal/controller/apisixconsumer_controller.go index eb9b3c7323..af82552d5e 100644 --- a/internal/controller/apisixconsumer_controller.go +++ b/internal/controller/apisixconsumer_controller.go @@ -39,6 +39,7 @@ import ( apiv2 "github.com/apache/apisix-ingress-controller/api/v2" "github.com/apache/apisix-ingress-controller/internal/controller/indexer" "github.com/apache/apisix-ingress-controller/internal/controller/status" + "github.com/apache/apisix-ingress-controller/internal/manager/readiness" "github.com/apache/apisix-ingress-controller/internal/provider" "github.com/apache/apisix-ingress-controller/internal/utils" ) @@ -51,11 +52,13 @@ type ApisixConsumerReconciler struct { Provider provider.Provider Updater status.Updater + Readier readiness.ReadinessManager } // Reconcile FIXME: implement the reconcile logic (For now, it dose nothing other than directly accepting) func (r *ApisixConsumerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { r.Log.Info("reconcile", "request", req.NamespacedName) + r.Readier.Done(&apiv2.ApisixConsumer{}, req.NamespacedName) ac := &apiv2.ApisixConsumer{} if err := r.Get(ctx, req.NamespacedName, ac); err != nil { diff --git a/internal/controller/apisixglobalrule_controller.go b/internal/controller/apisixglobalrule_controller.go index fc42b1fb2e..c41249faa4 100644 --- a/internal/controller/apisixglobalrule_controller.go +++ b/internal/controller/apisixglobalrule_controller.go @@ -38,6 +38,7 @@ import ( "github.com/apache/apisix-ingress-controller/internal/controller/config" "github.com/apache/apisix-ingress-controller/internal/controller/indexer" "github.com/apache/apisix-ingress-controller/internal/controller/status" + "github.com/apache/apisix-ingress-controller/internal/manager/readiness" "github.com/apache/apisix-ingress-controller/internal/provider" "github.com/apache/apisix-ingress-controller/internal/utils" ) @@ -49,10 +50,13 @@ type ApisixGlobalRuleReconciler struct { Log logr.Logger Provider provider.Provider Updater status.Updater + + Readier readiness.ReadinessManager } // Reconcile implements the reconciliation logic for ApisixGlobalRule func (r *ApisixGlobalRuleReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + r.Readier.Done(&apiv2.ApisixGlobalRule{}, req.NamespacedName) var globalRule apiv2.ApisixGlobalRule if err := r.Get(ctx, req.NamespacedName, &globalRule); err != nil { if client.IgnoreNotFound(err) == nil { diff --git a/internal/controller/apisixroute_controller.go b/internal/controller/apisixroute_controller.go index a07c6e76f1..7a89e01365 100644 --- a/internal/controller/apisixroute_controller.go +++ b/internal/controller/apisixroute_controller.go @@ -44,6 +44,7 @@ import ( apiv2 "github.com/apache/apisix-ingress-controller/api/v2" "github.com/apache/apisix-ingress-controller/internal/controller/indexer" "github.com/apache/apisix-ingress-controller/internal/controller/status" + "github.com/apache/apisix-ingress-controller/internal/manager/readiness" "github.com/apache/apisix-ingress-controller/internal/provider" "github.com/apache/apisix-ingress-controller/internal/types" "github.com/apache/apisix-ingress-controller/internal/utils" @@ -57,6 +58,7 @@ type ApisixRouteReconciler struct { Log logr.Logger Provider provider.Provider Updater status.Updater + Readier readiness.ReadinessManager } // SetupWithManager sets up the controller with the Manager. @@ -97,6 +99,7 @@ func (r *ApisixRouteReconciler) SetupWithManager(mgr ctrl.Manager) error { } func (r *ApisixRouteReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + r.Readier.Done(&apiv2.ApisixRoute{}, req.NamespacedName) var ar apiv2.ApisixRoute if err := r.Get(ctx, req.NamespacedName, &ar); err != nil { if client.IgnoreNotFound(err) == nil { diff --git a/internal/controller/apisixtls_controller.go b/internal/controller/apisixtls_controller.go index 5cde26f6c1..4845b5d664 100644 --- a/internal/controller/apisixtls_controller.go +++ b/internal/controller/apisixtls_controller.go @@ -39,6 +39,7 @@ import ( "github.com/apache/apisix-ingress-controller/internal/controller/config" "github.com/apache/apisix-ingress-controller/internal/controller/indexer" "github.com/apache/apisix-ingress-controller/internal/controller/status" + "github.com/apache/apisix-ingress-controller/internal/manager/readiness" "github.com/apache/apisix-ingress-controller/internal/provider" "github.com/apache/apisix-ingress-controller/internal/utils" ) @@ -50,6 +51,7 @@ type ApisixTlsReconciler struct { Log logr.Logger Provider provider.Provider Updater status.Updater + Readier readiness.ReadinessManager } // SetupWithManager sets up the controller with the Manager. @@ -85,6 +87,7 @@ func (r *ApisixTlsReconciler) SetupWithManager(mgr ctrl.Manager) error { // Reconcile implements the reconciliation logic for ApisixTls func (r *ApisixTlsReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + r.Readier.Done(&apiv2.ApisixTls{}, req.NamespacedName) var tls apiv2.ApisixTls if err := r.Get(ctx, req.NamespacedName, &tls); err != nil { if client.IgnoreNotFound(err) == nil { diff --git a/internal/controller/consumer_controller.go b/internal/controller/consumer_controller.go index fd51f87a11..9df147a66e 100644 --- a/internal/controller/consumer_controller.go +++ b/internal/controller/consumer_controller.go @@ -38,6 +38,7 @@ import ( "github.com/apache/apisix-ingress-controller/api/v1alpha1" "github.com/apache/apisix-ingress-controller/internal/controller/indexer" "github.com/apache/apisix-ingress-controller/internal/controller/status" + "github.com/apache/apisix-ingress-controller/internal/manager/readiness" "github.com/apache/apisix-ingress-controller/internal/provider" "github.com/apache/apisix-ingress-controller/internal/utils" ) @@ -51,6 +52,7 @@ type ConsumerReconciler struct { //nolint:revive Provider provider.Provider Updater status.Updater + Readier readiness.ReadinessManager } // SetupWithManager sets up the controller with the Manager. @@ -181,6 +183,7 @@ func (r *ConsumerReconciler) listConsumersForGatewayProxy(ctx context.Context, o } func (r *ConsumerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + r.Readier.Done(&v1alpha1.Consumer{}, req.NamespacedName) consumer := new(v1alpha1.Consumer) if err := r.Get(ctx, req.NamespacedName, consumer); err != nil { if client.IgnoreNotFound(err) == nil { diff --git a/internal/controller/httproute_controller.go b/internal/controller/httproute_controller.go index 50b6e3b2ba..e1da1f38ad 100644 --- a/internal/controller/httproute_controller.go +++ b/internal/controller/httproute_controller.go @@ -48,6 +48,7 @@ import ( "github.com/apache/apisix-ingress-controller/api/v1alpha1" "github.com/apache/apisix-ingress-controller/internal/controller/indexer" "github.com/apache/apisix-ingress-controller/internal/controller/status" + "github.com/apache/apisix-ingress-controller/internal/manager/readiness" "github.com/apache/apisix-ingress-controller/internal/provider" "github.com/apache/apisix-ingress-controller/internal/types" "github.com/apache/apisix-ingress-controller/internal/utils" @@ -65,6 +66,7 @@ type HTTPRouteReconciler struct { //nolint:revive genericEvent chan event.GenericEvent Updater status.Updater + Readier readiness.ReadinessManager } // SetupWithManager sets up the controller with the Manager. @@ -130,6 +132,7 @@ func (r *HTTPRouteReconciler) SetupWithManager(mgr ctrl.Manager) error { } func (r *HTTPRouteReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + r.Readier.Done(&gatewayv1.HTTPRoute{}, req.NamespacedName) hr := new(gatewayv1.HTTPRoute) if err := r.Get(ctx, req.NamespacedName, hr); err != nil { if client.IgnoreNotFound(err) == nil { diff --git a/internal/controller/indexer/indexer.go b/internal/controller/indexer/indexer.go index 2f1b75742e..e621ac1092 100644 --- a/internal/controller/indexer/indexer.go +++ b/internal/controller/indexer/indexer.go @@ -47,6 +47,7 @@ const ( GatewayClassIndexRef = "gatewayClassRef" ApisixUpstreamRef = "apisixUpstreamRef" PluginConfigIndexRef = "pluginConfigRefs" + ControllerName = "controllerName" ) func SetupIndexer(mgr ctrl.Manager) error { @@ -59,11 +60,11 @@ func SetupIndexer(mgr ctrl.Manager) error { setupIngressClassIndexer, setupGatewayProxyIndexer, setupGatewaySecretIndex, - setupGatewayClassIndexer, setupApisixRouteIndexer, setupApisixPluginConfigIndexer, setupApisixTlsIndexer, setupApisixConsumerIndexer, + setupGatewayClassIndexer, } { if err := setup(mgr); err != nil { return err @@ -81,6 +82,17 @@ func setupGatewayIndexer(mgr ctrl.Manager) error { ); err != nil { return err } + + if err := mgr.GetFieldIndexer().IndexField( + context.Background(), + &gatewayv1.Gateway{}, + GatewayClassIndexRef, + func(obj client.Object) (requests []string) { + return []string{string(obj.(*gatewayv1.Gateway).Spec.GatewayClassName)} + }, + ); err != nil { + return err + } return nil } @@ -273,10 +285,10 @@ func setupGatewaySecretIndex(mgr ctrl.Manager) error { func setupGatewayClassIndexer(mgr ctrl.Manager) error { return mgr.GetFieldIndexer().IndexField( context.Background(), - &gatewayv1.Gateway{}, - GatewayClassIndexRef, - func(obj client.Object) (requests []string) { - return []string{string(obj.(*gatewayv1.Gateway).Spec.GatewayClassName)} + &gatewayv1.GatewayClass{}, + ControllerName, + func(obj client.Object) []string { + return []string{string(obj.(*gatewayv1.GatewayClass).Spec.ControllerName)} }, ) } diff --git a/internal/controller/ingress_controller.go b/internal/controller/ingress_controller.go index 86c4d7ff0e..9770d050fe 100644 --- a/internal/controller/ingress_controller.go +++ b/internal/controller/ingress_controller.go @@ -45,6 +45,7 @@ import ( "github.com/apache/apisix-ingress-controller/api/v1alpha1" "github.com/apache/apisix-ingress-controller/internal/controller/indexer" "github.com/apache/apisix-ingress-controller/internal/controller/status" + "github.com/apache/apisix-ingress-controller/internal/manager/readiness" "github.com/apache/apisix-ingress-controller/internal/provider" "github.com/apache/apisix-ingress-controller/internal/utils" ) @@ -59,6 +60,7 @@ type IngressReconciler struct { //nolint:revive genericEvent chan event.GenericEvent Updater status.Updater + Readier readiness.ReadinessManager } // SetupWithManager sets up the controller with the Manager. @@ -117,6 +119,7 @@ func (r *IngressReconciler) SetupWithManager(mgr ctrl.Manager) error { // Reconcile handles the reconciliation of Ingress resources func (r *IngressReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + r.Readier.Done(&networkingv1.Ingress{}, req.NamespacedName) ingress := new(networkingv1.Ingress) if err := r.Get(ctx, req.NamespacedName, ingress); err != nil { if client.IgnoreNotFound(err) == nil { diff --git a/internal/controller/status/updater.go b/internal/controller/status/updater.go index 019c184685..d00aec76a8 100644 --- a/internal/controller/status/updater.go +++ b/internal/controller/status/updater.go @@ -150,6 +150,10 @@ func (u *UpdateHandler) Start(ctx context.Context) error { } } +func (u *UpdateHandler) NeedsLeaderElection() bool { + return true +} + func (u *UpdateHandler) Writer() Updater { return &UpdateWriter{ updateChannel: u.updateChannel, diff --git a/internal/manager/controllers.go b/internal/manager/controllers.go index 2af8cd6aaa..e93b335540 100644 --- a/internal/manager/controllers.go +++ b/internal/manager/controllers.go @@ -20,13 +20,24 @@ package manager import ( "context" + netv1 "k8s.io/api/networking/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/manager" + gatewayv1 "sigs.k8s.io/gateway-api/apis/v1" + "github.com/apache/apisix-ingress-controller/api/v1alpha1" + apiv2 "github.com/apache/apisix-ingress-controller/api/v2" "github.com/apache/apisix-ingress-controller/internal/controller" + "github.com/apache/apisix-ingress-controller/internal/controller/config" "github.com/apache/apisix-ingress-controller/internal/controller/indexer" "github.com/apache/apisix-ingress-controller/internal/controller/status" + "github.com/apache/apisix-ingress-controller/internal/manager/readiness" "github.com/apache/apisix-ingress-controller/internal/provider" + types "github.com/apache/apisix-ingress-controller/internal/types" ) // K8s @@ -83,7 +94,7 @@ type Controller interface { SetupWithManager(mgr manager.Manager) error } -func setupControllers(ctx context.Context, mgr manager.Manager, pro provider.Provider, updater status.Updater) ([]Controller, error) { +func setupControllers(ctx context.Context, mgr manager.Manager, pro provider.Provider, updater status.Updater, readier readiness.ReadinessManager) ([]Controller, error) { if err := indexer.SetupIndexer(mgr); err != nil { return nil, err } @@ -107,6 +118,7 @@ func setupControllers(ctx context.Context, mgr manager.Manager, pro provider.Pro Log: ctrl.LoggerFrom(ctx).WithName("controllers").WithName("HTTPRoute"), Provider: pro, Updater: updater, + Readier: readier, }, &controller.IngressReconciler{ Client: mgr.GetClient(), @@ -121,6 +133,7 @@ func setupControllers(ctx context.Context, mgr manager.Manager, pro provider.Pro Log: ctrl.LoggerFrom(ctx).WithName("controllers").WithName("Consumer"), Provider: pro, Updater: updater, + Readier: readier, }, &controller.IngressClassReconciler{ Client: mgr.GetClient(), @@ -134,6 +147,7 @@ func setupControllers(ctx context.Context, mgr manager.Manager, pro provider.Pro Log: ctrl.LoggerFrom(ctx).WithName("controllers").WithName("ApisixGlobalRule"), Provider: pro, Updater: updater, + Readier: readier, }, &controller.ApisixRouteReconciler{ Client: mgr.GetClient(), @@ -141,6 +155,7 @@ func setupControllers(ctx context.Context, mgr manager.Manager, pro provider.Pro Log: ctrl.LoggerFrom(ctx).WithName("controllers").WithName("ApisixRoute"), Provider: pro, Updater: updater, + Readier: readier, }, &controller.ApisixConsumerReconciler{ Client: mgr.GetClient(), @@ -148,6 +163,7 @@ func setupControllers(ctx context.Context, mgr manager.Manager, pro provider.Pro Log: ctrl.LoggerFrom(ctx).WithName("controllers").WithName("ApisixConsumer"), Provider: pro, Updater: updater, + Readier: readier, }, &controller.ApisixPluginConfigReconciler{ Client: mgr.GetClient(), @@ -161,6 +177,7 @@ func setupControllers(ctx context.Context, mgr manager.Manager, pro provider.Pro Log: ctrl.LoggerFrom(ctx).WithName("controllers").WithName("ApisixTls"), Provider: pro, Updater: updater, + Readier: readier, }, &controller.ApisixUpstreamReconciler{ Client: mgr.GetClient(), @@ -176,3 +193,87 @@ func setupControllers(ctx context.Context, mgr manager.Manager, pro provider.Pro }, }, nil } + +func registerReadinessGVK(c client.Client, readier readiness.ReadinessManager) { + readier.RegisterGVK([]readiness.GVKConfig{ + { + GVKs: []schema.GroupVersionKind{ + types.GvkOf(&gatewayv1.HTTPRoute{}), + }, + }, + { + GVKs: []schema.GroupVersionKind{ + types.GvkOf(&netv1.Ingress{}), + types.GvkOf(&apiv2.ApisixRoute{}), + types.GvkOf(&apiv2.ApisixGlobalRule{}), + types.GvkOf(&apiv2.ApisixPluginConfig{}), + types.GvkOf(&apiv2.ApisixTls{}), + types.GvkOf(&apiv2.ApisixConsumer{}), + }, + Filter: readiness.GVKFilter(func(obj *unstructured.Unstructured) bool { + icName, _, _ := unstructured.NestedString(obj.Object, "spec", "ingressClassName") + if icName == "" { + insList := &netv1.IngressClassList{} + if err := c.List(context.Background(), insList, client.MatchingFields{ + indexer.IngressClass: config.GetControllerName(), + }); err != nil { + return false + } + for _, ic := range insList.Items { + if ic.Annotations[types.DefaultIngressClassAnnotation] == "true" { + return true + } + } + return false + } else { + var ingressClass netv1.IngressClass + if err := c.Get(context.Background(), client.ObjectKey{ + Name: icName, + }, &ingressClass); err != nil { + return false + } + return true + } + }), + }, + { + GVKs: []schema.GroupVersionKind{ + types.GvkOf(&v1alpha1.Consumer{}), + }, + Filter: readiness.GVKFilter(func(obj *unstructured.Unstructured) bool { + consumer := &v1alpha1.Consumer{} + if err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.Object, consumer); err != nil { + return false + } + + if consumer.Spec.GatewayRef.Name == "" { + return false + } + if consumer.Spec.GatewayRef.Kind != nil && *consumer.Spec.GatewayRef.Kind != types.KindGateway { + return false + } + if consumer.Spec.GatewayRef.Group != nil && *consumer.Spec.GatewayRef.Group != gatewayv1.GroupName { + return false + } + + ns := consumer.GetNamespace() + if consumer.Spec.GatewayRef.Namespace != nil { + ns = *consumer.Spec.GatewayRef.Namespace + } + + ctx := context.Background() + + gateway := &gatewayv1.Gateway{} + if err := c.Get(ctx, client.ObjectKey{Name: consumer.Spec.GatewayRef.Name, Namespace: ns}, gateway); err != nil { + return false + } + + gatewayClass := &gatewayv1.GatewayClass{} + if err := c.Get(ctx, client.ObjectKey{Name: string(gateway.Spec.GatewayClassName)}, gatewayClass); err != nil { + return false + } + return string(gatewayClass.Spec.ControllerName) == config.GetControllerName() + }), + }, + }...) +} diff --git a/internal/manager/readiness/manager.go b/internal/manager/readiness/manager.go new file mode 100644 index 0000000000..2f565db335 --- /dev/null +++ b/internal/manager/readiness/manager.go @@ -0,0 +1,177 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package readiness + +import ( + "context" + "fmt" + "sync" + "time" + + "go.uber.org/zap" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + k8stypes "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" + + types "github.com/apache/apisix-ingress-controller/internal/types" + "github.com/api7/gopkg/pkg/log" +) + +type Filter interface { + Match(obj *unstructured.Unstructured) bool +} + +type GVKFilter func(obj *unstructured.Unstructured) bool + +func (f GVKFilter) Match(obj *unstructured.Unstructured) bool { + return f(obj) +} + +type GVKConfig struct { + GVKs []schema.GroupVersionKind + Filter Filter +} + +type ReadinessManager interface { + RegisterGVK(configs ...GVKConfig) + Start(ctx context.Context) error + IsReady() bool + WaitReady(ctx context.Context, timeout time.Duration) bool + Done(obj client.Object, namespacedName k8stypes.NamespacedName) +} + +type readinessManager struct { + client client.Client + configs []GVKConfig + state map[schema.GroupVersionKind]map[k8stypes.NamespacedName]struct{} + mu sync.RWMutex + startOnce sync.Once + started chan struct{} + done chan struct{} + + isReady bool +} + +func NewReadinessManager(client client.Client) ReadinessManager { + return &readinessManager{ + client: client, + state: make(map[schema.GroupVersionKind]map[k8stypes.NamespacedName]struct{}), + started: make(chan struct{}), + done: make(chan struct{}), + } +} + +func (r *readinessManager) RegisterGVK(configs ...GVKConfig) { + r.mu.Lock() + defer r.mu.Unlock() + r.configs = append(r.configs, configs...) +} + +// Start should be called **after** cache is started and synced. +func (r *readinessManager) Start(ctx context.Context) error { + var err error + r.startOnce.Do(func() { + for _, cfg := range r.configs { + for _, gvk := range cfg.GVKs { + uList := &unstructured.UnstructuredList{} + uList.SetGroupVersionKind(gvk) + if listErr := r.client.List(ctx, uList); listErr != nil { + err = fmt.Errorf("list %s failed: %w", gvk.String(), listErr) + return + } + var expected []k8stypes.NamespacedName + for _, item := range uList.Items { + if cfg.Filter != nil && !cfg.Filter.Match(&item) { + continue + } + expected = append(expected, k8stypes.NamespacedName{ + Namespace: item.GetNamespace(), + Name: item.GetName(), + }) + } + if len(expected) > 0 { + log.Warnw("registering readiness state", zap.Any("gvk", gvk), zap.Any("expected", expected)) + r.registerState(gvk, expected) + } + } + } + close(r.started) + if len(r.state) == 0 && !r.isReady { + r.isReady = true + close(r.done) + } + }) + return err +} + +func (r *readinessManager) registerState(gvk schema.GroupVersionKind, list []k8stypes.NamespacedName) { + r.mu.Lock() + defer r.mu.Unlock() + if _, ok := r.state[gvk]; !ok { + r.state[gvk] = make(map[k8stypes.NamespacedName]struct{}) + } + for _, name := range list { + r.state[gvk][name] = struct{}{} + } +} + +func (r *readinessManager) Done(obj client.Object, nn k8stypes.NamespacedName) { + if r.IsReady() { + return + } + r.mu.Lock() + defer r.mu.Unlock() + gvk := types.GvkOf(obj) + if _, ok := r.state[gvk]; !ok { + return + } + delete(r.state[gvk], nn) + if len(r.state[gvk]) == 0 { + delete(r.state, gvk) + } + if len(r.state) == 0 && !r.isReady { + r.isReady = true + close(r.done) + } +} + +func (r *readinessManager) IsReady() bool { + return r.isReady +} + +func (r *readinessManager) WaitReady(ctx context.Context, timeout time.Duration) bool { + if r.IsReady() { + return true + } + + select { + case <-r.started: + case <-ctx.Done(): + return false + } + + select { + case <-ctx.Done(): + return false + case <-time.After(timeout): + return true + case <-r.done: + return true + } +} diff --git a/internal/manager/run.go b/internal/manager/run.go index a27b273251..b75b383d87 100644 --- a/internal/manager/run.go +++ b/internal/manager/run.go @@ -41,6 +41,7 @@ import ( "github.com/apache/apisix-ingress-controller/internal/controller" "github.com/apache/apisix-ingress-controller/internal/controller/config" "github.com/apache/apisix-ingress-controller/internal/controller/status" + "github.com/apache/apisix-ingress-controller/internal/manager/readiness" "github.com/apache/apisix-ingress-controller/internal/provider/adc" _ "github.com/apache/apisix-ingress-controller/pkg/metrics" ) @@ -150,13 +151,20 @@ func Run(ctx context.Context, logger logr.Logger) error { return err } + readier := readiness.NewReadinessManager(mgr.GetClient()) + registerReadinessGVK(mgr.GetClient(), readier) + + if err := mgr.Add(readier); err != nil { + setupLog.Error(err, "unable to add readiness manager") + } + updater := status.NewStatusUpdateHandler(ctrl.LoggerFrom(ctx).WithName("status").WithName("updater"), mgr.GetClient()) if err := mgr.Add(updater); err != nil { setupLog.Error(err, "unable to add status updater") return err } - provider, err := adc.New(updater.Writer(), &adc.Options{ + provider, err := adc.New(updater.Writer(), readier, &adc.Options{ SyncTimeout: config.ControllerConfig.ExecADCTimeout.Duration, SyncPeriod: config.ControllerConfig.ProviderConfig.SyncPeriod.Duration, InitSyncDelay: config.ControllerConfig.ProviderConfig.InitSyncDelay.Duration, @@ -184,7 +192,7 @@ func Run(ctx context.Context, logger logr.Logger) error { controller.SetEnableReferenceGrant(err == nil) setupLog.Info("setting up controllers") - controllers, err := setupControllers(ctx, mgr, provider, updater.Writer()) + controllers, err := setupControllers(ctx, mgr, provider, updater.Writer(), readier) if err != nil { setupLog.Error(err, "unable to set up controllers") return err diff --git a/internal/provider/adc/adc.go b/internal/provider/adc/adc.go index f4f220f458..2f56c3a662 100644 --- a/internal/provider/adc/adc.go +++ b/internal/provider/adc/adc.go @@ -38,6 +38,7 @@ import ( apiv2 "github.com/apache/apisix-ingress-controller/api/v2" "github.com/apache/apisix-ingress-controller/internal/controller/label" "github.com/apache/apisix-ingress-controller/internal/controller/status" + "github.com/apache/apisix-ingress-controller/internal/manager/readiness" "github.com/apache/apisix-ingress-controller/internal/provider" "github.com/apache/apisix-ingress-controller/internal/provider/adc/translator" "github.com/apache/apisix-ingress-controller/internal/types" @@ -92,6 +93,8 @@ type adcClient struct { updater status.Updater statusUpdateMap map[types.NamespacedNameKind][]string + + readier readiness.ReadinessManager } type Task struct { @@ -102,7 +105,7 @@ type Task struct { configs []adcConfig } -func New(updater status.Updater, opts ...Option) (provider.Provider, error) { +func New(updater status.Updater, readier readiness.ReadinessManager, opts ...Option) (provider.Provider, error) { o := Options{} o.ApplyOptions(opts) @@ -114,6 +117,7 @@ func New(updater status.Updater, opts ...Option) (provider.Provider, error) { store: NewStore(), executor: &DefaultADCExecutor{}, updater: updater, + readier: readier, }, nil } @@ -217,7 +221,7 @@ func (d *adcClient) Update(ctx context.Context, tctx *provider.TranslateContext, // This mode is full synchronization, // which only needs to be saved in cache // and triggered by a timer for synchronization - if d.BackendMode == BackendModeAPISIXStandalone || d.BackendMode == BackendModeAPISIX || apiv2.Is(obj) { + if d.BackendMode == BackendModeAPISIXStandalone || d.BackendMode == BackendModeAPISIX { return nil } @@ -296,6 +300,8 @@ func (d *adcClient) Delete(ctx context.Context, obj client.Object) error { } func (d *adcClient) Start(ctx context.Context) error { + d.readier.WaitReady(ctx, 5*time.Minute) + initalSyncDelay := d.InitSyncDelay time.AfterFunc(initalSyncDelay, func() { if err := d.Sync(ctx); err != nil { @@ -457,3 +463,7 @@ func (d *adcClient) handleADCExecutionErrors(statusesMap map[string]types.ADCExe statusUpdateMap := d.resolveADCExecutionErrors(statusesMap) d.handleStatusUpdate(statusUpdateMap) } + +func (d *adcClient) NeedLeaderElection() bool { + return true +} diff --git a/internal/provider/provider.go b/internal/provider/provider.go index b06899fbe9..d9b6cbfa40 100644 --- a/internal/provider/provider.go +++ b/internal/provider/provider.go @@ -37,6 +37,7 @@ type Provider interface { Delete(context.Context, client.Object) error Sync(context.Context) error Start(context.Context) error + NeedLeaderElection() bool } type TranslateContext struct { diff --git a/internal/types/k8s.go b/internal/types/k8s.go index a115562e25..37657b6227 100644 --- a/internal/types/k8s.go +++ b/internal/types/k8s.go @@ -21,7 +21,9 @@ import ( "github.com/apache/apisix-ingress-controller/api/v1alpha1" v2 "github.com/apache/apisix-ingress-controller/api/v2" corev1 "k8s.io/api/core/v1" + netv1 "k8s.io/api/networking/v1" v1 "k8s.io/api/networking/v1" + "k8s.io/apimachinery/pkg/runtime/schema" gatewayv1 "sigs.k8s.io/gateway-api/apis/v1" ) @@ -90,3 +92,77 @@ func KindOf(obj any) string { return "Unknown" } } + +func GvkOf(obj any) schema.GroupVersionKind { + kind := KindOf(obj) + switch obj.(type) { + case *gatewayv1.Gateway, *gatewayv1.HTTPRoute, *gatewayv1.GatewayClass: + return gatewayv1.SchemeGroupVersion.WithKind(kind) + case *netv1.Ingress, *netv1.IngressClass: + return netv1.SchemeGroupVersion.WithKind(kind) + case *corev1.Secret, *corev1.Service: + return corev1.SchemeGroupVersion.WithKind(kind) + case *v2.ApisixRoute: + return schema.GroupVersionKind{ + Group: "apisix.apache.org", + Version: "v2", + Kind: KindApisixRoute, + } + case *v2.ApisixGlobalRule: + return schema.GroupVersionKind{ + Group: "apisix.apache.org", + Version: "v2", + Kind: KindApisixGlobalRule, + } + case *v2.ApisixPluginConfig: + return schema.GroupVersionKind{ + Group: "apisix.apache.org", + Version: "v2", + Kind: KindApisixPluginConfig, + } + case *v2.ApisixTls: + return schema.GroupVersionKind{ + Group: "apisix.apache.org", + Version: "v2", + Kind: KindApisixTls, + } + case *v2.ApisixConsumer: + return schema.GroupVersionKind{ + Group: "apisix.apache.org", + Version: "v2", + Kind: KindApisixConsumer, + } + case *v1alpha1.HTTPRoutePolicy: + return schema.GroupVersionKind{ + Group: "apisix.apache.org", + Version: "v1alpha1", + Kind: KindHTTPRoutePolicy, + } + case *v1alpha1.BackendTrafficPolicy: + return schema.GroupVersionKind{ + Group: "apisix.apache.org", + Version: "v1alpha1", + Kind: KindBackendTrafficPolicy, + } + case *v1alpha1.GatewayProxy: + return schema.GroupVersionKind{ + Group: "apisix.apache.org", + Version: "v1alpha1", + Kind: KindGatewayProxy, + } + case *v1alpha1.Consumer: + return schema.GroupVersionKind{ + Group: "apisix.apache.org", + Version: "v1alpha1", + Kind: KindConsumer, + } + case *v1alpha1.PluginConfig: + return schema.GroupVersionKind{ + Group: "apisix.apache.org", + Version: "v1alpha1", + Kind: KindPluginConfig, + } + default: + return schema.GroupVersionKind{} + } +} From 51c6d2108acf993f065c85aa344e6a3b29db5836 Mon Sep 17 00:00:00 2001 From: alinsran Date: Wed, 23 Jul 2025 11:05:23 +0800 Subject: [PATCH 2/5] fix ingress --- internal/manager/controllers.go | 1 + internal/types/k8s.go | 5 ++--- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/internal/manager/controllers.go b/internal/manager/controllers.go index e93b335540..b5b02c4e23 100644 --- a/internal/manager/controllers.go +++ b/internal/manager/controllers.go @@ -126,6 +126,7 @@ func setupControllers(ctx context.Context, mgr manager.Manager, pro provider.Pro Log: ctrl.LoggerFrom(ctx).WithName("controllers").WithName("Ingress"), Provider: pro, Updater: updater, + Readier: readier, }, &controller.ConsumerReconciler{ Client: mgr.GetClient(), diff --git a/internal/types/k8s.go b/internal/types/k8s.go index 37657b6227..d827b9a5dc 100644 --- a/internal/types/k8s.go +++ b/internal/types/k8s.go @@ -22,7 +22,6 @@ import ( v2 "github.com/apache/apisix-ingress-controller/api/v2" corev1 "k8s.io/api/core/v1" netv1 "k8s.io/api/networking/v1" - v1 "k8s.io/api/networking/v1" "k8s.io/apimachinery/pkg/runtime/schema" gatewayv1 "sigs.k8s.io/gateway-api/apis/v1" ) @@ -58,9 +57,9 @@ func KindOf(obj any) string { return KindHTTPRoute case *gatewayv1.GatewayClass: return KindGatewayClass - case *v1.Ingress: + case *netv1.Ingress: return KindIngress - case *v1.IngressClass: + case *netv1.IngressClass: return KindIngressClass case *corev1.Secret: return KindSecret From 6de393b22656134a8ee5d73768f3048a3d19f6ed Mon Sep 17 00:00:00 2001 From: AlinsRan Date: Wed, 23 Jul 2025 06:22:40 +0000 Subject: [PATCH 3/5] add test --- internal/manager/readiness/manager.go | 19 ++++- test/e2e/apisix/route.go | 107 ++++++++++++++++++++++++++ test/e2e/crds/consumer.go | 86 +++++++++++++++++++++ test/e2e/gatewayapi/httproute.go | 81 +++++++++++++++++++ 4 files changed, 292 insertions(+), 1 deletion(-) diff --git a/internal/manager/readiness/manager.go b/internal/manager/readiness/manager.go index 2f565db335..af33dc22fc 100644 --- a/internal/manager/readiness/manager.go +++ b/internal/manager/readiness/manager.go @@ -33,21 +33,33 @@ import ( "github.com/api7/gopkg/pkg/log" ) +// Filter defines an interface to match unstructured Kubernetes objects. type Filter interface { Match(obj *unstructured.Unstructured) bool } +// GVKFilter is a functional implementation of Filter using a function type. type GVKFilter func(obj *unstructured.Unstructured) bool func (f GVKFilter) Match(obj *unstructured.Unstructured) bool { return f(obj) } +// GVKConfig defines a set of GVKs and an optional filter to match the objects. type GVKConfig struct { GVKs []schema.GroupVersionKind Filter Filter } +// readinessManager prevents premature full sync to the data plane on controller startup. +// +// Background: +// On startup, the controller watches CRDs and periodically performs full sync to the data plane. +// If a sync occurs before all resources have been reconciled, it may push incomplete data, +// causing traffic disruption. +// +// This manager tracks whether all relevant resources have been processed at least once. +// It is used to delay full sync until initial reconciliation is complete. type ReadinessManager interface { RegisterGVK(configs ...GVKConfig) Start(ctx context.Context) error @@ -68,6 +80,7 @@ type readinessManager struct { isReady bool } +// ReadinessManager tracks readiness of specific resources across the cluster. func NewReadinessManager(client client.Client) ReadinessManager { return &readinessManager{ client: client, @@ -77,13 +90,15 @@ func NewReadinessManager(client client.Client) ReadinessManager { } } +// RegisterGVK registers one or more GVKConfig objects for readiness tracking. func (r *readinessManager) RegisterGVK(configs ...GVKConfig) { r.mu.Lock() defer r.mu.Unlock() r.configs = append(r.configs, configs...) } -// Start should be called **after** cache is started and synced. +// Start initializes the readiness state from the Kubernetes API. +// Should be called only after informer cache has synced. func (r *readinessManager) Start(ctx context.Context) error { var err error r.startOnce.Do(func() { @@ -131,6 +146,7 @@ func (r *readinessManager) registerState(gvk schema.GroupVersionKind, list []k8s } } +// Done marks the resource as ready by removing it from the pending state. func (r *readinessManager) Done(obj client.Object, nn k8stypes.NamespacedName) { if r.IsReady() { return @@ -155,6 +171,7 @@ func (r *readinessManager) IsReady() bool { return r.isReady } +// WaitReady blocks until readiness is achieved, a timeout occurs, or context is cancelled. func (r *readinessManager) WaitReady(ctx context.Context, timeout time.Duration) bool { if r.IsReady() { return true diff --git a/test/e2e/apisix/route.go b/test/e2e/apisix/route.go index c921a17ebe..8fb9213e74 100644 --- a/test/e2e/apisix/route.go +++ b/test/e2e/apisix/route.go @@ -635,4 +635,111 @@ spec: Expect(err).ShouldNot(HaveOccurred(), "check apisixupstream is referenced") }) }) + + Context("Test ApisixRoute sync during startup", func() { + const route = ` +apiVersion: apisix.apache.org/v2 +kind: ApisixRoute +metadata: + name: default +spec: + ingressClassName: apisix + http: + - name: rule0 + match: + hosts: + - httpbin + paths: + - /get + backends: + - serviceName: httpbin-service-e2e-test + servicePort: 80 +` + + const route2 = ` +apiVersion: apisix.apache.org/v2 +kind: ApisixRoute +metadata: + name: route2 +spec: + ingressClassName: apisix-nonexistent + http: + - name: rule0 + match: + hosts: + - httpbin2 + paths: + - /get + backends: + - serviceName: httpbin-service-e2e-test + servicePort: 80 +` + const route3 = ` +apiVersion: apisix.apache.org/v2 +kind: ApisixRoute +metadata: + name: route3 +spec: + http: + - name: rule0 + match: + hosts: + - httpbin3 + paths: + - /get + backends: + - serviceName: httpbin-service-e2e-test + servicePort: 80 +` + It("Should sync ApisixRoute during startup", func() { + By("apply ApisixRoute") + Expect(s.CreateResourceFromString(route2)).ShouldNot(HaveOccurred(), "apply ApisixRoute with nonexistent ingressClassName") + Expect(s.CreateResourceFromString(route3)).ShouldNot(HaveOccurred(), "apply ApisixRoute without ingressClassName") + applier.MustApplyAPIv2(types.NamespacedName{Namespace: s.Namespace(), Name: "default"}, &apiv2.ApisixRoute{}, route) + + s.RequestAssert(&scaffold.RequestAssert{ + Method: "GET", + Path: "/get", + Host: "httpbin", + Check: scaffold.WithExpectedStatus(http.StatusOK), + }) + s.RequestAssert(&scaffold.RequestAssert{ + Method: "GET", + Path: "/get", + Host: "httpbin2", + Check: scaffold.WithExpectedStatus(http.StatusNotFound), + }) + s.RequestAssert(&scaffold.RequestAssert{ + Method: "GET", + Path: "/get", + Host: "httpbin3", + Check: scaffold.WithExpectedStatus(http.StatusNotFound), + }) + + By("restart controller and dataplane") + s.Deployer.ScaleIngress(0) + s.Deployer.ScaleDataplane(0) + s.Deployer.ScaleDataplane(1) + s.Deployer.ScaleIngress(1) + + s.RequestAssert(&scaffold.RequestAssert{ + Method: "GET", + Path: "/get", + Host: "httpbin", + Check: scaffold.WithExpectedStatus(http.StatusOK), + }) + s.RequestAssert(&scaffold.RequestAssert{ + Method: "GET", + Path: "/get", + Host: "httpbin2", + Check: scaffold.WithExpectedStatus(http.StatusNotFound), + }) + s.RequestAssert(&scaffold.RequestAssert{ + Method: "GET", + Path: "/get", + Host: "httpbin3", + Check: scaffold.WithExpectedStatus(http.StatusNotFound), + }) + }) + }) }) diff --git a/test/e2e/crds/consumer.go b/test/e2e/crds/consumer.go index 26b8554f51..a3ee63458d 100644 --- a/test/e2e/crds/consumer.go +++ b/test/e2e/crds/consumer.go @@ -579,4 +579,90 @@ spec: }) }) }) + + Context("Test Consumer sync during startup", func() { + var consumer1 = ` +apiVersion: apisix.apache.org/v1alpha1 +kind: Consumer +metadata: + name: consumer-sample +spec: + gatewayRef: + name: apisix + credentials: + - type: key-auth + name: key-auth-sample + config: + key: sample-key +` + var consumer2 = ` +apiVersion: apisix.apache.org/v1alpha1 +kind: Consumer +metadata: + name: consumer-unused +spec: + gatewayRef: + name: apisix-non-existent + credentials: + - type: key-auth + name: key-auth-sample + config: + key: sample-key2 +` + + BeforeEach(func() { + s.ApplyDefaultGatewayResource(defaultGatewayProxy, defaultGatewayClass, defaultGateway, defaultHTTPRoute) + }) + + It("Should sync Consumer during startup", func() { + Expect(s.CreateResourceFromString(consumer2)).NotTo(HaveOccurred(), "creating unused consumer") + s.ResourceApplied("Consumer", "consumer-sample", consumer1, 1) + + s.RequestAssert(&scaffold.RequestAssert{ + Method: "GET", + Path: "/get", + Host: "httpbin.org", + Headers: map[string]string{ + "apikey": "sample-key", + }, + Check: scaffold.WithExpectedStatus(200), + }) + + s.RequestAssert(&scaffold.RequestAssert{ + Method: "GET", + Path: "/get", + Host: "httpbin.org", + Headers: map[string]string{ + "apikey": "sample-key2", + }, + Check: scaffold.WithExpectedStatus(401), + }) + + By("restarting the controller and dataplane") + s.Deployer.ScaleIngress(0) + s.Deployer.ScaleDataplane(0) + s.Deployer.ScaleDataplane(1) + s.Deployer.ScaleIngress(1) + + s.RequestAssert(&scaffold.RequestAssert{ + Method: "GET", + Path: "/get", + Host: "httpbin.org", + Headers: map[string]string{ + "apikey": "sample-key", + }, + Check: scaffold.WithExpectedStatus(200), + }) + + s.RequestAssert(&scaffold.RequestAssert{ + Method: "GET", + Path: "/get", + Host: "httpbin.org", + Headers: map[string]string{ + "apikey": "sample-key2", + }, + Check: scaffold.WithExpectedStatus(401), + }) + }) + }) }) diff --git a/test/e2e/gatewayapi/httproute.go b/test/e2e/gatewayapi/httproute.go index daba664ee4..f4012aa15a 100644 --- a/test/e2e/gatewayapi/httproute.go +++ b/test/e2e/gatewayapi/httproute.go @@ -1900,4 +1900,85 @@ spec: } }) }) + + Context("Test HTTPRoute sync during startup", func() { + BeforeEach(beforeEachHTTP) + var route = ` +apiVersion: gateway.networking.k8s.io/v1 +kind: HTTPRoute +metadata: + name: httpbin +spec: + parentRefs: + - name: apisix + hostnames: + - httpbin + rules: + - matches: + - path: + type: Exact + value: /get + backendRefs: + - name: httpbin-service-e2e-test + port: 80 +` + + var route2 = ` +apiVersion: gateway.networking.k8s.io/v1 +kind: HTTPRoute +metadata: + name: httpbin2 +spec: + parentRefs: + - name: apisix-nonexistent + hostnames: + - httpbin2 + rules: + - matches: + - path: + type: Exact + value: /get + backendRefs: + - name: httpbin-service-e2e-test + port: 80 +` + It("Should sync ApisixRoute during startup", func() { + By("apply ApisixRoute") + Expect(s.CreateResourceFromString(route2)).ShouldNot(HaveOccurred(), "applying HTTPRoute with non-existent parent") + s.ResourceApplied("HTTPRoute", "httpbin", route, 1) + + s.RequestAssert(&scaffold.RequestAssert{ + Method: "GET", + Path: "/get", + Host: "httpbin", + Check: scaffold.WithExpectedStatus(http.StatusOK), + }) + s.RequestAssert(&scaffold.RequestAssert{ + Method: "GET", + Path: "/get", + Host: "httpbin2", + Check: scaffold.WithExpectedStatus(http.StatusNotFound), + }) + + By("restart controller and dataplane") + s.Deployer.ScaleIngress(0) + s.Deployer.ScaleDataplane(0) + s.Deployer.ScaleDataplane(1) + s.Deployer.ScaleIngress(1) + + s.RequestAssert(&scaffold.RequestAssert{ + Method: "GET", + Path: "/get", + Host: "httpbin", + Check: scaffold.WithExpectedStatus(http.StatusOK), + }) + s.RequestAssert(&scaffold.RequestAssert{ + Method: "GET", + Path: "/get", + Host: "httpbin2", + Check: scaffold.WithExpectedStatus(http.StatusNotFound), + }) + }) + + }) }) From 43234effb87f1db4cbf1c4c5c2d072b0ba692e7f Mon Sep 17 00:00:00 2001 From: AlinsRan Date: Wed, 23 Jul 2025 08:08:47 +0000 Subject: [PATCH 4/5] update --- internal/controller/consumer_controller.go | 28 +---------- internal/controller/utils.go | 35 ++++++++++++++ internal/manager/controllers.go | 55 ++-------------------- internal/manager/readiness/manager.go | 4 +- internal/types/k8s.go | 5 +- 5 files changed, 45 insertions(+), 82 deletions(-) diff --git a/internal/controller/consumer_controller.go b/internal/controller/consumer_controller.go index 9df147a66e..07e7ec4bb6 100644 --- a/internal/controller/consumer_controller.go +++ b/internal/controller/consumer_controller.go @@ -306,31 +306,5 @@ func (r *ConsumerReconciler) checkGatewayRef(object client.Object) bool { if !ok { return false } - if consumer.Spec.GatewayRef.Name == "" { - return false - } - if consumer.Spec.GatewayRef.Kind != nil && *consumer.Spec.GatewayRef.Kind != KindGateway { - return false - } - if consumer.Spec.GatewayRef.Group != nil && *consumer.Spec.GatewayRef.Group != gatewayv1.GroupName { - return false - } - ns := consumer.GetNamespace() - if consumer.Spec.GatewayRef.Namespace != nil { - ns = *consumer.Spec.GatewayRef.Namespace - } - gateway := &gatewayv1.Gateway{} - if err := r.Get(context.Background(), client.ObjectKey{ - Name: consumer.Spec.GatewayRef.Name, - Namespace: ns, - }, gateway); err != nil { - r.Log.Error(err, "failed to get gateway", "gateway", consumer.Spec.GatewayRef.Name) - return false - } - gatewayClass := &gatewayv1.GatewayClass{} - if err := r.Get(context.Background(), client.ObjectKey{Name: string(gateway.Spec.GatewayClassName)}, gatewayClass); err != nil { - r.Log.Error(err, "failed to get gateway class", "gateway", gateway.GetName(), "gatewayclass", gateway.Spec.GatewayClassName) - return false - } - return matchesController(string(gatewayClass.Spec.ControllerName)) + return MatchConsumerGatewayRef(context.Background(), r.Client, r.Log, consumer) } diff --git a/internal/controller/utils.go b/internal/controller/utils.go index 81a44843f3..0917cf815d 100644 --- a/internal/controller/utils.go +++ b/internal/controller/utils.go @@ -35,6 +35,7 @@ import ( corev1 "k8s.io/api/core/v1" discoveryv1 "k8s.io/api/discovery/v1" networkingv1 "k8s.io/api/networking/v1" + k8serrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" @@ -1448,3 +1449,37 @@ func TypePredicate[T client.Object]() func(obj client.Object) bool { return ok } } + +func MatchConsumerGatewayRef(ctx context.Context, c client.Client, log logr.Logger, consumer *v1alpha1.Consumer) bool { + if consumer.Spec.GatewayRef.Name == "" { + return false + } + if consumer.Spec.GatewayRef.Kind != nil && *consumer.Spec.GatewayRef.Kind != KindGateway { + return false + } + if consumer.Spec.GatewayRef.Group != nil && *consumer.Spec.GatewayRef.Group != gatewayv1.GroupName { + return false + } + ns := consumer.GetNamespace() + if consumer.Spec.GatewayRef.Namespace != nil { + ns = *consumer.Spec.GatewayRef.Namespace + } + gateway := &gatewayv1.Gateway{} + if err := c.Get(context.Background(), client.ObjectKey{ + Name: consumer.Spec.GatewayRef.Name, + Namespace: ns, + }, gateway); err != nil { + if !k8serrors.IsNotFound(err) { + log.Error(err, "failed to get gateway", "gateway", consumer.Spec.GatewayRef.Name) + } + return false + } + gatewayClass := &gatewayv1.GatewayClass{} + if err := c.Get(context.Background(), client.ObjectKey{Name: string(gateway.Spec.GatewayClassName)}, gatewayClass); err != nil { + if !k8serrors.IsNotFound(err) { + log.Error(err, "failed to get gateway class", "gateway", gateway.GetName(), "gatewayclass", gateway.Spec.GatewayClassName) + } + return false + } + return matchesController(string(gatewayClass.Spec.ControllerName)) +} diff --git a/internal/manager/controllers.go b/internal/manager/controllers.go index b5b02c4e23..0688055d4e 100644 --- a/internal/manager/controllers.go +++ b/internal/manager/controllers.go @@ -32,7 +32,6 @@ import ( "github.com/apache/apisix-ingress-controller/api/v1alpha1" apiv2 "github.com/apache/apisix-ingress-controller/api/v2" "github.com/apache/apisix-ingress-controller/internal/controller" - "github.com/apache/apisix-ingress-controller/internal/controller/config" "github.com/apache/apisix-ingress-controller/internal/controller/indexer" "github.com/apache/apisix-ingress-controller/internal/controller/status" "github.com/apache/apisix-ingress-controller/internal/manager/readiness" @@ -196,6 +195,7 @@ func setupControllers(ctx context.Context, mgr manager.Manager, pro provider.Pro } func registerReadinessGVK(c client.Client, readier readiness.ReadinessManager) { + log := ctrl.LoggerFrom(context.Background()).WithName("readiness") readier.RegisterGVK([]readiness.GVKConfig{ { GVKs: []schema.GroupVersionKind{ @@ -213,28 +213,8 @@ func registerReadinessGVK(c client.Client, readier readiness.ReadinessManager) { }, Filter: readiness.GVKFilter(func(obj *unstructured.Unstructured) bool { icName, _, _ := unstructured.NestedString(obj.Object, "spec", "ingressClassName") - if icName == "" { - insList := &netv1.IngressClassList{} - if err := c.List(context.Background(), insList, client.MatchingFields{ - indexer.IngressClass: config.GetControllerName(), - }); err != nil { - return false - } - for _, ic := range insList.Items { - if ic.Annotations[types.DefaultIngressClassAnnotation] == "true" { - return true - } - } - return false - } else { - var ingressClass netv1.IngressClass - if err := c.Get(context.Background(), client.ObjectKey{ - Name: icName, - }, &ingressClass); err != nil { - return false - } - return true - } + ingressClass, _ := controller.GetIngressClass(context.Background(), c, log, icName) + return ingressClass != nil }), }, { @@ -246,34 +226,7 @@ func registerReadinessGVK(c client.Client, readier readiness.ReadinessManager) { if err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.Object, consumer); err != nil { return false } - - if consumer.Spec.GatewayRef.Name == "" { - return false - } - if consumer.Spec.GatewayRef.Kind != nil && *consumer.Spec.GatewayRef.Kind != types.KindGateway { - return false - } - if consumer.Spec.GatewayRef.Group != nil && *consumer.Spec.GatewayRef.Group != gatewayv1.GroupName { - return false - } - - ns := consumer.GetNamespace() - if consumer.Spec.GatewayRef.Namespace != nil { - ns = *consumer.Spec.GatewayRef.Namespace - } - - ctx := context.Background() - - gateway := &gatewayv1.Gateway{} - if err := c.Get(ctx, client.ObjectKey{Name: consumer.Spec.GatewayRef.Name, Namespace: ns}, gateway); err != nil { - return false - } - - gatewayClass := &gatewayv1.GatewayClass{} - if err := c.Get(ctx, client.ObjectKey{Name: string(gateway.Spec.GatewayClassName)}, gatewayClass); err != nil { - return false - } - return string(gatewayClass.Spec.ControllerName) == config.GetControllerName() + return controller.MatchConsumerGatewayRef(context.Background(), c, log, consumer) }), }, }...) diff --git a/internal/manager/readiness/manager.go b/internal/manager/readiness/manager.go index af33dc22fc..5e0dabc0d0 100644 --- a/internal/manager/readiness/manager.go +++ b/internal/manager/readiness/manager.go @@ -23,6 +23,7 @@ import ( "sync" "time" + "github.com/api7/gopkg/pkg/log" "go.uber.org/zap" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" @@ -30,7 +31,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" types "github.com/apache/apisix-ingress-controller/internal/types" - "github.com/api7/gopkg/pkg/log" ) // Filter defines an interface to match unstructured Kubernetes objects. @@ -121,7 +121,7 @@ func (r *readinessManager) Start(ctx context.Context) error { }) } if len(expected) > 0 { - log.Warnw("registering readiness state", zap.Any("gvk", gvk), zap.Any("expected", expected)) + log.Debugw("registering readiness state", zap.Any("gvk", gvk), zap.Any("expected", expected)) r.registerState(gvk, expected) } } diff --git a/internal/types/k8s.go b/internal/types/k8s.go index d827b9a5dc..d83158fe0d 100644 --- a/internal/types/k8s.go +++ b/internal/types/k8s.go @@ -18,12 +18,13 @@ package types import ( - "github.com/apache/apisix-ingress-controller/api/v1alpha1" - v2 "github.com/apache/apisix-ingress-controller/api/v2" corev1 "k8s.io/api/core/v1" netv1 "k8s.io/api/networking/v1" "k8s.io/apimachinery/pkg/runtime/schema" gatewayv1 "sigs.k8s.io/gateway-api/apis/v1" + + "github.com/apache/apisix-ingress-controller/api/v1alpha1" + v2 "github.com/apache/apisix-ingress-controller/api/v2" ) const DefaultIngressClassAnnotation = "ingressclass.kubernetes.io/is-default-class" From 0f39f96ce458490e4f3cfb46bb9ad5a4222a0dd8 Mon Sep 17 00:00:00 2001 From: alinsran Date: Thu, 24 Jul 2025 11:18:30 +0800 Subject: [PATCH 5/5] add atomic and defer --- internal/controller/apisixconsumer_controller.go | 2 +- internal/controller/apisixglobalrule_controller.go | 2 +- internal/controller/apisixroute_controller.go | 2 +- internal/controller/apisixtls_controller.go | 2 +- internal/controller/consumer_controller.go | 2 +- internal/controller/httproute_controller.go | 2 +- internal/controller/ingress_controller.go | 2 +- internal/manager/readiness/manager.go | 14 ++++++++------ 8 files changed, 15 insertions(+), 13 deletions(-) diff --git a/internal/controller/apisixconsumer_controller.go b/internal/controller/apisixconsumer_controller.go index af82552d5e..f9908b2564 100644 --- a/internal/controller/apisixconsumer_controller.go +++ b/internal/controller/apisixconsumer_controller.go @@ -57,8 +57,8 @@ type ApisixConsumerReconciler struct { // Reconcile FIXME: implement the reconcile logic (For now, it dose nothing other than directly accepting) func (r *ApisixConsumerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + defer r.Readier.Done(&apiv2.ApisixConsumer{}, req.NamespacedName) r.Log.Info("reconcile", "request", req.NamespacedName) - r.Readier.Done(&apiv2.ApisixConsumer{}, req.NamespacedName) ac := &apiv2.ApisixConsumer{} if err := r.Get(ctx, req.NamespacedName, ac); err != nil { diff --git a/internal/controller/apisixglobalrule_controller.go b/internal/controller/apisixglobalrule_controller.go index c41249faa4..9431df676d 100644 --- a/internal/controller/apisixglobalrule_controller.go +++ b/internal/controller/apisixglobalrule_controller.go @@ -56,7 +56,7 @@ type ApisixGlobalRuleReconciler struct { // Reconcile implements the reconciliation logic for ApisixGlobalRule func (r *ApisixGlobalRuleReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { - r.Readier.Done(&apiv2.ApisixGlobalRule{}, req.NamespacedName) + defer r.Readier.Done(&apiv2.ApisixGlobalRule{}, req.NamespacedName) var globalRule apiv2.ApisixGlobalRule if err := r.Get(ctx, req.NamespacedName, &globalRule); err != nil { if client.IgnoreNotFound(err) == nil { diff --git a/internal/controller/apisixroute_controller.go b/internal/controller/apisixroute_controller.go index 7a89e01365..c4dde0d5a0 100644 --- a/internal/controller/apisixroute_controller.go +++ b/internal/controller/apisixroute_controller.go @@ -99,7 +99,7 @@ func (r *ApisixRouteReconciler) SetupWithManager(mgr ctrl.Manager) error { } func (r *ApisixRouteReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { - r.Readier.Done(&apiv2.ApisixRoute{}, req.NamespacedName) + defer r.Readier.Done(&apiv2.ApisixRoute{}, req.NamespacedName) var ar apiv2.ApisixRoute if err := r.Get(ctx, req.NamespacedName, &ar); err != nil { if client.IgnoreNotFound(err) == nil { diff --git a/internal/controller/apisixtls_controller.go b/internal/controller/apisixtls_controller.go index 4845b5d664..7410fdb075 100644 --- a/internal/controller/apisixtls_controller.go +++ b/internal/controller/apisixtls_controller.go @@ -87,7 +87,7 @@ func (r *ApisixTlsReconciler) SetupWithManager(mgr ctrl.Manager) error { // Reconcile implements the reconciliation logic for ApisixTls func (r *ApisixTlsReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { - r.Readier.Done(&apiv2.ApisixTls{}, req.NamespacedName) + defer r.Readier.Done(&apiv2.ApisixTls{}, req.NamespacedName) var tls apiv2.ApisixTls if err := r.Get(ctx, req.NamespacedName, &tls); err != nil { if client.IgnoreNotFound(err) == nil { diff --git a/internal/controller/consumer_controller.go b/internal/controller/consumer_controller.go index 07e7ec4bb6..7869ad6b45 100644 --- a/internal/controller/consumer_controller.go +++ b/internal/controller/consumer_controller.go @@ -183,7 +183,7 @@ func (r *ConsumerReconciler) listConsumersForGatewayProxy(ctx context.Context, o } func (r *ConsumerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { - r.Readier.Done(&v1alpha1.Consumer{}, req.NamespacedName) + defer r.Readier.Done(&v1alpha1.Consumer{}, req.NamespacedName) consumer := new(v1alpha1.Consumer) if err := r.Get(ctx, req.NamespacedName, consumer); err != nil { if client.IgnoreNotFound(err) == nil { diff --git a/internal/controller/httproute_controller.go b/internal/controller/httproute_controller.go index e1da1f38ad..b6f3928785 100644 --- a/internal/controller/httproute_controller.go +++ b/internal/controller/httproute_controller.go @@ -132,7 +132,7 @@ func (r *HTTPRouteReconciler) SetupWithManager(mgr ctrl.Manager) error { } func (r *HTTPRouteReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { - r.Readier.Done(&gatewayv1.HTTPRoute{}, req.NamespacedName) + defer r.Readier.Done(&gatewayv1.HTTPRoute{}, req.NamespacedName) hr := new(gatewayv1.HTTPRoute) if err := r.Get(ctx, req.NamespacedName, hr); err != nil { if client.IgnoreNotFound(err) == nil { diff --git a/internal/controller/ingress_controller.go b/internal/controller/ingress_controller.go index 9770d050fe..25b52990ca 100644 --- a/internal/controller/ingress_controller.go +++ b/internal/controller/ingress_controller.go @@ -119,7 +119,7 @@ func (r *IngressReconciler) SetupWithManager(mgr ctrl.Manager) error { // Reconcile handles the reconciliation of Ingress resources func (r *IngressReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { - r.Readier.Done(&networkingv1.Ingress{}, req.NamespacedName) + defer r.Readier.Done(&networkingv1.Ingress{}, req.NamespacedName) ingress := new(networkingv1.Ingress) if err := r.Get(ctx, req.NamespacedName, ingress); err != nil { if client.IgnoreNotFound(err) == nil { diff --git a/internal/manager/readiness/manager.go b/internal/manager/readiness/manager.go index 5e0dabc0d0..7ba4b83bb0 100644 --- a/internal/manager/readiness/manager.go +++ b/internal/manager/readiness/manager.go @@ -21,6 +21,7 @@ import ( "context" "fmt" "sync" + "sync/atomic" "time" "github.com/api7/gopkg/pkg/log" @@ -77,7 +78,7 @@ type readinessManager struct { started chan struct{} done chan struct{} - isReady bool + isReady atomic.Bool } // ReadinessManager tracks readiness of specific resources across the cluster. @@ -87,6 +88,7 @@ func NewReadinessManager(client client.Client) ReadinessManager { state: make(map[schema.GroupVersionKind]map[k8stypes.NamespacedName]struct{}), started: make(chan struct{}), done: make(chan struct{}), + isReady: atomic.Bool{}, } } @@ -127,8 +129,8 @@ func (r *readinessManager) Start(ctx context.Context) error { } } close(r.started) - if len(r.state) == 0 && !r.isReady { - r.isReady = true + if len(r.state) == 0 && !r.isReady.Load() { + r.isReady.Store(true) close(r.done) } }) @@ -161,14 +163,14 @@ func (r *readinessManager) Done(obj client.Object, nn k8stypes.NamespacedName) { if len(r.state[gvk]) == 0 { delete(r.state, gvk) } - if len(r.state) == 0 && !r.isReady { - r.isReady = true + if len(r.state) == 0 && !r.isReady.Load() { + r.isReady.Store(true) close(r.done) } } func (r *readinessManager) IsReady() bool { - return r.isReady + return r.isReady.Load() } // WaitReady blocks until readiness is achieved, a timeout occurs, or context is cancelled.