diff --git a/internal/controller/consumer_controller.go b/internal/controller/consumer_controller.go index ae5fbd222..b796575d1 100644 --- a/internal/controller/consumer_controller.go +++ b/internal/controller/consumer_controller.go @@ -14,6 +14,7 @@ package controller import ( "context" + "fmt" "github.com/go-logr/logr" corev1 "k8s.io/api/core/v1" @@ -32,6 +33,7 @@ import ( "github.com/apache/apisix-ingress-controller/api/v1alpha1" "github.com/apache/apisix-ingress-controller/internal/controller/indexer" + "github.com/apache/apisix-ingress-controller/internal/controller/status" "github.com/apache/apisix-ingress-controller/internal/provider" ) @@ -42,6 +44,8 @@ type ConsumerReconciler struct { //nolint:revive Log logr.Logger Provider provider.Provider + + Updater status.Updater } // SetupWithManager sets up the controller with the Manager. @@ -232,9 +236,7 @@ func (r *ConsumerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c statusErr = err } - if err := r.updateStatus(ctx, consumer, statusErr); err != nil { - return ctrl.Result{}, err - } + r.updateStatus(consumer, statusErr) return ctrl.Result{}, nil } @@ -269,20 +271,29 @@ func (r *ConsumerReconciler) processSpec(ctx context.Context, tctx *provider.Tra return nil } -func (r *ConsumerReconciler) updateStatus(ctx context.Context, consumer *v1alpha1.Consumer, err error) error { +func (r *ConsumerReconciler) updateStatus(consumer *v1alpha1.Consumer, err error) { condition := NewCondition(consumer.Generation, true, "Successfully") if err != nil { condition = NewCondition(consumer.Generation, false, err.Error()) } if !VerifyConditions(&consumer.Status.Conditions, condition) { - return nil + return } meta.SetStatusCondition(&consumer.Status.Conditions, condition) - if err := r.Status().Update(ctx, consumer); err != nil { - r.Log.Error(err, "failed to update consumer status", "consumer", consumer) - return err - } - return nil + + r.Updater.Update(status.Update{ + NamespacedName: NamespacedName(consumer), + Resource: consumer.DeepCopy(), + Mutator: status.MutatorFunc(func(obj client.Object) client.Object { + t, ok := obj.(*v1alpha1.Consumer) + if !ok { + err := fmt.Errorf("unsupported object type %T", obj) + panic(err) + } + t.Status = consumer.Status + return t + }), + }) } func (r *ConsumerReconciler) getGateway(ctx context.Context, consumer *v1alpha1.Consumer) (*gatewayv1.Gateway, error) { diff --git a/internal/controller/gateway_controller.go b/internal/controller/gateway_controller.go index acf3e9a81..97a5bfa24 100644 --- a/internal/controller/gateway_controller.go +++ b/internal/controller/gateway_controller.go @@ -35,6 +35,7 @@ import ( "github.com/apache/apisix-ingress-controller/api/v1alpha1" "github.com/apache/apisix-ingress-controller/internal/controller/indexer" + "github.com/apache/apisix-ingress-controller/internal/controller/status" "github.com/apache/apisix-ingress-controller/internal/provider" ) @@ -45,6 +46,8 @@ type GatewayReconciler struct { //nolint:revive Log logr.Logger Provider provider.Provider + + Updater status.Updater } // SetupWithManager sets up the controller with the Manager. @@ -117,11 +120,11 @@ func (r *GatewayReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct conditionProgrammedStatus, conditionProgrammedMsg := true, "Programmed" r.Log.Info("gateway has been accepted", "gateway", gateway.GetName()) - type status struct { + type conditionStatus struct { status bool msg string } - acceptStatus := status{ + acceptStatus := conditionStatus{ status: true, msg: acceptedMessage("gateway"), } @@ -131,7 +134,7 @@ func (r *GatewayReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct r.processListenerConfig(tctx, gateway) if err := r.processInfrastructure(tctx, gateway); err != nil { - acceptStatus = status{ + acceptStatus = conditionStatus{ status: false, msg: err.Error(), } @@ -147,7 +150,7 @@ func (r *GatewayReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct gatewayProxy, ok := tctx.GatewayProxies[rk] if !ok { - acceptStatus = status{ + acceptStatus = conditionStatus{ status: false, msg: "gateway proxy not found", } @@ -167,7 +170,7 @@ func (r *GatewayReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct } if err := r.Provider.Update(ctx, tctx, gateway); err != nil { - acceptStatus = status{ + acceptStatus = conditionStatus{ status: false, msg: err.Error(), } @@ -189,7 +192,21 @@ func (r *GatewayReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct gateway.Status.Listeners = listenerStatuses } - return ctrl.Result{}, r.Status().Update(ctx, gateway) + r.Updater.Update(status.Update{ + NamespacedName: NamespacedName(gateway), + Resource: gateway.DeepCopy(), + Mutator: status.MutatorFunc(func(obj client.Object) client.Object { + t, ok := obj.(*gatewayv1.Gateway) + if !ok { + err := fmt.Errorf("unsupported object type %T", obj) + panic(err) + } + t.Status = gateway.Status + return t + }), + }) + + return ctrl.Result{}, nil } return ctrl.Result{}, nil diff --git a/internal/controller/gatewayclass_congroller.go b/internal/controller/gatewayclass_congroller.go index 0b5f0a3ed..9e8094181 100644 --- a/internal/controller/gatewayclass_congroller.go +++ b/internal/controller/gatewayclass_congroller.go @@ -30,6 +30,7 @@ import ( "github.com/apache/apisix-ingress-controller/internal/controller/config" "github.com/apache/apisix-ingress-controller/internal/controller/indexer" + "github.com/apache/apisix-ingress-controller/internal/controller/status" ) const ( @@ -46,6 +47,8 @@ type GatewayClassReconciler struct { //nolint:revive record.EventRecorder Log logr.Logger + + Updater status.Updater } // SetupWithManager sets up the controller with the Manager. @@ -111,9 +114,19 @@ func (r *GatewayClassReconciler) Reconcile(ctx context.Context, req ctrl.Request if !IsConditionPresentAndEqual(gc.Status.Conditions, condition) { r.Log.Info("gatewayclass has been accepted", "gatewayclass", gc.Name) setGatewayClassCondition(gc, condition) - if err := r.Status().Update(ctx, gc); err != nil { - return ctrl.Result{}, err - } + r.Updater.Update(status.Update{ + NamespacedName: NamespacedName(gc), + Resource: gc.DeepCopy(), + Mutator: status.MutatorFunc(func(obj client.Object) client.Object { + t, ok := obj.(*gatewayv1.GatewayClass) + if !ok { + err := fmt.Errorf("unsupported object type %T", obj) + panic(err) + } + t.Status = gc.Status + return t + }), + }) } return ctrl.Result{}, nil } diff --git a/internal/controller/httproute_controller.go b/internal/controller/httproute_controller.go index ab6814c64..e4ed76461 100644 --- a/internal/controller/httproute_controller.go +++ b/internal/controller/httproute_controller.go @@ -42,6 +42,7 @@ import ( "github.com/apache/apisix-ingress-controller/api/v1alpha1" "github.com/apache/apisix-ingress-controller/internal/controller/indexer" + "github.com/apache/apisix-ingress-controller/internal/controller/status" "github.com/apache/apisix-ingress-controller/internal/provider" ) @@ -55,6 +56,8 @@ type HTTPRouteReconciler struct { //nolint:revive Provider provider.Provider genericEvent chan event.GenericEvent + + Updater status.Updater } // SetupWithManager sets up the controller with the Manager. @@ -143,13 +146,13 @@ func (r *HTTPRouteReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( return ctrl.Result{}, err } - type status struct { + type ResourceStatus struct { status bool msg string } // Only keep acceptStatus since we're using error objects directly now - acceptStatus := status{ + acceptStatus := ResourceStatus{ status: true, msg: "Route is accepted", } @@ -233,10 +236,20 @@ func (r *HTTPRouteReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( hr.Status.Parents = append(hr.Status.Parents, parentStatus) } - if err := r.Status().Update(ctx, hr); err != nil { - return ctrl.Result{}, err - } - UpdateStatus(r.Client, r.Log, tctx) + r.Updater.Update(status.Update{ + NamespacedName: NamespacedName(hr), + Resource: hr.DeepCopy(), + Mutator: status.MutatorFunc(func(obj client.Object) client.Object { + h, ok := obj.(*gatewayv1.HTTPRoute) + if !ok { + err := fmt.Errorf("unsupported object type %T", obj) + panic(err) + } + h.Status = hr.Status + return h + }), + }) + UpdateStatus(r.Updater, r.Log, tctx) return ctrl.Result{}, nil } diff --git a/internal/controller/httproutepolicy.go b/internal/controller/httproutepolicy.go index 8bb1b0310..08142664e 100644 --- a/internal/controller/httproutepolicy.go +++ b/internal/controller/httproutepolicy.go @@ -15,9 +15,9 @@ package controller import ( "cmp" "context" + "fmt" "slices" - "github.com/go-logr/logr" networkingv1 "k8s.io/api/networking/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" @@ -28,6 +28,7 @@ import ( "github.com/apache/apisix-ingress-controller/api/v1alpha1" "github.com/apache/apisix-ingress-controller/internal/controller/indexer" + "github.com/apache/apisix-ingress-controller/internal/controller/status" "github.com/apache/apisix-ingress-controller/internal/provider" ) @@ -71,7 +72,19 @@ func (r *HTTPRouteReconciler) processHTTPRoutePolicies(tctx *provider.TranslateC } if updated := setAncestorsForHTTPRoutePolicyStatus(httpRoute.Spec.ParentRefs, &policy, condition); updated { - tctx.StatusUpdaters = append(tctx.StatusUpdaters, &policy) + tctx.StatusUpdaters = append(tctx.StatusUpdaters, status.Update{ + NamespacedName: NamespacedName(&policy), + Resource: policy.DeepCopy(), + Mutator: status.MutatorFunc(func(obj client.Object) client.Object { + t, ok := obj.(*v1alpha1.HTTPRoutePolicy) + if !ok { + err := fmt.Errorf("unsupported object type %T", obj) + panic(err) + } + t.Status = policy.Status + return t + }), + }) } } @@ -83,7 +96,7 @@ func (r *HTTPRouteReconciler) updateHTTPRoutePolicyStatusOnDeleting(ctx context. list v1alpha1.HTTPRoutePolicyList key = indexer.GenIndexKeyWithGK(gatewayv1.GroupName, "HTTPRoute", nn.Namespace, nn.Name) ) - if err := r.List(context.Background(), &list, client.MatchingFields{indexer.PolicyTargetRefs: key}); err != nil { + if err := r.List(ctx, &list, client.MatchingFields{indexer.PolicyTargetRefs: key}); err != nil { return err } var ( @@ -96,7 +109,7 @@ func (r *HTTPRouteReconciler) updateHTTPRoutePolicyStatusOnDeleting(ctx context. var namespacedName = types.NamespacedName{Namespace: policy.GetNamespace(), Name: string(ref.Name)} httpRoute, ok := httpRoutes[namespacedName] if !ok { - if err := r.Get(context.Background(), namespacedName, &httpRoute); err != nil { + if err := r.Get(ctx, namespacedName, &httpRoute); err != nil { continue } httpRoutes[namespacedName] = httpRoute @@ -104,7 +117,7 @@ func (r *HTTPRouteReconciler) updateHTTPRoutePolicyStatusOnDeleting(ctx context. parentRefs = append(parentRefs, httpRoute.Spec.ParentRefs...) } // delete AncestorRef which is not exist in the all parentRefs for each policy - updateDeleteAncestors(ctx, r.Client, r.Log, policy, parentRefs) + updateDeleteAncestors(r.Updater, policy, parentRefs) } return nil @@ -137,7 +150,19 @@ func (r *IngressReconciler) processHTTPRoutePolicies(tctx *provider.TranslateCon for i := range list.Items { policy := list.Items[i] if updated := setAncestorsForHTTPRoutePolicyStatus(tctx.RouteParentRefs, &policy, condition); updated { - tctx.StatusUpdaters = append(tctx.StatusUpdaters, &policy) + tctx.StatusUpdaters = append(tctx.StatusUpdaters, status.Update{ + NamespacedName: NamespacedName(&policy), + Resource: policy.DeepCopy(), + Mutator: status.MutatorFunc(func(obj client.Object) client.Object { + t, ok := obj.(*v1alpha1.HTTPRoutePolicy) + if !ok { + err := fmt.Errorf("unsupported object type %T", obj) + panic(err) + } + t.Status = policy.Status + return t + }), + }) } } @@ -180,7 +205,7 @@ func (r *IngressReconciler) updateHTTPRoutePolicyStatusOnDeleting(ctx context.Co parentRefs = append(parentRefs, parentRef) } // delete AncestorRef which is not exist in the all parentRefs - updateDeleteAncestors(ctx, r.Client, r.Log, policy, parentRefs) + updateDeleteAncestors(r.Updater, policy, parentRefs) } return nil @@ -229,7 +254,7 @@ func findPoliciesWhichTargetRefTheRule(ruleName *gatewayv1.SectionName, kind str } // updateDeleteAncestors removes ancestor references from HTTPRoutePolicy statuses that are no longer present in the provided parentRefs. -func updateDeleteAncestors(ctx context.Context, client client.Client, logger logr.Logger, policy v1alpha1.HTTPRoutePolicy, parentRefs []gatewayv1.ParentReference) { +func updateDeleteAncestors(updater status.Updater, policy v1alpha1.HTTPRoutePolicy, parentRefs []gatewayv1.ParentReference) { length := len(policy.Status.Ancestors) policy.Status.Ancestors = slices.DeleteFunc(policy.Status.Ancestors, func(ancestor v1alpha2.PolicyAncestorStatus) bool { return !slices.ContainsFunc(parentRefs, func(ref gatewayv1.ParentReference) bool { @@ -237,8 +262,18 @@ func updateDeleteAncestors(ctx context.Context, client client.Client, logger log }) }) if length != len(policy.Status.Ancestors) { - if err := client.Status().Update(ctx, &policy); err != nil { - logger.Error(err, "failed to update HTTPRoutePolicy status") - } + updater.Update(status.Update{ + NamespacedName: NamespacedName(&policy), + Resource: policy.DeepCopy(), + Mutator: status.MutatorFunc(func(obj client.Object) client.Object { + t, ok := obj.(*v1alpha1.HTTPRoutePolicy) + if !ok { + err := fmt.Errorf("unsupported object type %T", obj) + panic(err) + } + t.Status = policy.Status + return t + }), + }) } } diff --git a/internal/controller/ingress_controller.go b/internal/controller/ingress_controller.go index 9aee9bdee..a71cb6ad6 100644 --- a/internal/controller/ingress_controller.go +++ b/internal/controller/ingress_controller.go @@ -41,6 +41,7 @@ import ( "github.com/apache/apisix-ingress-controller/api/v1alpha1" "github.com/apache/apisix-ingress-controller/internal/controller/config" "github.com/apache/apisix-ingress-controller/internal/controller/indexer" + "github.com/apache/apisix-ingress-controller/internal/controller/status" "github.com/apache/apisix-ingress-controller/internal/provider" ) @@ -52,6 +53,8 @@ type IngressReconciler struct { //nolint:revive Provider provider.Provider genericEvent chan event.GenericEvent + + Updater status.Updater } // SetupWithManager sets up the controller with the Manager. @@ -185,7 +188,7 @@ func (r *IngressReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct } // update the status of related resources - UpdateStatus(r.Client, r.Log, tctx) + UpdateStatus(r.Updater, r.Log, tctx) // update the ingress status if err := r.updateStatus(ctx, tctx, ingress, ingressClass); err != nil { @@ -661,7 +664,20 @@ func (r *IngressReconciler) updateStatus(ctx context.Context, tctx *provider.Tra // update the load balancer status if len(loadBalancerStatus.Ingress) > 0 && !reflect.DeepEqual(ingress.Status.LoadBalancer, loadBalancerStatus) { ingress.Status.LoadBalancer = loadBalancerStatus - return r.Status().Update(ctx, ingress) + r.Updater.Update(status.Update{ + NamespacedName: NamespacedName(ingress), + Resource: ingress.DeepCopy(), + Mutator: status.MutatorFunc(func(obj client.Object) client.Object { + t, ok := obj.(*networkingv1.Ingress) + if !ok { + err := fmt.Errorf("unsupported object type %T", obj) + panic(err) + } + t.Status = ingress.Status + return t + }), + }) + return nil } return nil diff --git a/internal/controller/policies.go b/internal/controller/policies.go index 66387f297..ca3bd54b7 100644 --- a/internal/controller/policies.go +++ b/internal/controller/policies.go @@ -30,6 +30,7 @@ import ( "github.com/apache/apisix-ingress-controller/api/v1alpha1" "github.com/apache/apisix-ingress-controller/internal/controller/config" "github.com/apache/apisix-ingress-controller/internal/controller/indexer" + "github.com/apache/apisix-ingress-controller/internal/controller/status" "github.com/apache/apisix-ingress-controller/internal/provider" ) @@ -147,7 +148,19 @@ func ProcessBackendTrafficPolicy( processPolicyStatus(policy, tctx, condition, &updated) } if updated { - tctx.StatusUpdaters = append(tctx.StatusUpdaters, policy.DeepCopy()) + tctx.StatusUpdaters = append(tctx.StatusUpdaters, status.Update{ + NamespacedName: NamespacedName(policy), + Resource: policy.DeepCopy(), + Mutator: status.MutatorFunc(func(obj client.Object) client.Object { + t, ok := obj.(*v1alpha1.BackendTrafficPolicy) + if !ok { + err := fmt.Errorf("unsupported object type %T", obj) + panic(err) + } + t.Status = policy.Status + return t + }), + }) } } for _, policy := range conflicts { diff --git a/internal/controller/status.go b/internal/controller/status.go index 21c576d73..fd392d2d4 100644 --- a/internal/controller/status.go +++ b/internal/controller/status.go @@ -16,9 +16,9 @@ import ( "github.com/go-logr/logr" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "sigs.k8s.io/controller-runtime/pkg/client" gatewayv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2" + "github.com/apache/apisix-ingress-controller/internal/controller/status" "github.com/apache/apisix-ingress-controller/internal/provider" ) @@ -92,11 +92,11 @@ func NewPolicyConflictCondition(observedGeneration int64, message string) metav1 } func UpdateStatus( - c client.Client, + updater status.Updater, log logr.Logger, tctx *provider.TranslateContext, ) { - for _, obj := range tctx.StatusUpdaters { - _ = c.Status().Update(tctx, obj) + for _, update := range tctx.StatusUpdaters { + updater.Update(update) } } diff --git a/internal/controller/status/updater.go b/internal/controller/status/updater.go new file mode 100644 index 000000000..72dd64f1b --- /dev/null +++ b/internal/controller/status/updater.go @@ -0,0 +1,137 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package status + +import ( + "context" + "sync" + + "github.com/go-logr/logr" + k8serrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/util/retry" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +const UpdateChannelBufferSize = 1000 + +type Update struct { + NamespacedName types.NamespacedName + Resource client.Object + Mutator Mutator +} + +type Mutator interface { + Mutate(obj client.Object) client.Object +} + +type MutatorFunc func(client.Object) client.Object + +func (m MutatorFunc) Mutate(obj client.Object) client.Object { + if m == nil { + return nil + } + + return m(obj) +} + +type UpdateHandler struct { + log logr.Logger + client client.Client + updateChannel chan Update + wg *sync.WaitGroup +} + +func NewStatusUpdateHandler(log logr.Logger, client client.Client) *UpdateHandler { + u := &UpdateHandler{ + log: log, + client: client, + updateChannel: make(chan Update, UpdateChannelBufferSize), + wg: new(sync.WaitGroup), + } + + u.wg.Add(1) + return u +} + +func (u *UpdateHandler) apply(ctx context.Context, update Update) { + if err := retry.OnError(retry.DefaultBackoff, func(err error) bool { + return k8serrors.IsConflict(err) + }, func() error { + return u.updateStatus(ctx, update) + }); err != nil { + u.log.Error(err, "unable to update status", "name", update.NamespacedName.Name, + "namespace", update.NamespacedName.Namespace) + } +} + +func (u *UpdateHandler) updateStatus(ctx context.Context, update Update) error { + var obj = update.Resource + oldGeneration := obj.GetGeneration() + if err := u.client.Get(ctx, update.NamespacedName, obj); err != nil { + if k8serrors.IsNotFound(err) { + return nil + } + return err + } + + if obj.GetGeneration() != oldGeneration { + return nil + } + obj = update.Mutator.Mutate(obj) + if obj == nil { + return nil + } + + return u.client.Status().Update(ctx, obj) +} + +func (u *UpdateHandler) Start(ctx context.Context) error { + u.log.Info("started status update handler") + defer u.log.Info("stopped status update handler") + + u.wg.Done() + + for { + select { + case <-ctx.Done(): + return nil + case update := <-u.updateChannel: + u.log.Info("received a status update", "namespace", update.NamespacedName.Namespace, + "name", update.NamespacedName.Name) + + u.apply(ctx, update) + } + } +} + +func (u *UpdateHandler) Writer() Updater { + return &UpdateWriter{ + updateChannel: u.updateChannel, + wg: u.wg, + } +} + +type Updater interface { + Update(u Update) +} + +type UpdateWriter struct { + updateChannel chan<- Update + wg *sync.WaitGroup +} + +func (u *UpdateWriter) Update(update Update) { + u.wg.Wait() + u.updateChannel <- update +} diff --git a/internal/controller/utils.go b/internal/controller/utils.go index 4c5de8167..7faacb917 100644 --- a/internal/controller/utils.go +++ b/internal/controller/utils.go @@ -1147,3 +1147,10 @@ func checkReferenceGrant(ctx context.Context, cli client.Client, obj v1beta1.Ref } return false } + +func NamespacedName(obj client.Object) types.NamespacedName { + return types.NamespacedName{ + Namespace: obj.GetNamespace(), + Name: obj.GetName(), + } +} diff --git a/internal/manager/controllers.go b/internal/manager/controllers.go index 398e7b3b2..d3fbeec8b 100644 --- a/internal/manager/controllers.go +++ b/internal/manager/controllers.go @@ -20,6 +20,7 @@ import ( "github.com/apache/apisix-ingress-controller/internal/controller" "github.com/apache/apisix-ingress-controller/internal/controller/indexer" + "github.com/apache/apisix-ingress-controller/internal/controller/status" "github.com/apache/apisix-ingress-controller/internal/provider" ) @@ -58,39 +59,44 @@ type Controller interface { SetupWithManager(mgr manager.Manager) error } -func setupControllers(ctx context.Context, mgr manager.Manager, pro provider.Provider) ([]Controller, error) { +func setupControllers(ctx context.Context, mgr manager.Manager, pro provider.Provider, updater status.Updater) ([]Controller, error) { if err := indexer.SetupIndexer(mgr); err != nil { return nil, err } return []Controller{ &controller.GatewayClassReconciler{ - Client: mgr.GetClient(), - Scheme: mgr.GetScheme(), - Log: ctrl.LoggerFrom(ctx).WithName("controllers").WithName("GatewayClass"), + Client: mgr.GetClient(), + Scheme: mgr.GetScheme(), + Log: ctrl.LoggerFrom(ctx).WithName("controllers").WithName("GatewayClass"), + Updater: updater, }, &controller.GatewayReconciler{ Client: mgr.GetClient(), Scheme: mgr.GetScheme(), Log: ctrl.LoggerFrom(ctx).WithName("controllers").WithName("Gateway"), Provider: pro, + Updater: updater, }, &controller.HTTPRouteReconciler{ Client: mgr.GetClient(), Scheme: mgr.GetScheme(), Log: ctrl.LoggerFrom(ctx).WithName("controllers").WithName("HTTPRoute"), Provider: pro, + Updater: updater, }, &controller.IngressReconciler{ Client: mgr.GetClient(), Scheme: mgr.GetScheme(), Log: ctrl.LoggerFrom(ctx).WithName("controllers").WithName("Ingress"), Provider: pro, + Updater: updater, }, &controller.ConsumerReconciler{ Client: mgr.GetClient(), Scheme: mgr.GetScheme(), Log: ctrl.LoggerFrom(ctx).WithName("controllers").WithName("Consumer"), Provider: pro, + Updater: updater, }, &controller.IngressClassReconciler{ Client: mgr.GetClient(), diff --git a/internal/manager/run.go b/internal/manager/run.go index 2a1daf99f..6dd160c1e 100644 --- a/internal/manager/run.go +++ b/internal/manager/run.go @@ -35,6 +35,7 @@ import ( "github.com/apache/apisix-ingress-controller/api/v1alpha1" "github.com/apache/apisix-ingress-controller/internal/controller" "github.com/apache/apisix-ingress-controller/internal/controller/config" + "github.com/apache/apisix-ingress-controller/internal/controller/status" "github.com/apache/apisix-ingress-controller/internal/provider/adc" ) @@ -146,6 +147,12 @@ func Run(ctx context.Context, logger logr.Logger) error { return err } + updater := status.NewStatusUpdateHandler(ctrl.LoggerFrom(ctx).WithName("status").WithName("updater"), mgr.GetClient()) + if err := mgr.Add(updater); err != nil { + setupLog.Error(err, "unable to add status updater") + return err + } + go func() { setupLog.Info("starting provider sync") initalSyncDelay := config.ControllerConfig.ProviderConfig.InitSyncDelay.Duration @@ -189,7 +196,7 @@ func Run(ctx context.Context, logger logr.Logger) error { controller.SetEnableReferenceGrant(err == nil) setupLog.Info("setting up controllers") - controllers, err := setupControllers(ctx, mgr, provider) + controllers, err := setupControllers(ctx, mgr, provider, updater.Writer()) if err != nil { setupLog.Error(err, "unable to set up controllers") return err diff --git a/internal/provider/adc/adc.go b/internal/provider/adc/adc.go index bf4947ec2..181d36a70 100644 --- a/internal/provider/adc/adc.go +++ b/internal/provider/adc/adc.go @@ -52,8 +52,7 @@ type adcClient struct { // gateway/ingressclass -> adcConfig configs map[provider.ResourceKind]adcConfig // httproute/consumer/ingress/gateway -> gateway/ingressclass - parentRefs map[provider.ResourceKind][]provider.ResourceKind - + parentRefs map[provider.ResourceKind][]provider.ResourceKind syncTimeout time.Duration store *Store diff --git a/internal/provider/provider.go b/internal/provider/provider.go index 8cff0ca11..d4d855714 100644 --- a/internal/provider/provider.go +++ b/internal/provider/provider.go @@ -22,6 +22,7 @@ import ( gatewayv1 "sigs.k8s.io/gateway-api/apis/v1" "github.com/apache/apisix-ingress-controller/api/v1alpha1" + "github.com/apache/apisix-ingress-controller/internal/controller/status" ) type Provider interface { @@ -52,7 +53,7 @@ type TranslateContext struct { ResourceParentRefs map[ResourceKind][]ResourceKind HTTPRoutePolicies []v1alpha1.HTTPRoutePolicy - StatusUpdaters []client.Object + StatusUpdaters []status.Update } func NewDefaultTranslateContext(ctx context.Context) *TranslateContext {