diff --git a/internal/controller/apisixconsumer_controller.go b/internal/controller/apisixconsumer_controller.go index eb9b3c732..f9908b256 100644 --- a/internal/controller/apisixconsumer_controller.go +++ b/internal/controller/apisixconsumer_controller.go @@ -39,6 +39,7 @@ import ( apiv2 "github.com/apache/apisix-ingress-controller/api/v2" "github.com/apache/apisix-ingress-controller/internal/controller/indexer" "github.com/apache/apisix-ingress-controller/internal/controller/status" + "github.com/apache/apisix-ingress-controller/internal/manager/readiness" "github.com/apache/apisix-ingress-controller/internal/provider" "github.com/apache/apisix-ingress-controller/internal/utils" ) @@ -51,10 +52,12 @@ type ApisixConsumerReconciler struct { Provider provider.Provider Updater status.Updater + Readier readiness.ReadinessManager } // Reconcile FIXME: implement the reconcile logic (For now, it dose nothing other than directly accepting) func (r *ApisixConsumerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + defer r.Readier.Done(&apiv2.ApisixConsumer{}, req.NamespacedName) r.Log.Info("reconcile", "request", req.NamespacedName) ac := &apiv2.ApisixConsumer{} diff --git a/internal/controller/apisixglobalrule_controller.go b/internal/controller/apisixglobalrule_controller.go index fc42b1fb2..9431df676 100644 --- a/internal/controller/apisixglobalrule_controller.go +++ b/internal/controller/apisixglobalrule_controller.go @@ -38,6 +38,7 @@ import ( "github.com/apache/apisix-ingress-controller/internal/controller/config" "github.com/apache/apisix-ingress-controller/internal/controller/indexer" "github.com/apache/apisix-ingress-controller/internal/controller/status" + "github.com/apache/apisix-ingress-controller/internal/manager/readiness" "github.com/apache/apisix-ingress-controller/internal/provider" "github.com/apache/apisix-ingress-controller/internal/utils" ) @@ -49,10 +50,13 @@ type ApisixGlobalRuleReconciler struct { Log logr.Logger Provider provider.Provider Updater status.Updater + + Readier readiness.ReadinessManager } // Reconcile implements the reconciliation logic for ApisixGlobalRule func (r *ApisixGlobalRuleReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + defer r.Readier.Done(&apiv2.ApisixGlobalRule{}, req.NamespacedName) var globalRule apiv2.ApisixGlobalRule if err := r.Get(ctx, req.NamespacedName, &globalRule); err != nil { if client.IgnoreNotFound(err) == nil { diff --git a/internal/controller/apisixroute_controller.go b/internal/controller/apisixroute_controller.go index a07c6e76f..c4dde0d5a 100644 --- a/internal/controller/apisixroute_controller.go +++ b/internal/controller/apisixroute_controller.go @@ -44,6 +44,7 @@ import ( apiv2 "github.com/apache/apisix-ingress-controller/api/v2" "github.com/apache/apisix-ingress-controller/internal/controller/indexer" "github.com/apache/apisix-ingress-controller/internal/controller/status" + "github.com/apache/apisix-ingress-controller/internal/manager/readiness" "github.com/apache/apisix-ingress-controller/internal/provider" "github.com/apache/apisix-ingress-controller/internal/types" "github.com/apache/apisix-ingress-controller/internal/utils" @@ -57,6 +58,7 @@ type ApisixRouteReconciler struct { Log logr.Logger Provider provider.Provider Updater status.Updater + Readier readiness.ReadinessManager } // SetupWithManager sets up the controller with the Manager. @@ -97,6 +99,7 @@ func (r *ApisixRouteReconciler) SetupWithManager(mgr ctrl.Manager) error { } func (r *ApisixRouteReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + defer r.Readier.Done(&apiv2.ApisixRoute{}, req.NamespacedName) var ar apiv2.ApisixRoute if err := r.Get(ctx, req.NamespacedName, &ar); err != nil { if client.IgnoreNotFound(err) == nil { diff --git a/internal/controller/apisixtls_controller.go b/internal/controller/apisixtls_controller.go index 5cde26f6c..7410fdb07 100644 --- a/internal/controller/apisixtls_controller.go +++ b/internal/controller/apisixtls_controller.go @@ -39,6 +39,7 @@ import ( "github.com/apache/apisix-ingress-controller/internal/controller/config" "github.com/apache/apisix-ingress-controller/internal/controller/indexer" "github.com/apache/apisix-ingress-controller/internal/controller/status" + "github.com/apache/apisix-ingress-controller/internal/manager/readiness" "github.com/apache/apisix-ingress-controller/internal/provider" "github.com/apache/apisix-ingress-controller/internal/utils" ) @@ -50,6 +51,7 @@ type ApisixTlsReconciler struct { Log logr.Logger Provider provider.Provider Updater status.Updater + Readier readiness.ReadinessManager } // SetupWithManager sets up the controller with the Manager. @@ -85,6 +87,7 @@ func (r *ApisixTlsReconciler) SetupWithManager(mgr ctrl.Manager) error { // Reconcile implements the reconciliation logic for ApisixTls func (r *ApisixTlsReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + defer r.Readier.Done(&apiv2.ApisixTls{}, req.NamespacedName) var tls apiv2.ApisixTls if err := r.Get(ctx, req.NamespacedName, &tls); err != nil { if client.IgnoreNotFound(err) == nil { diff --git a/internal/controller/consumer_controller.go b/internal/controller/consumer_controller.go index fd51f87a1..7869ad6b4 100644 --- a/internal/controller/consumer_controller.go +++ b/internal/controller/consumer_controller.go @@ -38,6 +38,7 @@ import ( "github.com/apache/apisix-ingress-controller/api/v1alpha1" "github.com/apache/apisix-ingress-controller/internal/controller/indexer" "github.com/apache/apisix-ingress-controller/internal/controller/status" + "github.com/apache/apisix-ingress-controller/internal/manager/readiness" "github.com/apache/apisix-ingress-controller/internal/provider" "github.com/apache/apisix-ingress-controller/internal/utils" ) @@ -51,6 +52,7 @@ type ConsumerReconciler struct { //nolint:revive Provider provider.Provider Updater status.Updater + Readier readiness.ReadinessManager } // SetupWithManager sets up the controller with the Manager. @@ -181,6 +183,7 @@ func (r *ConsumerReconciler) listConsumersForGatewayProxy(ctx context.Context, o } func (r *ConsumerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + defer r.Readier.Done(&v1alpha1.Consumer{}, req.NamespacedName) consumer := new(v1alpha1.Consumer) if err := r.Get(ctx, req.NamespacedName, consumer); err != nil { if client.IgnoreNotFound(err) == nil { @@ -303,31 +306,5 @@ func (r *ConsumerReconciler) checkGatewayRef(object client.Object) bool { if !ok { return false } - if consumer.Spec.GatewayRef.Name == "" { - return false - } - if consumer.Spec.GatewayRef.Kind != nil && *consumer.Spec.GatewayRef.Kind != KindGateway { - return false - } - if consumer.Spec.GatewayRef.Group != nil && *consumer.Spec.GatewayRef.Group != gatewayv1.GroupName { - return false - } - ns := consumer.GetNamespace() - if consumer.Spec.GatewayRef.Namespace != nil { - ns = *consumer.Spec.GatewayRef.Namespace - } - gateway := &gatewayv1.Gateway{} - if err := r.Get(context.Background(), client.ObjectKey{ - Name: consumer.Spec.GatewayRef.Name, - Namespace: ns, - }, gateway); err != nil { - r.Log.Error(err, "failed to get gateway", "gateway", consumer.Spec.GatewayRef.Name) - return false - } - gatewayClass := &gatewayv1.GatewayClass{} - if err := r.Get(context.Background(), client.ObjectKey{Name: string(gateway.Spec.GatewayClassName)}, gatewayClass); err != nil { - r.Log.Error(err, "failed to get gateway class", "gateway", gateway.GetName(), "gatewayclass", gateway.Spec.GatewayClassName) - return false - } - return matchesController(string(gatewayClass.Spec.ControllerName)) + return MatchConsumerGatewayRef(context.Background(), r.Client, r.Log, consumer) } diff --git a/internal/controller/httproute_controller.go b/internal/controller/httproute_controller.go index 50b6e3b2b..b6f392878 100644 --- a/internal/controller/httproute_controller.go +++ b/internal/controller/httproute_controller.go @@ -48,6 +48,7 @@ import ( "github.com/apache/apisix-ingress-controller/api/v1alpha1" "github.com/apache/apisix-ingress-controller/internal/controller/indexer" "github.com/apache/apisix-ingress-controller/internal/controller/status" + "github.com/apache/apisix-ingress-controller/internal/manager/readiness" "github.com/apache/apisix-ingress-controller/internal/provider" "github.com/apache/apisix-ingress-controller/internal/types" "github.com/apache/apisix-ingress-controller/internal/utils" @@ -65,6 +66,7 @@ type HTTPRouteReconciler struct { //nolint:revive genericEvent chan event.GenericEvent Updater status.Updater + Readier readiness.ReadinessManager } // SetupWithManager sets up the controller with the Manager. @@ -130,6 +132,7 @@ func (r *HTTPRouteReconciler) SetupWithManager(mgr ctrl.Manager) error { } func (r *HTTPRouteReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + defer r.Readier.Done(&gatewayv1.HTTPRoute{}, req.NamespacedName) hr := new(gatewayv1.HTTPRoute) if err := r.Get(ctx, req.NamespacedName, hr); err != nil { if client.IgnoreNotFound(err) == nil { diff --git a/internal/controller/indexer/indexer.go b/internal/controller/indexer/indexer.go index 2f1b75742..e621ac109 100644 --- a/internal/controller/indexer/indexer.go +++ b/internal/controller/indexer/indexer.go @@ -47,6 +47,7 @@ const ( GatewayClassIndexRef = "gatewayClassRef" ApisixUpstreamRef = "apisixUpstreamRef" PluginConfigIndexRef = "pluginConfigRefs" + ControllerName = "controllerName" ) func SetupIndexer(mgr ctrl.Manager) error { @@ -59,11 +60,11 @@ func SetupIndexer(mgr ctrl.Manager) error { setupIngressClassIndexer, setupGatewayProxyIndexer, setupGatewaySecretIndex, - setupGatewayClassIndexer, setupApisixRouteIndexer, setupApisixPluginConfigIndexer, setupApisixTlsIndexer, setupApisixConsumerIndexer, + setupGatewayClassIndexer, } { if err := setup(mgr); err != nil { return err @@ -81,6 +82,17 @@ func setupGatewayIndexer(mgr ctrl.Manager) error { ); err != nil { return err } + + if err := mgr.GetFieldIndexer().IndexField( + context.Background(), + &gatewayv1.Gateway{}, + GatewayClassIndexRef, + func(obj client.Object) (requests []string) { + return []string{string(obj.(*gatewayv1.Gateway).Spec.GatewayClassName)} + }, + ); err != nil { + return err + } return nil } @@ -273,10 +285,10 @@ func setupGatewaySecretIndex(mgr ctrl.Manager) error { func setupGatewayClassIndexer(mgr ctrl.Manager) error { return mgr.GetFieldIndexer().IndexField( context.Background(), - &gatewayv1.Gateway{}, - GatewayClassIndexRef, - func(obj client.Object) (requests []string) { - return []string{string(obj.(*gatewayv1.Gateway).Spec.GatewayClassName)} + &gatewayv1.GatewayClass{}, + ControllerName, + func(obj client.Object) []string { + return []string{string(obj.(*gatewayv1.GatewayClass).Spec.ControllerName)} }, ) } diff --git a/internal/controller/ingress_controller.go b/internal/controller/ingress_controller.go index 86c4d7ff0..25b52990c 100644 --- a/internal/controller/ingress_controller.go +++ b/internal/controller/ingress_controller.go @@ -45,6 +45,7 @@ import ( "github.com/apache/apisix-ingress-controller/api/v1alpha1" "github.com/apache/apisix-ingress-controller/internal/controller/indexer" "github.com/apache/apisix-ingress-controller/internal/controller/status" + "github.com/apache/apisix-ingress-controller/internal/manager/readiness" "github.com/apache/apisix-ingress-controller/internal/provider" "github.com/apache/apisix-ingress-controller/internal/utils" ) @@ -59,6 +60,7 @@ type IngressReconciler struct { //nolint:revive genericEvent chan event.GenericEvent Updater status.Updater + Readier readiness.ReadinessManager } // SetupWithManager sets up the controller with the Manager. @@ -117,6 +119,7 @@ func (r *IngressReconciler) SetupWithManager(mgr ctrl.Manager) error { // Reconcile handles the reconciliation of Ingress resources func (r *IngressReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + defer r.Readier.Done(&networkingv1.Ingress{}, req.NamespacedName) ingress := new(networkingv1.Ingress) if err := r.Get(ctx, req.NamespacedName, ingress); err != nil { if client.IgnoreNotFound(err) == nil { diff --git a/internal/controller/status/updater.go b/internal/controller/status/updater.go index 019c18468..d00aec76a 100644 --- a/internal/controller/status/updater.go +++ b/internal/controller/status/updater.go @@ -150,6 +150,10 @@ func (u *UpdateHandler) Start(ctx context.Context) error { } } +func (u *UpdateHandler) NeedsLeaderElection() bool { + return true +} + func (u *UpdateHandler) Writer() Updater { return &UpdateWriter{ updateChannel: u.updateChannel, diff --git a/internal/controller/utils.go b/internal/controller/utils.go index 81a44843f..0917cf815 100644 --- a/internal/controller/utils.go +++ b/internal/controller/utils.go @@ -35,6 +35,7 @@ import ( corev1 "k8s.io/api/core/v1" discoveryv1 "k8s.io/api/discovery/v1" networkingv1 "k8s.io/api/networking/v1" + k8serrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" @@ -1448,3 +1449,37 @@ func TypePredicate[T client.Object]() func(obj client.Object) bool { return ok } } + +func MatchConsumerGatewayRef(ctx context.Context, c client.Client, log logr.Logger, consumer *v1alpha1.Consumer) bool { + if consumer.Spec.GatewayRef.Name == "" { + return false + } + if consumer.Spec.GatewayRef.Kind != nil && *consumer.Spec.GatewayRef.Kind != KindGateway { + return false + } + if consumer.Spec.GatewayRef.Group != nil && *consumer.Spec.GatewayRef.Group != gatewayv1.GroupName { + return false + } + ns := consumer.GetNamespace() + if consumer.Spec.GatewayRef.Namespace != nil { + ns = *consumer.Spec.GatewayRef.Namespace + } + gateway := &gatewayv1.Gateway{} + if err := c.Get(context.Background(), client.ObjectKey{ + Name: consumer.Spec.GatewayRef.Name, + Namespace: ns, + }, gateway); err != nil { + if !k8serrors.IsNotFound(err) { + log.Error(err, "failed to get gateway", "gateway", consumer.Spec.GatewayRef.Name) + } + return false + } + gatewayClass := &gatewayv1.GatewayClass{} + if err := c.Get(context.Background(), client.ObjectKey{Name: string(gateway.Spec.GatewayClassName)}, gatewayClass); err != nil { + if !k8serrors.IsNotFound(err) { + log.Error(err, "failed to get gateway class", "gateway", gateway.GetName(), "gatewayclass", gateway.Spec.GatewayClassName) + } + return false + } + return matchesController(string(gatewayClass.Spec.ControllerName)) +} diff --git a/internal/manager/controllers.go b/internal/manager/controllers.go index 2af8cd6aa..0688055d4 100644 --- a/internal/manager/controllers.go +++ b/internal/manager/controllers.go @@ -20,13 +20,23 @@ package manager import ( "context" + netv1 "k8s.io/api/networking/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/manager" + gatewayv1 "sigs.k8s.io/gateway-api/apis/v1" + "github.com/apache/apisix-ingress-controller/api/v1alpha1" + apiv2 "github.com/apache/apisix-ingress-controller/api/v2" "github.com/apache/apisix-ingress-controller/internal/controller" "github.com/apache/apisix-ingress-controller/internal/controller/indexer" "github.com/apache/apisix-ingress-controller/internal/controller/status" + "github.com/apache/apisix-ingress-controller/internal/manager/readiness" "github.com/apache/apisix-ingress-controller/internal/provider" + types "github.com/apache/apisix-ingress-controller/internal/types" ) // K8s @@ -83,7 +93,7 @@ type Controller interface { SetupWithManager(mgr manager.Manager) error } -func setupControllers(ctx context.Context, mgr manager.Manager, pro provider.Provider, updater status.Updater) ([]Controller, error) { +func setupControllers(ctx context.Context, mgr manager.Manager, pro provider.Provider, updater status.Updater, readier readiness.ReadinessManager) ([]Controller, error) { if err := indexer.SetupIndexer(mgr); err != nil { return nil, err } @@ -107,6 +117,7 @@ func setupControllers(ctx context.Context, mgr manager.Manager, pro provider.Pro Log: ctrl.LoggerFrom(ctx).WithName("controllers").WithName("HTTPRoute"), Provider: pro, Updater: updater, + Readier: readier, }, &controller.IngressReconciler{ Client: mgr.GetClient(), @@ -114,6 +125,7 @@ func setupControllers(ctx context.Context, mgr manager.Manager, pro provider.Pro Log: ctrl.LoggerFrom(ctx).WithName("controllers").WithName("Ingress"), Provider: pro, Updater: updater, + Readier: readier, }, &controller.ConsumerReconciler{ Client: mgr.GetClient(), @@ -121,6 +133,7 @@ func setupControllers(ctx context.Context, mgr manager.Manager, pro provider.Pro Log: ctrl.LoggerFrom(ctx).WithName("controllers").WithName("Consumer"), Provider: pro, Updater: updater, + Readier: readier, }, &controller.IngressClassReconciler{ Client: mgr.GetClient(), @@ -134,6 +147,7 @@ func setupControllers(ctx context.Context, mgr manager.Manager, pro provider.Pro Log: ctrl.LoggerFrom(ctx).WithName("controllers").WithName("ApisixGlobalRule"), Provider: pro, Updater: updater, + Readier: readier, }, &controller.ApisixRouteReconciler{ Client: mgr.GetClient(), @@ -141,6 +155,7 @@ func setupControllers(ctx context.Context, mgr manager.Manager, pro provider.Pro Log: ctrl.LoggerFrom(ctx).WithName("controllers").WithName("ApisixRoute"), Provider: pro, Updater: updater, + Readier: readier, }, &controller.ApisixConsumerReconciler{ Client: mgr.GetClient(), @@ -148,6 +163,7 @@ func setupControllers(ctx context.Context, mgr manager.Manager, pro provider.Pro Log: ctrl.LoggerFrom(ctx).WithName("controllers").WithName("ApisixConsumer"), Provider: pro, Updater: updater, + Readier: readier, }, &controller.ApisixPluginConfigReconciler{ Client: mgr.GetClient(), @@ -161,6 +177,7 @@ func setupControllers(ctx context.Context, mgr manager.Manager, pro provider.Pro Log: ctrl.LoggerFrom(ctx).WithName("controllers").WithName("ApisixTls"), Provider: pro, Updater: updater, + Readier: readier, }, &controller.ApisixUpstreamReconciler{ Client: mgr.GetClient(), @@ -176,3 +193,41 @@ func setupControllers(ctx context.Context, mgr manager.Manager, pro provider.Pro }, }, nil } + +func registerReadinessGVK(c client.Client, readier readiness.ReadinessManager) { + log := ctrl.LoggerFrom(context.Background()).WithName("readiness") + readier.RegisterGVK([]readiness.GVKConfig{ + { + GVKs: []schema.GroupVersionKind{ + types.GvkOf(&gatewayv1.HTTPRoute{}), + }, + }, + { + GVKs: []schema.GroupVersionKind{ + types.GvkOf(&netv1.Ingress{}), + types.GvkOf(&apiv2.ApisixRoute{}), + types.GvkOf(&apiv2.ApisixGlobalRule{}), + types.GvkOf(&apiv2.ApisixPluginConfig{}), + types.GvkOf(&apiv2.ApisixTls{}), + types.GvkOf(&apiv2.ApisixConsumer{}), + }, + Filter: readiness.GVKFilter(func(obj *unstructured.Unstructured) bool { + icName, _, _ := unstructured.NestedString(obj.Object, "spec", "ingressClassName") + ingressClass, _ := controller.GetIngressClass(context.Background(), c, log, icName) + return ingressClass != nil + }), + }, + { + GVKs: []schema.GroupVersionKind{ + types.GvkOf(&v1alpha1.Consumer{}), + }, + Filter: readiness.GVKFilter(func(obj *unstructured.Unstructured) bool { + consumer := &v1alpha1.Consumer{} + if err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.Object, consumer); err != nil { + return false + } + return controller.MatchConsumerGatewayRef(context.Background(), c, log, consumer) + }), + }, + }...) +} diff --git a/internal/manager/readiness/manager.go b/internal/manager/readiness/manager.go new file mode 100644 index 000000000..7ba4b83bb --- /dev/null +++ b/internal/manager/readiness/manager.go @@ -0,0 +1,196 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package readiness + +import ( + "context" + "fmt" + "sync" + "sync/atomic" + "time" + + "github.com/api7/gopkg/pkg/log" + "go.uber.org/zap" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + k8stypes "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" + + types "github.com/apache/apisix-ingress-controller/internal/types" +) + +// Filter defines an interface to match unstructured Kubernetes objects. +type Filter interface { + Match(obj *unstructured.Unstructured) bool +} + +// GVKFilter is a functional implementation of Filter using a function type. +type GVKFilter func(obj *unstructured.Unstructured) bool + +func (f GVKFilter) Match(obj *unstructured.Unstructured) bool { + return f(obj) +} + +// GVKConfig defines a set of GVKs and an optional filter to match the objects. +type GVKConfig struct { + GVKs []schema.GroupVersionKind + Filter Filter +} + +// readinessManager prevents premature full sync to the data plane on controller startup. +// +// Background: +// On startup, the controller watches CRDs and periodically performs full sync to the data plane. +// If a sync occurs before all resources have been reconciled, it may push incomplete data, +// causing traffic disruption. +// +// This manager tracks whether all relevant resources have been processed at least once. +// It is used to delay full sync until initial reconciliation is complete. +type ReadinessManager interface { + RegisterGVK(configs ...GVKConfig) + Start(ctx context.Context) error + IsReady() bool + WaitReady(ctx context.Context, timeout time.Duration) bool + Done(obj client.Object, namespacedName k8stypes.NamespacedName) +} + +type readinessManager struct { + client client.Client + configs []GVKConfig + state map[schema.GroupVersionKind]map[k8stypes.NamespacedName]struct{} + mu sync.RWMutex + startOnce sync.Once + started chan struct{} + done chan struct{} + + isReady atomic.Bool +} + +// ReadinessManager tracks readiness of specific resources across the cluster. +func NewReadinessManager(client client.Client) ReadinessManager { + return &readinessManager{ + client: client, + state: make(map[schema.GroupVersionKind]map[k8stypes.NamespacedName]struct{}), + started: make(chan struct{}), + done: make(chan struct{}), + isReady: atomic.Bool{}, + } +} + +// RegisterGVK registers one or more GVKConfig objects for readiness tracking. +func (r *readinessManager) RegisterGVK(configs ...GVKConfig) { + r.mu.Lock() + defer r.mu.Unlock() + r.configs = append(r.configs, configs...) +} + +// Start initializes the readiness state from the Kubernetes API. +// Should be called only after informer cache has synced. +func (r *readinessManager) Start(ctx context.Context) error { + var err error + r.startOnce.Do(func() { + for _, cfg := range r.configs { + for _, gvk := range cfg.GVKs { + uList := &unstructured.UnstructuredList{} + uList.SetGroupVersionKind(gvk) + if listErr := r.client.List(ctx, uList); listErr != nil { + err = fmt.Errorf("list %s failed: %w", gvk.String(), listErr) + return + } + var expected []k8stypes.NamespacedName + for _, item := range uList.Items { + if cfg.Filter != nil && !cfg.Filter.Match(&item) { + continue + } + expected = append(expected, k8stypes.NamespacedName{ + Namespace: item.GetNamespace(), + Name: item.GetName(), + }) + } + if len(expected) > 0 { + log.Debugw("registering readiness state", zap.Any("gvk", gvk), zap.Any("expected", expected)) + r.registerState(gvk, expected) + } + } + } + close(r.started) + if len(r.state) == 0 && !r.isReady.Load() { + r.isReady.Store(true) + close(r.done) + } + }) + return err +} + +func (r *readinessManager) registerState(gvk schema.GroupVersionKind, list []k8stypes.NamespacedName) { + r.mu.Lock() + defer r.mu.Unlock() + if _, ok := r.state[gvk]; !ok { + r.state[gvk] = make(map[k8stypes.NamespacedName]struct{}) + } + for _, name := range list { + r.state[gvk][name] = struct{}{} + } +} + +// Done marks the resource as ready by removing it from the pending state. +func (r *readinessManager) Done(obj client.Object, nn k8stypes.NamespacedName) { + if r.IsReady() { + return + } + r.mu.Lock() + defer r.mu.Unlock() + gvk := types.GvkOf(obj) + if _, ok := r.state[gvk]; !ok { + return + } + delete(r.state[gvk], nn) + if len(r.state[gvk]) == 0 { + delete(r.state, gvk) + } + if len(r.state) == 0 && !r.isReady.Load() { + r.isReady.Store(true) + close(r.done) + } +} + +func (r *readinessManager) IsReady() bool { + return r.isReady.Load() +} + +// WaitReady blocks until readiness is achieved, a timeout occurs, or context is cancelled. +func (r *readinessManager) WaitReady(ctx context.Context, timeout time.Duration) bool { + if r.IsReady() { + return true + } + + select { + case <-r.started: + case <-ctx.Done(): + return false + } + + select { + case <-ctx.Done(): + return false + case <-time.After(timeout): + return true + case <-r.done: + return true + } +} diff --git a/internal/manager/run.go b/internal/manager/run.go index a27b27325..b75b383d8 100644 --- a/internal/manager/run.go +++ b/internal/manager/run.go @@ -41,6 +41,7 @@ import ( "github.com/apache/apisix-ingress-controller/internal/controller" "github.com/apache/apisix-ingress-controller/internal/controller/config" "github.com/apache/apisix-ingress-controller/internal/controller/status" + "github.com/apache/apisix-ingress-controller/internal/manager/readiness" "github.com/apache/apisix-ingress-controller/internal/provider/adc" _ "github.com/apache/apisix-ingress-controller/pkg/metrics" ) @@ -150,13 +151,20 @@ func Run(ctx context.Context, logger logr.Logger) error { return err } + readier := readiness.NewReadinessManager(mgr.GetClient()) + registerReadinessGVK(mgr.GetClient(), readier) + + if err := mgr.Add(readier); err != nil { + setupLog.Error(err, "unable to add readiness manager") + } + updater := status.NewStatusUpdateHandler(ctrl.LoggerFrom(ctx).WithName("status").WithName("updater"), mgr.GetClient()) if err := mgr.Add(updater); err != nil { setupLog.Error(err, "unable to add status updater") return err } - provider, err := adc.New(updater.Writer(), &adc.Options{ + provider, err := adc.New(updater.Writer(), readier, &adc.Options{ SyncTimeout: config.ControllerConfig.ExecADCTimeout.Duration, SyncPeriod: config.ControllerConfig.ProviderConfig.SyncPeriod.Duration, InitSyncDelay: config.ControllerConfig.ProviderConfig.InitSyncDelay.Duration, @@ -184,7 +192,7 @@ func Run(ctx context.Context, logger logr.Logger) error { controller.SetEnableReferenceGrant(err == nil) setupLog.Info("setting up controllers") - controllers, err := setupControllers(ctx, mgr, provider, updater.Writer()) + controllers, err := setupControllers(ctx, mgr, provider, updater.Writer(), readier) if err != nil { setupLog.Error(err, "unable to set up controllers") return err diff --git a/internal/provider/adc/adc.go b/internal/provider/adc/adc.go index 4750be4d0..20a4497e4 100644 --- a/internal/provider/adc/adc.go +++ b/internal/provider/adc/adc.go @@ -39,6 +39,7 @@ import ( apiv2 "github.com/apache/apisix-ingress-controller/api/v2" "github.com/apache/apisix-ingress-controller/internal/controller/label" "github.com/apache/apisix-ingress-controller/internal/controller/status" + "github.com/apache/apisix-ingress-controller/internal/manager/readiness" "github.com/apache/apisix-ingress-controller/internal/provider" "github.com/apache/apisix-ingress-controller/internal/provider/adc/translator" "github.com/apache/apisix-ingress-controller/internal/types" @@ -95,6 +96,8 @@ type adcClient struct { updater status.Updater statusUpdateMap map[types.NamespacedNameKind][]string + readier readiness.ReadinessManager + syncCh chan struct{} } @@ -106,7 +109,7 @@ type Task struct { configs []adcConfig } -func New(updater status.Updater, opts ...Option) (provider.Provider, error) { +func New(updater status.Updater, readier readiness.ReadinessManager, opts ...Option) (provider.Provider, error) { o := Options{} o.ApplyOptions(opts) @@ -118,6 +121,7 @@ func New(updater status.Updater, opts ...Option) (provider.Provider, error) { store: NewStore(), executor: &DefaultADCExecutor{}, updater: updater, + readier: readier, syncCh: make(chan struct{}, 1), }, nil } @@ -311,6 +315,8 @@ func (d *adcClient) Delete(ctx context.Context, obj client.Object) error { } func (d *adcClient) Start(ctx context.Context) error { + d.readier.WaitReady(ctx, 5*time.Minute) + initalSyncDelay := d.InitSyncDelay if initalSyncDelay > 0 { time.AfterFunc(initalSyncDelay, func() { @@ -555,3 +561,7 @@ func (d *adcClient) handleADCExecutionErrors(statusesMap map[string]types.ADCExe statusUpdateMap := d.resolveADCExecutionErrors(statusesMap) d.handleStatusUpdate(statusUpdateMap) } + +func (d *adcClient) NeedLeaderElection() bool { + return true +} diff --git a/internal/provider/controlplane/controlplane.go b/internal/provider/controlplane/controlplane.go deleted file mode 100644 index 4a6e38f92..000000000 --- a/internal/provider/controlplane/controlplane.go +++ /dev/null @@ -1,192 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package controlplane - -import ( - "context" - "fmt" - - "github.com/api7/gopkg/pkg/log" - "sigs.k8s.io/controller-runtime/pkg/client" - gatewayv1 "sigs.k8s.io/gateway-api/apis/v1" - - "github.com/apache/apisix-ingress-controller/internal/controller/config" - "github.com/apache/apisix-ingress-controller/internal/provider" - "github.com/apache/apisix-ingress-controller/internal/provider/controlplane/translator" - "github.com/apache/apisix-ingress-controller/pkg/dashboard" -) - -type dashboardProvider struct { - translator *translator.Translator - c dashboard.Dashboard -} - -//nolint:unused -func NewDashboard() (provider.Provider, error) { - control, err := dashboard.NewClient() - if err != nil { - return nil, err - } - - if err := control.AddCluster(context.TODO(), &dashboard.ClusterOptions{ - Name: "default", - Labels: map[string]string{ - "k8s/controller-name": config.ControllerConfig.ControllerName, - }, - ControllerName: config.ControllerConfig.ControllerName, - SyncCache: true, - }); err != nil { - return nil, err - } - - return &dashboardProvider{ - translator: &translator.Translator{}, - c: control, - }, nil -} - -func (d *dashboardProvider) Update(ctx context.Context, tctx *provider.TranslateContext, obj client.Object) error { - var result *translator.TranslateResult - var err error - switch obj := obj.(type) { - case *gatewayv1.HTTPRoute: - result, err = d.translator.TranslateHTTPRoute(tctx, obj.DeepCopy()) - case *gatewayv1.Gateway: - result, err = d.translator.TranslateGateway(tctx, obj.DeepCopy()) - } - if err != nil { - return err - } - // TODO: support diff resources - name := "default" - for _, service := range result.Services { - if _, err := d.c.Cluster(name).Service().Update(ctx, service); err != nil { - return err - } - } - for _, route := range result.Routes { - if _, err := d.c.Cluster(name).Route().Update(ctx, route); err != nil { - return err - } - } - for _, ssl := range result.SSL { - // to avoid duplication - ssl.Snis = arrayUniqueElements(ssl.Snis, []string{}) - if len(ssl.Snis) == 1 && ssl.Snis[0] == "*" { - log.Warnf("wildcard hostname is not allowed in ssl object. Skipping SSL creation for %s: %s", obj.GetObjectKind().GroupVersionKind().Kind, obj.GetName()) - return nil - } - ssl.Snis = removeWildcard(ssl.Snis) - oldssl, err := d.c.Cluster(name).SSL().Get(ctx, ssl.Cert) - if err != nil || oldssl == nil { - if _, err := d.c.Cluster(name).SSL().Create(ctx, ssl); err != nil { - return fmt.Errorf("failed to create ssl for sni %+v: %w", ssl.Snis, err) - } - } else { - // array union is done to avoid host duplication - ssl.Snis = arrayUniqueElements(ssl.Snis, oldssl.Snis) - if _, err := d.c.Cluster(name).SSL().Update(ctx, ssl); err != nil { - return fmt.Errorf("failed to update ssl for sni %+v: %w", ssl.Snis, err) - } - } - } - return nil -} - -func removeWildcard(snis []string) []string { - newSni := make([]string, 0) - for _, sni := range snis { - if sni != "*" { - newSni = append(newSni, sni) - } - } - return newSni -} - -func arrayUniqueElements(arr1 []string, arr2 []string) []string { - // return a union of elements from both array - presentEle := make(map[string]bool) - newArr := make([]string, 0) - for _, ele := range arr1 { - if !presentEle[ele] { - presentEle[ele] = true - newArr = append(newArr, ele) - } - } - for _, ele := range arr2 { - if !presentEle[ele] { - presentEle[ele] = true - newArr = append(newArr, ele) - } - } - return newArr -} - -func (d *dashboardProvider) Delete(ctx context.Context, obj client.Object) error { - clusters := d.c.ListClusters() - kindLabel := dashboard.ListByKindLabelOptions{ - Kind: obj.GetObjectKind().GroupVersionKind().Kind, - Namespace: obj.GetNamespace(), - Name: obj.GetName(), - } - for _, cluster := range clusters { - switch obj.(type) { - case *gatewayv1.Gateway: - ssls, _ := cluster.SSL().List(ctx, dashboard.ListOptions{ - From: dashboard.ListFromCache, - KindLabel: kindLabel, - }) - for _, ssl := range ssls { - if err := cluster.SSL().Delete(ctx, ssl); err != nil { - return err - } - } - case *gatewayv1.HTTPRoute: - routes, _ := cluster.Route().List(ctx, dashboard.ListOptions{ - From: dashboard.ListFromCache, - KindLabel: kindLabel, - }) - - for _, route := range routes { - if err := cluster.Route().Delete(ctx, route); err != nil { - return err - } - } - - services, _ := cluster.Service().List(ctx, dashboard.ListOptions{ - From: dashboard.ListFromCache, - KindLabel: kindLabel, - }) - - for _, service := range services { - if err := cluster.Service().Delete(ctx, service); err != nil { - return err - } - } - } - } - return nil -} - -func (d *dashboardProvider) Sync(ctx context.Context) error { - return nil -} - -func (d *dashboardProvider) Start(ctx context.Context) error { - return nil -} diff --git a/internal/provider/controlplane/manifest.go b/internal/provider/controlplane/manifest.go deleted file mode 100644 index 32a16b4f0..000000000 --- a/internal/provider/controlplane/manifest.go +++ /dev/null @@ -1,18 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package controlplane diff --git a/internal/provider/controlplane/translator/gateway.go b/internal/provider/controlplane/translator/gateway.go deleted file mode 100644 index bc1b3560e..000000000 --- a/internal/provider/controlplane/translator/gateway.go +++ /dev/null @@ -1,166 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package translator - -import ( - "crypto/x509" - "encoding/pem" - "fmt" - - "github.com/api7/gopkg/pkg/log" - "github.com/pkg/errors" - corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/types" - gatewayv1 "sigs.k8s.io/gateway-api/apis/v1" - - v1 "github.com/apache/apisix-ingress-controller/api/dashboard/v1" - "github.com/apache/apisix-ingress-controller/internal/controller/label" - "github.com/apache/apisix-ingress-controller/internal/id" - "github.com/apache/apisix-ingress-controller/internal/provider" -) - -func (t *Translator) TranslateGateway(tctx *provider.TranslateContext, obj *gatewayv1.Gateway) (*TranslateResult, error) { - result := &TranslateResult{} - for _, listener := range obj.Spec.Listeners { - if listener.TLS != nil { - tctx.GatewayTLSConfig = append(tctx.GatewayTLSConfig, *listener.TLS) - ssl, err := t.translateSecret(tctx, listener, obj) - if err != nil { - return nil, fmt.Errorf("failed to translate secret: %w", err) - } - result.SSL = append(result.SSL, ssl...) - } - } - return result, nil -} - -func (t *Translator) translateSecret(tctx *provider.TranslateContext, listener gatewayv1.Listener, obj *gatewayv1.Gateway) ([]*v1.Ssl, error) { - if tctx.Secrets == nil { - return nil, nil - } - if listener.TLS.CertificateRefs == nil { - return nil, fmt.Errorf("no certificateRefs found in listener %s", listener.Name) - } - sslObjs := make([]*v1.Ssl, 0) - switch *listener.TLS.Mode { - case gatewayv1.TLSModeTerminate: - for _, ref := range listener.TLS.CertificateRefs { - ns := obj.GetNamespace() - if ref.Namespace != nil { - ns = string(*ref.Namespace) - } - if listener.TLS.CertificateRefs[0].Kind != nil && *listener.TLS.CertificateRefs[0].Kind == "Secret" { - sslObj := &v1.Ssl{ - Snis: []string{}, - } - name := listener.TLS.CertificateRefs[0].Name - secret := tctx.Secrets[types.NamespacedName{Namespace: ns, Name: string(ref.Name)}] - if secret == nil { - continue - } - if secret.Data == nil { - log.Error("secret data is nil", "secret", secret) - return nil, fmt.Errorf("no secret data found for %s/%s", ns, name) - } - cert, key, err := extractKeyPair(secret, true) - if err != nil { - return nil, err - } - sslObj.Cert = string(cert) - sslObj.Key = string(key) - // Dashboard doesn't allow wildcard hostname - if listener.Hostname != nil && *listener.Hostname != "" { - sslObj.Snis = append(sslObj.Snis, string(*listener.Hostname)) - } - hosts, err := extractHost(cert) - if err != nil { - return nil, err - } - sslObj.Snis = append(sslObj.Snis, hosts...) - // Note: Dashboard doesn't allow duplicate certificate across ssl objects - sslObj.ID = id.GenID(sslObj.Cert) - sslObj.Labels = label.GenLabel(obj) - sslObjs = append(sslObjs, sslObj) - } - - } - // Only supported on TLSRoute. The certificateRefs field is ignored in this mode. - case gatewayv1.TLSModePassthrough: - return sslObjs, nil - default: - return nil, fmt.Errorf("unknown TLS mode %s", *listener.TLS.Mode) - } - - return sslObjs, nil -} - -func extractHost(cert []byte) ([]string, error) { - block, _ := pem.Decode(cert) - if block == nil { - return nil, errors.New("parse certificate: not in PEM format") - } - der, err := x509.ParseCertificate(block.Bytes) - if err != nil { - return nil, errors.Wrap(err, "parse certificate") - } - return der.DNSNames, nil -} - -func extractKeyPair(s *corev1.Secret, hasPrivateKey bool) ([]byte, []byte, error) { - if _, ok := s.Data["cert"]; ok { - return extractApisixSecretKeyPair(s, hasPrivateKey) - } else if _, ok := s.Data[corev1.TLSCertKey]; ok { - return extractKubeSecretKeyPair(s, hasPrivateKey) - } else if ca, ok := s.Data[corev1.ServiceAccountRootCAKey]; ok && !hasPrivateKey { - return ca, nil, nil - } else { - return nil, nil, errors.New("unknown secret format") - } -} - -func extractApisixSecretKeyPair(s *corev1.Secret, hasPrivateKey bool) (cert []byte, key []byte, err error) { - var ok bool - cert, ok = s.Data["cert"] - if !ok { - return nil, nil, errors.New("missing cert field") - } - - if hasPrivateKey { - key, ok = s.Data["key"] - if !ok { - return nil, nil, errors.New("missing key field") - } - } - return -} - -func extractKubeSecretKeyPair(s *corev1.Secret, hasPrivateKey bool) (cert []byte, key []byte, err error) { - var ok bool - cert, ok = s.Data[corev1.TLSCertKey] - if !ok { - return nil, nil, errors.New("missing cert field") - } - - if hasPrivateKey { - key, ok = s.Data[corev1.TLSPrivateKeyKey] - if !ok { - return nil, nil, errors.New("missing key field") - } - } - return -} diff --git a/internal/provider/controlplane/translator/httproute.go b/internal/provider/controlplane/translator/httproute.go deleted file mode 100644 index 9127b09ca..000000000 --- a/internal/provider/controlplane/translator/httproute.go +++ /dev/null @@ -1,474 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package translator - -import ( - "fmt" - "strings" - - "github.com/api7/gopkg/pkg/log" - "github.com/pkg/errors" - "go.uber.org/zap" - discoveryv1 "k8s.io/api/discovery/v1" - "k8s.io/apimachinery/pkg/types" - gatewayv1 "sigs.k8s.io/gateway-api/apis/v1" - - v1 "github.com/apache/apisix-ingress-controller/api/dashboard/v1" - "github.com/apache/apisix-ingress-controller/internal/controller/label" - "github.com/apache/apisix-ingress-controller/internal/id" - "github.com/apache/apisix-ingress-controller/internal/provider" -) - -func (t *Translator) fillPluginsFromHTTPRouteFilters( - plugins v1.Plugins, - namespace string, - filters []gatewayv1.HTTPRouteFilter, - matches []gatewayv1.HTTPRouteMatch, - tctx *provider.TranslateContext, -) { - for _, filter := range filters { - switch filter.Type { - case gatewayv1.HTTPRouteFilterRequestHeaderModifier: - t.fillPluginFromHTTPRequestHeaderFilter(plugins, filter.RequestHeaderModifier) - case gatewayv1.HTTPRouteFilterRequestRedirect: - t.fillPluginFromHTTPRequestRedirectFilter(plugins, filter.RequestRedirect) - case gatewayv1.HTTPRouteFilterRequestMirror: - t.fillPluginFromHTTPRequestMirrorFilter(plugins, namespace, filter.RequestMirror) - case gatewayv1.HTTPRouteFilterURLRewrite: - t.fillPluginFromURLRewriteFilter(plugins, filter.URLRewrite, matches) - case gatewayv1.HTTPRouteFilterResponseHeaderModifier: - t.fillPluginFromHTTPResponseHeaderFilter(plugins, filter.ResponseHeaderModifier) - case gatewayv1.HTTPRouteFilterExtensionRef: - t.fillPluginFromExtensionRef(plugins, namespace, filter.ExtensionRef, tctx) - } - } -} - -func (t *Translator) fillPluginFromExtensionRef(plugins v1.Plugins, namespace string, extensionRef *gatewayv1.LocalObjectReference, tctx *provider.TranslateContext) { - if extensionRef == nil { - return - } - if extensionRef.Kind == "PluginConfig" { - pluginconfig := tctx.PluginConfigs[types.NamespacedName{ - Namespace: namespace, - Name: string(extensionRef.Name), - }] - for _, plugin := range pluginconfig.Spec.Plugins { - pluginName := plugin.Name - plugins[pluginName] = plugin.Config - log.Errorw("plugin config", zap.String("namespace", namespace), zap.Any("plugin_config", plugin)) - } - log.Errorw("plugin config", zap.String("namespace", namespace), zap.Any("plugins", plugins)) - } -} - -func (t *Translator) fillPluginFromURLRewriteFilter(plugins v1.Plugins, urlRewrite *gatewayv1.HTTPURLRewriteFilter, matches []gatewayv1.HTTPRouteMatch) { - pluginName := v1.PluginProxyRewrite - obj := plugins[pluginName] - var plugin *v1.RewriteConfig - if obj == nil { - plugin = &v1.RewriteConfig{} - plugins[pluginName] = plugin - } else { - plugin = obj.(*v1.RewriteConfig) - } - if urlRewrite.Hostname != nil { - plugin.Host = string(*urlRewrite.Hostname) - } - - if urlRewrite.Path != nil { - switch urlRewrite.Path.Type { - case gatewayv1.FullPathHTTPPathModifier: - plugin.RewriteTarget = *urlRewrite.Path.ReplaceFullPath - case gatewayv1.PrefixMatchHTTPPathModifier: - prefixPaths := make([]string, 0, len(matches)) - for _, match := range matches { - if match.Path == nil || match.Path.Type == nil || *match.Path.Type != gatewayv1.PathMatchPathPrefix { - continue - } - prefixPaths = append(prefixPaths, *match.Path.Value) - } - regexPattern := "^(" + strings.Join(prefixPaths, "|") + ")" + "/(.*)" - replaceTarget := *urlRewrite.Path.ReplacePrefixMatch - regexTarget := replaceTarget + "/$2" - - plugin.RewriteTargetRegex = []string{ - regexPattern, - regexTarget, - } - } - } -} - -func (t *Translator) fillPluginFromHTTPRequestHeaderFilter(plugins v1.Plugins, reqHeaderModifier *gatewayv1.HTTPHeaderFilter) { - pluginName := v1.PluginProxyRewrite - obj := plugins[pluginName] - var plugin *v1.RewriteConfig - if obj == nil { - plugin = &v1.RewriteConfig{ - Headers: &v1.Headers{ - Add: make(map[string]string, len(reqHeaderModifier.Add)), - Set: make(map[string]string, len(reqHeaderModifier.Set)), - Remove: make([]string, 0, len(reqHeaderModifier.Remove)), - }, - } - plugins[pluginName] = plugin - } else { - plugin = obj.(*v1.RewriteConfig) - } - for _, header := range reqHeaderModifier.Add { - val := plugin.Headers.Add[string(header.Name)] - if val != "" { - val += ", " + header.Value - } else { - val = header.Value - } - plugin.Headers.Add[string(header.Name)] = val - } - for _, header := range reqHeaderModifier.Set { - plugin.Headers.Set[string(header.Name)] = header.Value - } - plugin.Headers.Remove = append(plugin.Headers.Remove, reqHeaderModifier.Remove...) -} - -func (t *Translator) fillPluginFromHTTPResponseHeaderFilter(plugins v1.Plugins, respHeaderModifier *gatewayv1.HTTPHeaderFilter) { - pluginName := v1.PluginResponseRewrite - obj := plugins[pluginName] - var plugin *v1.ResponseRewriteConfig - if obj == nil { - plugin = &v1.ResponseRewriteConfig{ - Headers: &v1.ResponseHeaders{ - Add: make([]string, 0, len(respHeaderModifier.Add)), - Set: make(map[string]string, len(respHeaderModifier.Set)), - Remove: make([]string, 0, len(respHeaderModifier.Remove)), - }, - } - plugins[pluginName] = plugin - } else { - plugin = obj.(*v1.ResponseRewriteConfig) - } - for _, header := range respHeaderModifier.Add { - plugin.Headers.Add = append(plugin.Headers.Add, fmt.Sprintf("%s: %s", header.Name, header.Value)) - } - for _, header := range respHeaderModifier.Set { - plugin.Headers.Set[string(header.Name)] = header.Value - } - plugin.Headers.Remove = append(plugin.Headers.Remove, respHeaderModifier.Remove...) -} - -func (t *Translator) fillPluginFromHTTPRequestMirrorFilter(plugins v1.Plugins, namespace string, reqMirror *gatewayv1.HTTPRequestMirrorFilter) { - pluginName := v1.PluginProxyMirror - obj := plugins[pluginName] - - var plugin *v1.RequestMirror - if obj == nil { - plugin = &v1.RequestMirror{} - plugins[pluginName] = plugin - } else { - plugin = obj.(*v1.RequestMirror) - } - - var ( - port = 80 - ns = namespace - ) - if reqMirror.BackendRef.Port != nil { - port = int(*reqMirror.BackendRef.Port) - } - if reqMirror.BackendRef.Namespace != nil { - ns = string(*reqMirror.BackendRef.Namespace) - } - - host := fmt.Sprintf("http://%s.%s.svc.cluster.local:%d", reqMirror.BackendRef.Name, ns, port) - - plugin.Host = host -} - -func (t *Translator) fillPluginFromHTTPRequestRedirectFilter(plugins v1.Plugins, reqRedirect *gatewayv1.HTTPRequestRedirectFilter) { - pluginName := v1.PluginRedirect - obj := plugins[pluginName] - - var plugin *v1.RedirectConfig - if obj == nil { - plugin = &v1.RedirectConfig{} - plugins[pluginName] = plugin - } else { - plugin = obj.(*v1.RedirectConfig) - } - var uri string - - code := 302 - if reqRedirect.StatusCode != nil { - code = *reqRedirect.StatusCode - } - - hostname := "$host" - if reqRedirect.Hostname != nil { - hostname = string(*reqRedirect.Hostname) - } - - scheme := "$scheme" - if reqRedirect.Scheme != nil { - scheme = *reqRedirect.Scheme - } - - if reqRedirect.Port != nil { - uri = fmt.Sprintf("%s://%s:%d$request_uri", scheme, hostname, int(*reqRedirect.Port)) - } else { - uri = fmt.Sprintf("%s://%s$request_uri", scheme, hostname) - } - plugin.RetCode = code - plugin.URI = uri -} - -func (t *Translator) translateEndpointSlice(endpointSlices []discoveryv1.EndpointSlice) v1.UpstreamNodes { - var nodes v1.UpstreamNodes - if len(endpointSlices) == 0 { - return nodes - } - for _, endpointSlice := range endpointSlices { - for _, port := range endpointSlice.Ports { - for _, endpoint := range endpointSlice.Endpoints { - for _, addr := range endpoint.Addresses { - node := v1.UpstreamNode{ - Host: addr, - Port: int(*port.Port), - Weight: 1, - } - nodes = append(nodes, node) - } - } - } - } - - return nodes -} - -func (t *Translator) translateBackendRef(tctx *provider.TranslateContext, ref gatewayv1.BackendRef) *v1.Upstream { - upstream := v1.NewDefaultUpstream() - endpointSlices := tctx.EndpointSlices[types.NamespacedName{ - Namespace: string(*ref.Namespace), - Name: string(ref.Name), - }] - - upstream.Nodes = t.translateEndpointSlice(endpointSlices) - return upstream -} - -func (t *Translator) TranslateHTTPRoute(tctx *provider.TranslateContext, httpRoute *gatewayv1.HTTPRoute) (*TranslateResult, error) { - result := &TranslateResult{} - - hosts := make([]string, 0, len(httpRoute.Spec.Hostnames)) - for _, hostname := range httpRoute.Spec.Hostnames { - hosts = append(hosts, string(hostname)) - } - - rules := httpRoute.Spec.Rules - - for i, rule := range rules { - - var weightedUpstreams []v1.TrafficSplitConfigRuleWeightedUpstream - upstreams := []*v1.Upstream{} - for _, backend := range rule.BackendRefs { - if backend.Namespace == nil { - namespace := gatewayv1.Namespace(httpRoute.Namespace) - backend.Namespace = &namespace - } - upstream := t.translateBackendRef(tctx, backend.BackendRef) - upstream.Labels["name"] = string(backend.Name) - upstream.Labels["namespace"] = string(*backend.Namespace) - upstreams = append(upstreams, upstream) - if len(upstream.Nodes) == 0 { - upstream.Nodes = v1.UpstreamNodes{ - { - Host: "0.0.0.0", - Port: 80, - Weight: 100, - }, - } - } - - weight := 100 - if backend.Weight != nil { - weight = int(*backend.Weight) - } - weightedUpstreams = append(weightedUpstreams, v1.TrafficSplitConfigRuleWeightedUpstream{ - Upstream: upstream, - Weight: weight, - }) - } - - if len(upstreams) == 0 { - upstream := v1.NewDefaultUpstream() - upstream.Nodes = v1.UpstreamNodes{ - { - Host: "0.0.0.0", - Port: 80, - Weight: 100, - }, - } - upstreams = append(upstreams, upstream) - } - - service := v1.NewDefaultService() - service.Upstream = upstreams[0] - if len(weightedUpstreams) > 1 { - weightedUpstreams[0].Upstream = nil - service.Plugins["traffic-split"] = &v1.TrafficSplitConfig{ - Rules: []v1.TrafficSplitConfigRule{ - { - WeightedUpstreams: weightedUpstreams, - }, - }, - } - } - - service.Name = v1.ComposeServiceNameWithRule(httpRoute.Namespace, httpRoute.Name, fmt.Sprintf("%d", i)) - service.ID = id.GenID(service.Name) - service.Labels = label.GenLabel(httpRoute) - service.Hosts = hosts - t.fillPluginsFromHTTPRouteFilters(service.Plugins, httpRoute.GetNamespace(), rule.Filters, rule.Matches, tctx) - - result.Services = append(result.Services, service) - - matches := rule.Matches - if len(matches) == 0 { - defaultType := gatewayv1.PathMatchPathPrefix - defaultValue := "/" - matches = []gatewayv1.HTTPRouteMatch{ - { - Path: &gatewayv1.HTTPPathMatch{ - Type: &defaultType, - Value: &defaultValue, - }, - }, - } - } - - for j, match := range matches { - route, err := t.translateGatewayHTTPRouteMatch(&match) - if err != nil { - return nil, err - } - - name := v1.ComposeRouteName(httpRoute.Namespace, httpRoute.Name, fmt.Sprintf("%d-%d", i, j)) - route.Name = name - route.ID = id.GenID(name) - route.Labels = label.GenLabel(httpRoute) - route.ServiceID = service.ID - result.Routes = append(result.Routes, route) - } - } - - return result, nil -} - -func (t *Translator) translateGatewayHTTPRouteMatch(match *gatewayv1.HTTPRouteMatch) (*v1.Route, error) { - route := v1.NewDefaultRoute() - - if match.Path != nil { - switch *match.Path.Type { - case gatewayv1.PathMatchExact: - route.Paths = []string{*match.Path.Value} - case gatewayv1.PathMatchPathPrefix: - route.Paths = []string{*match.Path.Value + "*"} - case gatewayv1.PathMatchRegularExpression: - var this []v1.StringOrSlice - this = append(this, v1.StringOrSlice{ - StrVal: "uri", - }) - this = append(this, v1.StringOrSlice{ - StrVal: "~~", - }) - this = append(this, v1.StringOrSlice{ - StrVal: *match.Path.Value, - }) - - route.Vars = append(route.Vars, this) - default: - return nil, errors.New("unknown path match type " + string(*match.Path.Type)) - } - } - - if len(match.Headers) > 0 { - for _, header := range match.Headers { - name := strings.ToLower(string(header.Name)) - name = strings.ReplaceAll(name, "-", "_") - - var this []v1.StringOrSlice - this = append(this, v1.StringOrSlice{ - StrVal: "http_" + name, - }) - - switch *header.Type { - case gatewayv1.HeaderMatchExact: - this = append(this, v1.StringOrSlice{ - StrVal: "==", - }) - case gatewayv1.HeaderMatchRegularExpression: - this = append(this, v1.StringOrSlice{ - StrVal: "~~", - }) - default: - return nil, errors.New("unknown header match type " + string(*header.Type)) - } - - this = append(this, v1.StringOrSlice{ - StrVal: header.Value, - }) - - route.Vars = append(route.Vars, this) - } - } - - if len(match.QueryParams) > 0 { - for _, query := range match.QueryParams { - var this []v1.StringOrSlice - this = append(this, v1.StringOrSlice{ - StrVal: "arg_" + strings.ToLower(fmt.Sprintf("%v", query.Name)), - }) - - switch *query.Type { - case gatewayv1.QueryParamMatchExact: - this = append(this, v1.StringOrSlice{ - StrVal: "==", - }) - case gatewayv1.QueryParamMatchRegularExpression: - this = append(this, v1.StringOrSlice{ - StrVal: "~~", - }) - default: - return nil, errors.New("unknown query match type " + string(*query.Type)) - } - - this = append(this, v1.StringOrSlice{ - StrVal: query.Value, - }) - - route.Vars = append(route.Vars, this) - } - } - - if match.Method != nil { - route.Methods = []string{ - string(*match.Method), - } - } - - return route, nil -} diff --git a/internal/provider/controlplane/translator/translator.go b/internal/provider/controlplane/translator/translator.go deleted file mode 100644 index 8a817ccda..000000000 --- a/internal/provider/controlplane/translator/translator.go +++ /dev/null @@ -1,55 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package translator - -import ( - "github.com/go-logr/logr" - corev1 "k8s.io/api/core/v1" - discoveryv1 "k8s.io/api/discovery/v1" - "k8s.io/apimachinery/pkg/types" - gatewayv1 "sigs.k8s.io/gateway-api/apis/v1" - - v1 "github.com/apache/apisix-ingress-controller/api/dashboard/v1" - "github.com/apache/apisix-ingress-controller/api/v1alpha1" -) - -type Translator struct { - Log logr.Logger -} - -type TranslateContext struct { - BackendRefs []gatewayv1.BackendRef - GatewayTLSConfig []gatewayv1.GatewayTLSConfig - EndpointSlices map[types.NamespacedName][]discoveryv1.EndpointSlice - Secrets map[types.NamespacedName]*corev1.Secret - PluginConfigs map[types.NamespacedName]*v1alpha1.PluginConfig -} - -type TranslateResult struct { - Routes []*v1.Route - Services []*v1.Service - SSL []*v1.Ssl -} - -func NewDefaultTranslateContext() *TranslateContext { - return &TranslateContext{ - EndpointSlices: make(map[types.NamespacedName][]discoveryv1.EndpointSlice), - Secrets: make(map[types.NamespacedName]*corev1.Secret), - PluginConfigs: make(map[types.NamespacedName]*v1alpha1.PluginConfig), - } -} diff --git a/internal/provider/provider.go b/internal/provider/provider.go index b06899fbe..d9b6cbfa4 100644 --- a/internal/provider/provider.go +++ b/internal/provider/provider.go @@ -37,6 +37,7 @@ type Provider interface { Delete(context.Context, client.Object) error Sync(context.Context) error Start(context.Context) error + NeedLeaderElection() bool } type TranslateContext struct { diff --git a/internal/types/k8s.go b/internal/types/k8s.go index 63bc9190c..d83158fe0 100644 --- a/internal/types/k8s.go +++ b/internal/types/k8s.go @@ -19,7 +19,8 @@ package types import ( corev1 "k8s.io/api/core/v1" - v1 "k8s.io/api/networking/v1" + netv1 "k8s.io/api/networking/v1" + "k8s.io/apimachinery/pkg/runtime/schema" gatewayv1 "sigs.k8s.io/gateway-api/apis/v1" "github.com/apache/apisix-ingress-controller/api/v1alpha1" @@ -57,9 +58,9 @@ func KindOf(obj any) string { return KindHTTPRoute case *gatewayv1.GatewayClass: return KindGatewayClass - case *v1.Ingress: + case *netv1.Ingress: return KindIngress - case *v1.IngressClass: + case *netv1.IngressClass: return KindIngressClass case *corev1.Secret: return KindSecret @@ -91,3 +92,77 @@ func KindOf(obj any) string { return "Unknown" } } + +func GvkOf(obj any) schema.GroupVersionKind { + kind := KindOf(obj) + switch obj.(type) { + case *gatewayv1.Gateway, *gatewayv1.HTTPRoute, *gatewayv1.GatewayClass: + return gatewayv1.SchemeGroupVersion.WithKind(kind) + case *netv1.Ingress, *netv1.IngressClass: + return netv1.SchemeGroupVersion.WithKind(kind) + case *corev1.Secret, *corev1.Service: + return corev1.SchemeGroupVersion.WithKind(kind) + case *v2.ApisixRoute: + return schema.GroupVersionKind{ + Group: "apisix.apache.org", + Version: "v2", + Kind: KindApisixRoute, + } + case *v2.ApisixGlobalRule: + return schema.GroupVersionKind{ + Group: "apisix.apache.org", + Version: "v2", + Kind: KindApisixGlobalRule, + } + case *v2.ApisixPluginConfig: + return schema.GroupVersionKind{ + Group: "apisix.apache.org", + Version: "v2", + Kind: KindApisixPluginConfig, + } + case *v2.ApisixTls: + return schema.GroupVersionKind{ + Group: "apisix.apache.org", + Version: "v2", + Kind: KindApisixTls, + } + case *v2.ApisixConsumer: + return schema.GroupVersionKind{ + Group: "apisix.apache.org", + Version: "v2", + Kind: KindApisixConsumer, + } + case *v1alpha1.HTTPRoutePolicy: + return schema.GroupVersionKind{ + Group: "apisix.apache.org", + Version: "v1alpha1", + Kind: KindHTTPRoutePolicy, + } + case *v1alpha1.BackendTrafficPolicy: + return schema.GroupVersionKind{ + Group: "apisix.apache.org", + Version: "v1alpha1", + Kind: KindBackendTrafficPolicy, + } + case *v1alpha1.GatewayProxy: + return schema.GroupVersionKind{ + Group: "apisix.apache.org", + Version: "v1alpha1", + Kind: KindGatewayProxy, + } + case *v1alpha1.Consumer: + return schema.GroupVersionKind{ + Group: "apisix.apache.org", + Version: "v1alpha1", + Kind: KindConsumer, + } + case *v1alpha1.PluginConfig: + return schema.GroupVersionKind{ + Group: "apisix.apache.org", + Version: "v1alpha1", + Kind: KindPluginConfig, + } + default: + return schema.GroupVersionKind{} + } +} diff --git a/test/e2e/crds/v1alpha1/consumer.go b/test/e2e/crds/v1alpha1/consumer.go index d2db2a5b7..bb8f92d39 100644 --- a/test/e2e/crds/v1alpha1/consumer.go +++ b/test/e2e/crds/v1alpha1/consumer.go @@ -23,6 +23,7 @@ import ( . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" + "github.com/apache/apisix-ingress-controller/internal/provider/adc" "github.com/apache/apisix-ingress-controller/test/e2e/scaffold" ) @@ -579,4 +580,93 @@ spec: }) }) }) + + Context("Test Consumer sync during startup", func() { + var consumer1 = ` +apiVersion: apisix.apache.org/v1alpha1 +kind: Consumer +metadata: + name: consumer-sample +spec: + gatewayRef: + name: apisix + credentials: + - type: key-auth + name: key-auth-sample + config: + key: sample-key +` + var consumer2 = ` +apiVersion: apisix.apache.org/v1alpha1 +kind: Consumer +metadata: + name: consumer-unused +spec: + gatewayRef: + name: apisix-non-existent + credentials: + - type: key-auth + name: key-auth-sample + config: + key: sample-key2 +` + + BeforeEach(func() { + s.ApplyDefaultGatewayResource(defaultGatewayProxy, defaultGatewayClass, defaultGateway, defaultHTTPRoute) + }) + + It("Should sync Consumer during startup", func() { + if s.Deployer.Name() == adc.BackendModeAPI7EE { + Skip("don't need to run on api7ee mode") + } + Expect(s.CreateResourceFromString(consumer2)).NotTo(HaveOccurred(), "creating unused consumer") + s.ResourceApplied("Consumer", "consumer-sample", consumer1, 1) + + s.RequestAssert(&scaffold.RequestAssert{ + Method: "GET", + Path: "/get", + Host: "httpbin.org", + Headers: map[string]string{ + "apikey": "sample-key", + }, + Check: scaffold.WithExpectedStatus(200), + }) + + s.RequestAssert(&scaffold.RequestAssert{ + Method: "GET", + Path: "/get", + Host: "httpbin.org", + Headers: map[string]string{ + "apikey": "sample-key2", + }, + Check: scaffold.WithExpectedStatus(401), + }) + + By("restarting the controller and dataplane") + s.Deployer.ScaleIngress(0) + s.Deployer.ScaleDataplane(0) + s.Deployer.ScaleDataplane(1) + s.Deployer.ScaleIngress(1) + + s.RequestAssert(&scaffold.RequestAssert{ + Method: "GET", + Path: "/get", + Host: "httpbin.org", + Headers: map[string]string{ + "apikey": "sample-key", + }, + Check: scaffold.WithExpectedStatus(200), + }) + + s.RequestAssert(&scaffold.RequestAssert{ + Method: "GET", + Path: "/get", + Host: "httpbin.org", + Headers: map[string]string{ + "apikey": "sample-key2", + }, + Check: scaffold.WithExpectedStatus(401), + }) + }) + }) }) diff --git a/test/e2e/crds/v2/route.go b/test/e2e/crds/v2/route.go index 70b6191ac..0eefa518e 100644 --- a/test/e2e/crds/v2/route.go +++ b/test/e2e/crds/v2/route.go @@ -31,6 +31,7 @@ import ( "k8s.io/apimachinery/pkg/util/wait" apiv2 "github.com/apache/apisix-ingress-controller/api/v2" + "github.com/apache/apisix-ingress-controller/internal/provider/adc" "github.com/apache/apisix-ingress-controller/test/e2e/framework" "github.com/apache/apisix-ingress-controller/test/e2e/scaffold" ) @@ -653,4 +654,114 @@ spec: Expect(err).ShouldNot(HaveOccurred(), "check apisixupstream is referenced") }) }) + + Context("Test ApisixRoute sync during startup", func() { + const route = ` +apiVersion: apisix.apache.org/v2 +kind: ApisixRoute +metadata: + name: default +spec: + ingressClassName: apisix + http: + - name: rule0 + match: + hosts: + - httpbin + paths: + - /get + backends: + - serviceName: httpbin-service-e2e-test + servicePort: 80 +` + + const route2 = ` +apiVersion: apisix.apache.org/v2 +kind: ApisixRoute +metadata: + name: route2 +spec: + ingressClassName: apisix-nonexistent + http: + - name: rule0 + match: + hosts: + - httpbin2 + paths: + - /get + backends: + - serviceName: httpbin-service-e2e-test + servicePort: 80 +` + const route3 = ` +apiVersion: apisix.apache.org/v2 +kind: ApisixRoute +metadata: + name: route3 +spec: + http: + - name: rule0 + match: + hosts: + - httpbin3 + paths: + - /get + backends: + - serviceName: httpbin-service-e2e-test + servicePort: 80 +` + It("Should sync ApisixRoute during startup", func() { + if s.Deployer.Name() == adc.BackendModeAPI7EE { + Skip("don't need to run on api7ee mode") + } + By("apply ApisixRoute") + Expect(s.CreateResourceFromString(route2)).ShouldNot(HaveOccurred(), "apply ApisixRoute with nonexistent ingressClassName") + Expect(s.CreateResourceFromString(route3)).ShouldNot(HaveOccurred(), "apply ApisixRoute without ingressClassName") + applier.MustApplyAPIv2(types.NamespacedName{Namespace: s.Namespace(), Name: "default"}, &apiv2.ApisixRoute{}, route) + + s.RequestAssert(&scaffold.RequestAssert{ + Method: "GET", + Path: "/get", + Host: "httpbin", + Check: scaffold.WithExpectedStatus(http.StatusOK), + }) + s.RequestAssert(&scaffold.RequestAssert{ + Method: "GET", + Path: "/get", + Host: "httpbin2", + Check: scaffold.WithExpectedStatus(http.StatusNotFound), + }) + s.RequestAssert(&scaffold.RequestAssert{ + Method: "GET", + Path: "/get", + Host: "httpbin3", + Check: scaffold.WithExpectedStatus(http.StatusNotFound), + }) + + By("restart controller and dataplane") + s.Deployer.ScaleIngress(0) + s.Deployer.ScaleDataplane(0) + s.Deployer.ScaleDataplane(1) + s.Deployer.ScaleIngress(1) + + s.RequestAssert(&scaffold.RequestAssert{ + Method: "GET", + Path: "/get", + Host: "httpbin", + Check: scaffold.WithExpectedStatus(http.StatusOK), + }) + s.RequestAssert(&scaffold.RequestAssert{ + Method: "GET", + Path: "/get", + Host: "httpbin2", + Check: scaffold.WithExpectedStatus(http.StatusNotFound), + }) + s.RequestAssert(&scaffold.RequestAssert{ + Method: "GET", + Path: "/get", + Host: "httpbin3", + Check: scaffold.WithExpectedStatus(http.StatusNotFound), + }) + }) + }) }) diff --git a/test/e2e/gatewayapi/httproute.go b/test/e2e/gatewayapi/httproute.go index 11e990a96..42f06844b 100644 --- a/test/e2e/gatewayapi/httproute.go +++ b/test/e2e/gatewayapi/httproute.go @@ -33,6 +33,7 @@ import ( "sigs.k8s.io/gateway-api/apis/v1alpha2" "github.com/apache/apisix-ingress-controller/api/v1alpha1" + "github.com/apache/apisix-ingress-controller/internal/provider/adc" "github.com/apache/apisix-ingress-controller/test/e2e/framework" "github.com/apache/apisix-ingress-controller/test/e2e/scaffold" ) @@ -1919,4 +1920,88 @@ spec: } }) }) + + Context("Test HTTPRoute sync during startup", func() { + BeforeEach(beforeEachHTTP) + var route = ` +apiVersion: gateway.networking.k8s.io/v1 +kind: HTTPRoute +metadata: + name: httpbin +spec: + parentRefs: + - name: apisix + hostnames: + - httpbin + rules: + - matches: + - path: + type: Exact + value: /get + backendRefs: + - name: httpbin-service-e2e-test + port: 80 +` + + var route2 = ` +apiVersion: gateway.networking.k8s.io/v1 +kind: HTTPRoute +metadata: + name: httpbin2 +spec: + parentRefs: + - name: apisix-nonexistent + hostnames: + - httpbin2 + rules: + - matches: + - path: + type: Exact + value: /get + backendRefs: + - name: httpbin-service-e2e-test + port: 80 +` + It("Should sync ApisixRoute during startup", func() { + if s.Deployer.Name() == adc.BackendModeAPI7EE { + Skip("don't need to run on api7ee mode") + } + By("apply ApisixRoute") + Expect(s.CreateResourceFromString(route2)).ShouldNot(HaveOccurred(), "applying HTTPRoute with non-existent parent") + s.ResourceApplied("HTTPRoute", "httpbin", route, 1) + + s.RequestAssert(&scaffold.RequestAssert{ + Method: "GET", + Path: "/get", + Host: "httpbin", + Check: scaffold.WithExpectedStatus(http.StatusOK), + }) + s.RequestAssert(&scaffold.RequestAssert{ + Method: "GET", + Path: "/get", + Host: "httpbin2", + Check: scaffold.WithExpectedStatus(http.StatusNotFound), + }) + + By("restart controller and dataplane") + s.Deployer.ScaleIngress(0) + s.Deployer.ScaleDataplane(0) + s.Deployer.ScaleDataplane(1) + s.Deployer.ScaleIngress(1) + + s.RequestAssert(&scaffold.RequestAssert{ + Method: "GET", + Path: "/get", + Host: "httpbin", + Check: scaffold.WithExpectedStatus(http.StatusOK), + }) + s.RequestAssert(&scaffold.RequestAssert{ + Method: "GET", + Path: "/get", + Host: "httpbin2", + Check: scaffold.WithExpectedStatus(http.StatusNotFound), + }) + }) + + }) })