diff --git a/internal/controller/httproute_controller.go b/internal/controller/httproute_controller.go index bf390f0b0..99fd916ca 100644 --- a/internal/controller/httproute_controller.go +++ b/internal/controller/httproute_controller.go @@ -75,49 +75,7 @@ func (r *HTTPRouteReconciler) SetupWithManager(mgr ctrl.Manager) error { Watches(&v1alpha1.BackendTrafficPolicy{}, handler.EnqueueRequestsFromMapFunc(r.listHTTPRoutesForBackendTrafficPolicy), builder.WithPredicates( - predicate.Funcs{ - GenericFunc: func(e event.GenericEvent) bool { - return false - }, - DeleteFunc: func(e event.DeleteEvent) bool { - return true - }, - CreateFunc: func(e event.CreateEvent) bool { - return true - }, - UpdateFunc: func(e event.UpdateEvent) bool { - oldObj, ok := e.ObjectOld.(*v1alpha1.BackendTrafficPolicy) - newObj, ok2 := e.ObjectNew.(*v1alpha1.BackendTrafficPolicy) - if !ok || !ok2 { - return false - } - oldRefs := oldObj.Spec.TargetRefs - newRefs := newObj.Spec.TargetRefs - - oldRefMap := make(map[string]v1alpha1.BackendPolicyTargetReferenceWithSectionName) - for _, ref := range oldRefs { - key := fmt.Sprintf("%s/%s/%s", ref.Group, ref.Kind, ref.Name) - oldRefMap[key] = ref - } - - for _, ref := range newRefs { - key := fmt.Sprintf("%s/%s/%s", ref.Group, ref.Kind, ref.Name) - delete(oldRefMap, key) - } - if len(oldRefMap) > 0 { - targetRefs := make([]v1alpha1.BackendPolicyTargetReferenceWithSectionName, 0, len(oldRefs)) - for _, ref := range oldRefMap { - targetRefs = append(targetRefs, ref) - } - dump := oldObj.DeepCopy() - dump.Spec.TargetRefs = targetRefs - r.genericEvent <- event.GenericEvent{ - Object: dump, - } - } - return true - }, - }, + BackendTrafficPolicyPredicateFunc(r.genericEvent), ), ). Watches(&v1alpha1.HTTPRoutePolicy{}, @@ -434,32 +392,7 @@ func (r *HTTPRouteReconciler) listHTTPRouteForGenericEvent(ctx context.Context, switch v := obj.(type) { case *v1alpha1.BackendTrafficPolicy: - httprouteAll := []gatewayv1.HTTPRoute{} - for _, ref := range v.Spec.TargetRefs { - httprouteList := &gatewayv1.HTTPRouteList{} - if err := r.List(ctx, httprouteList, client.MatchingFields{ - indexer.ServiceIndexRef: indexer.GenIndexKey(v.GetNamespace(), string(ref.Name)), - }); err != nil { - r.Log.Error(err, "failed to list HTTPRoutes for BackendTrafficPolicy", "namespace", v.GetNamespace(), "ref", ref.Name) - return nil - } - httprouteAll = append(httprouteAll, httprouteList.Items...) - } - for _, hr := range httprouteAll { - key := types.NamespacedName{ - Namespace: hr.Namespace, - Name: hr.Name, - } - if _, ok := namespacedNameMap[key]; !ok { - namespacedNameMap[key] = struct{}{} - requests = append(requests, reconcile.Request{ - NamespacedName: client.ObjectKey{ - Namespace: hr.Namespace, - Name: hr.Name, - }, - }) - } - } + requests = r.listHTTPRoutesForBackendTrafficPolicy(ctx, v) case *v1alpha1.HTTPRoutePolicy: for _, ref := range v.Spec.TargetRefs { namespacedName := types.NamespacedName{Namespace: v.GetNamespace(), Name: string(ref.Name)} diff --git a/internal/controller/ingress_controller.go b/internal/controller/ingress_controller.go index 325529694..68e1e44ec 100644 --- a/internal/controller/ingress_controller.go +++ b/internal/controller/ingress_controller.go @@ -19,12 +19,16 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" + "k8s.io/utils/ptr" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/controller-runtime/pkg/source" + gatewayv1 "sigs.k8s.io/gateway-api/apis/v1" ) // IngressReconciler reconciles a Ingress object. @@ -33,11 +37,14 @@ type IngressReconciler struct { //nolint:revive Scheme *runtime.Scheme Log logr.Logger - Provider provider.Provider + Provider provider.Provider + genericEvent chan event.GenericEvent } // SetupWithManager sets up the controller with the Manager. func (r *IngressReconciler) SetupWithManager(mgr ctrl.Manager) error { + r.genericEvent = make(chan event.GenericEvent, 100) + return ctrl.NewControllerManagedBy(mgr). For(&networkingv1.Ingress{}, builder.WithPredicates( @@ -65,6 +72,18 @@ func (r *IngressReconciler) SetupWithManager(mgr ctrl.Manager) error { &corev1.Secret{}, handler.EnqueueRequestsFromMapFunc(r.listIngressesBySecret), ). + Watches(&v1alpha1.BackendTrafficPolicy{}, + handler.EnqueueRequestsFromMapFunc(r.listIngressForBackendTrafficPolicy), + builder.WithPredicates( + BackendTrafficPolicyPredicateFunc(r.genericEvent), + ), + ). + WatchesRawSource( + source.Channel( + r.genericEvent, + handler.EnqueueRequestsFromMapFunc(r.listIngressForGenericEvent), + ), + ). Complete(r) } @@ -103,6 +122,12 @@ func (r *IngressReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct return ctrl.Result{}, err } + tctx.RouteParentRefs = append(tctx.RouteParentRefs, gatewayv1.ParentReference{ + Group: ptr.To(gatewayv1.Group(ingressClass.GroupVersionKind().Group)), + Kind: ptr.To(gatewayv1.Kind("IngressClass")), + Name: gatewayv1.ObjectName(ingressClass.Name), + }) + // process IngressClass parameters if they reference GatewayProxy if err := r.processIngressClassParameters(ctx, tctx, ingress, ingressClass); err != nil { r.Log.Error(err, "failed to process IngressClass parameters", "ingressClass", ingressClass.Name) @@ -121,12 +146,17 @@ func (r *IngressReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct return ctrl.Result{}, err } + ProcessBackendTrafficPolicy(r.Client, r.Log, tctx) + // update the ingress resources if err := r.Provider.Update(ctx, tctx, ingress); err != nil { r.Log.Error(err, "failed to update ingress resources", "ingress", ingress.Name) return ctrl.Result{}, err } + // update the status of related resources + UpdateStatus(r.Client, r.Log, tctx) + // update the ingress status if err := r.updateStatus(ctx, tctx, ingress, ingressClass); err != nil { r.Log.Error(err, "failed to update ingress status", "ingress", ingress.Name) @@ -341,6 +371,59 @@ func (r *IngressReconciler) listIngressesBySecret(ctx context.Context, obj clien return requests } +func (r *IngressReconciler) listIngressForBackendTrafficPolicy(ctx context.Context, obj client.Object) (requests []reconcile.Request) { + v, ok := obj.(*v1alpha1.BackendTrafficPolicy) + if !ok { + r.Log.Error(fmt.Errorf("unexpected object type"), "failed to convert object to BackendTrafficPolicy") + return nil + } + var namespacedNameMap = make(map[types.NamespacedName]struct{}) + ingresses := []networkingv1.Ingress{} + for _, ref := range v.Spec.TargetRefs { + service := &corev1.Service{} + if err := r.Get(ctx, client.ObjectKey{ + Namespace: v.Namespace, + Name: string(ref.Name), + }, service); err != nil { + if client.IgnoreNotFound(err) != nil { + r.Log.Error(err, "failed to get service", "namespace", v.Namespace, "name", ref.Name) + } + continue + } + ingressList := &networkingv1.IngressList{} + if err := r.List(ctx, ingressList, client.MatchingFields{ + indexer.ServiceIndexRef: indexer.GenIndexKey(v.GetNamespace(), string(ref.Name)), + }); err != nil { + r.Log.Error(err, "failed to list HTTPRoutes for BackendTrafficPolicy", "namespace", v.GetNamespace(), "ref", ref.Name) + return nil + } + ingresses = append(ingresses, ingressList.Items...) + } + for _, ins := range ingresses { + key := types.NamespacedName{ + Namespace: ins.Namespace, + Name: ins.Name, + } + if _, ok := namespacedNameMap[key]; !ok { + namespacedNameMap[key] = struct{}{} + requests = append(requests, reconcile.Request{ + NamespacedName: key, + }) + } + } + return requests +} + +func (r *IngressReconciler) listIngressForGenericEvent(ctx context.Context, obj client.Object) (requests []reconcile.Request) { + switch v := obj.(type) { + case *v1alpha1.BackendTrafficPolicy: + requests = r.listIngressForBackendTrafficPolicy(ctx, v) + default: + r.Log.Error(fmt.Errorf("unexpected object type"), "failed to convert object to BackendTrafficPolicy") + } + return requests +} + // processTLS process the TLS configuration of the ingress func (r *IngressReconciler) processTLS(tctx *provider.TranslateContext, ingress *networkingv1.Ingress) error { for _, tls := range ingress.Spec.TLS { diff --git a/internal/controller/policies.go b/internal/controller/policies.go index ac6de8317..2ef04a3b4 100644 --- a/internal/controller/policies.go +++ b/internal/controller/policies.go @@ -14,6 +14,8 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/predicate" "github.com/api7/api7-ingress-controller/api/v1alpha1" "github.com/api7/api7-ingress-controller/internal/controller/config" @@ -30,6 +32,52 @@ func (p PolicyTargetKey) String() string { return p.NsName.String() + "/" + p.GroupKind.String() } +func BackendTrafficPolicyPredicateFunc(channel chan event.GenericEvent) predicate.Predicate { + return predicate.Funcs{ + GenericFunc: func(e event.GenericEvent) bool { + return false + }, + DeleteFunc: func(e event.DeleteEvent) bool { + return true + }, + CreateFunc: func(e event.CreateEvent) bool { + return true + }, + UpdateFunc: func(e event.UpdateEvent) bool { + oldObj, ok := e.ObjectOld.(*v1alpha1.BackendTrafficPolicy) + newObj, ok2 := e.ObjectNew.(*v1alpha1.BackendTrafficPolicy) + if !ok || !ok2 { + return false + } + oldRefs := oldObj.Spec.TargetRefs + newRefs := newObj.Spec.TargetRefs + + oldRefMap := make(map[string]v1alpha1.BackendPolicyTargetReferenceWithSectionName) + for _, ref := range oldRefs { + key := fmt.Sprintf("%s/%s/%s", ref.Group, ref.Kind, ref.Name) + oldRefMap[key] = ref + } + + for _, ref := range newRefs { + key := fmt.Sprintf("%s/%s/%s", ref.Group, ref.Kind, ref.Name) + delete(oldRefMap, key) + } + if len(oldRefMap) > 0 { + targetRefs := make([]v1alpha1.BackendPolicyTargetReferenceWithSectionName, 0, len(oldRefs)) + for _, ref := range oldRefMap { + targetRefs = append(targetRefs, ref) + } + dump := oldObj.DeepCopy() + dump.Spec.TargetRefs = targetRefs + channel <- event.GenericEvent{ + Object: dump, + } + } + return true + }, + } +} + func ProcessBackendTrafficPolicy( c client.Client, log logr.Logger, @@ -131,10 +179,7 @@ func SetAncestorStatus(status *v1alpha1.PolicyStatus, ancestorStatus gatewayv1al } condition := ancestorStatus.Conditions[0] for _, c := range status.Ancestors { - if c.AncestorRef.Name == ancestorStatus.AncestorRef.Name && - ptr.Equal(c.AncestorRef.Namespace, ancestorStatus.AncestorRef.Namespace) && - ptr.Equal(c.AncestorRef.Group, ancestorStatus.AncestorRef.Group) && - ptr.Equal(c.AncestorRef.Kind, ancestorStatus.AncestorRef.Kind) && + if parentRefValueEqual(ancestorStatus.AncestorRef, c.AncestorRef) && c.ControllerName == ancestorStatus.ControllerName { if !VerifyConditions(&c.Conditions, condition) { return false diff --git a/internal/provider/adc/translator/ingress.go b/internal/provider/adc/translator/ingress.go index fa0ca853c..72a263f51 100644 --- a/internal/provider/adc/translator/ingress.go +++ b/internal/provider/adc/translator/ingress.go @@ -4,14 +4,15 @@ import ( "fmt" "strings" - adctypes "github.com/api7/api7-ingress-controller/api/adc" - "github.com/api7/api7-ingress-controller/internal/controller/label" - "github.com/api7/api7-ingress-controller/internal/id" - "github.com/api7/api7-ingress-controller/internal/provider" corev1 "k8s.io/api/core/v1" discoveryv1 "k8s.io/api/discovery/v1" networkingv1 "k8s.io/api/networking/v1" "k8s.io/apimachinery/pkg/types" + + adctypes "github.com/api7/api7-ingress-controller/api/adc" + "github.com/api7/api7-ingress-controller/internal/controller/label" + "github.com/api7/api7-ingress-controller/internal/id" + "github.com/api7/api7-ingress-controller/internal/provider" ) func (t *Translator) translateIngressTLS(ingressTLS *networkingv1.IngressTLS, secret *corev1.Secret, labels map[string]string) (*adctypes.SSL, error) { @@ -104,12 +105,15 @@ func (t *Translator) TranslateIngress(tctx *provider.TranslateContext, obj *netw // get the EndpointSlice of the backend service backendService := path.Backend.Service - endpointSlices := tctx.EndpointSlices[types.NamespacedName{ - Namespace: obj.Namespace, - Name: backendService.Name, - }] - - t.attachBackendTrafficPolicyToUpstream(nil, upstream) + var endpointSlices []discoveryv1.EndpointSlice + if backendService != nil { + endpointSlices = tctx.EndpointSlices[types.NamespacedName{ + Namespace: obj.Namespace, + Name: backendService.Name, + }] + backendRef := convertBackendRef(obj.Namespace, backendService.Name, "Service") + t.AttachBackendTrafficPolicyToUpstream(backendRef, tctx.BackendTrafficPolicies, upstream) + } // get the service port configuration var servicePort int32 = 0 diff --git a/internal/provider/adc/translator/policies.go b/internal/provider/adc/translator/policies.go index a71e76bd1..20f0202e2 100644 --- a/internal/provider/adc/translator/policies.go +++ b/internal/provider/adc/translator/policies.go @@ -4,20 +4,31 @@ import ( "github.com/api7/api7-ingress-controller/api/adc" "github.com/api7/api7-ingress-controller/api/v1alpha1" "k8s.io/apimachinery/pkg/types" + "k8s.io/utils/ptr" gatewayv1 "sigs.k8s.io/gateway-api/apis/v1" adctypes "github.com/api7/api7-ingress-controller/api/adc" ) +func convertBackendRef(namespace, name, kind string) gatewayv1.BackendRef { + backendRef := gatewayv1.BackendRef{} + backendRef.Name = gatewayv1.ObjectName(name) + backendRef.Namespace = ptr.To(gatewayv1.Namespace(namespace)) + backendRef.Kind = ptr.To(gatewayv1.Kind(kind)) + return backendRef +} + func (t *Translator) AttachBackendTrafficPolicyToUpstream(ref gatewayv1.BackendRef, policies map[types.NamespacedName]*v1alpha1.BackendTrafficPolicy, upstream *adctypes.Upstream) { if len(policies) == 0 { return } var policy *v1alpha1.BackendTrafficPolicy for _, po := range policies { + if ref.Namespace != nil && string(*ref.Namespace) != po.Namespace { + continue + } for _, targetRef := range po.Spec.TargetRefs { - if ref.Name == targetRef.Name && - (ref.Namespace != nil && string(*ref.Namespace) == po.Namespace) { + if ref.Name == targetRef.Name { policy = po break } diff --git a/test/e2e/crds/backendtrafficpolicy.go b/test/e2e/crds/backendtrafficpolicy.go index 90c82bf07..8a2ad0407 100644 --- a/test/e2e/crds/backendtrafficpolicy.go +++ b/test/e2e/crds/backendtrafficpolicy.go @@ -1,15 +1,17 @@ package gatewayapi import ( + "fmt" "time" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" + "github.com/api7/api7-ingress-controller/test/e2e/framework" "github.com/api7/api7-ingress-controller/test/e2e/scaffold" ) -var _ = Describe("Test BackendTrafficPolicy", func() { +var _ = Describe("Test BackendTrafficPolicy base on HTTPRoute", func() { s := scaffold.NewDefaultScaffold() var defaultGatewayProxy = ` @@ -142,3 +144,139 @@ spec: }) }) }) + +var _ = Describe("Test BackendTrafficPolicy base on Ingress", func() { + s := scaffold.NewScaffold(&scaffold.Options{ + ControllerName: "gateway.api7.io/api7-ingress-controller", + }) + + var defaultGatewayProxy = ` +apiVersion: gateway.apisix.io/v1alpha1 +kind: GatewayProxy +metadata: + name: api7-proxy-config + namespace: default +spec: + provider: + type: ControlPlane + controlPlane: + endpoints: + - %s + auth: + type: AdminKey + adminKey: + value: "%s" +` + var defaultIngressClass = ` +apiVersion: networking.k8s.io/v1 +kind: IngressClass +metadata: + name: api7-default + annotations: + ingressclass.kubernetes.io/is-default-class: "true" +spec: + controller: "gateway.api7.io/api7-ingress-controller" + parameters: + apiGroup: "gateway.apisix.io" + kind: "GatewayProxy" + name: "api7-proxy-config" + namespace: "default" + scope: "Namespace" +` + + var defaultIngress = ` +apiVersion: networking.k8s.io/v1 +kind: Ingress +metadata: + name: api7-ingress-default +spec: + rules: + - host: httpbin.org + http: + paths: + - path: / + pathType: Prefix + backend: + service: + name: httpbin-service-e2e-test + port: + number: 80 +` + var beforeEach = func() { + By("create GatewayProxy") + gatewayProxy := fmt.Sprintf(defaultGatewayProxy, framework.DashboardTLSEndpoint, s.AdminKey()) + err := s.CreateResourceFromStringWithNamespace(gatewayProxy, "default") + Expect(err).NotTo(HaveOccurred(), "creating GatewayProxy") + + By("create IngressClass with GatewayProxy reference") + err = s.CreateResourceFromStringWithNamespace(defaultIngressClass, "") + Expect(err).NotTo(HaveOccurred(), "creating IngressClass with GatewayProxy") + + By("create Ingress with GatewayProxy IngressClass") + err = s.CreateResourceFromString(defaultIngress) + Expect(err).NotTo(HaveOccurred(), "creating Ingress with GatewayProxy IngressClass") + time.Sleep(5 * time.Second) + } + + Context("Rewrite Upstream Host", func() { + var createUpstreamHost = ` +apiVersion: gateway.apisix.io/v1alpha1 +kind: BackendTrafficPolicy +metadata: + name: httpbin +spec: + targetRefs: + - name: httpbin-service-e2e-test + kind: Service + group: "" + passHost: rewrite + upstreamHost: httpbin.example.com +` + + var updateUpstreamHost = ` +apiVersion: gateway.apisix.io/v1alpha1 +kind: BackendTrafficPolicy +metadata: + name: httpbin +spec: + targetRefs: + - name: httpbin-service-e2e-test + kind: Service + group: "" + passHost: rewrite + upstreamHost: httpbin.update.example.com +` + + BeforeEach(beforeEach) + It("should rewrite upstream host", func() { + s.ResourceApplied("BackendTrafficPolicy", "httpbin", createUpstreamHost, 1) + s.NewAPISIXClient(). + GET("/headers"). + WithHost("httpbin.org"). + Expect(). + Status(200). + Body().Contains("httpbin.example.com") + + s.ResourceApplied("BackendTrafficPolicy", "httpbin", updateUpstreamHost, 2) + s.NewAPISIXClient(). + GET("/headers"). + WithHost("httpbin.org"). + Expect(). + Status(200). + Body().Contains("httpbin.update.example.com") + + err := s.DeleteResourceFromString(createUpstreamHost) + Expect(err).NotTo(HaveOccurred(), "deleting BackendTrafficPolicy") + time.Sleep(5 * time.Second) + + s.NewAPISIXClient(). + GET("/headers"). + WithHost("httpbin.org"). + Expect(). + Status(200). + Body(). + NotContains("httpbin.update.example.com"). + NotContains("httpbin.example.com") + }) + }) +})