From 6066fb7bb011b216171fe077858a2ee9bc87f887 Mon Sep 17 00:00:00 2001 From: rongxin Date: Tue, 15 Apr 2025 11:02:21 +0800 Subject: [PATCH 01/11] merge --- api/adc/types.go | 6 +- api/v1alpha1/backendtrafficpolicy_types.go | 8 +- api/v1alpha1/policies_type.go | 8 ++ api/v1alpha1/zz_generated.deepcopy.go | 46 ++++++- ...eway.apisix.io_backendtrafficpolicies.yaml | 2 + internal/controller/consumer_controller.go | 2 +- internal/controller/httproute_controller.go | 10 +- internal/controller/indexer/indexer.go | 31 +++++ internal/controller/ingress_controller.go | 20 +-- internal/controller/policies.go | 119 ++++++++++++++++++ internal/controller/status.go | 44 +++++++ internal/provider/adc/translator/httproute.go | 10 +- internal/provider/adc/translator/ingress.go | 2 + internal/provider/adc/translator/policies.go | 60 +++++++++ internal/provider/provider.go | 33 +++-- 15 files changed, 356 insertions(+), 45 deletions(-) create mode 100644 api/v1alpha1/policies_type.go create mode 100644 internal/controller/policies.go create mode 100644 internal/provider/adc/translator/policies.go diff --git a/api/adc/types.go b/api/adc/types.go index 2e510b7f4..2ad73a998 100644 --- a/api/adc/types.go +++ b/api/adc/types.go @@ -128,9 +128,9 @@ type Route struct { } type Timeout struct { - Connect float64 `json:"connect"` - Read float64 `json:"read"` - Send float64 `json:"send"` + Connect *int64 `json:"connect,omitempty"` + Read *int64 `json:"read,omitempty"` + Send *int64 `json:"send,omitempty"` } type StreamRoute struct { diff --git a/api/v1alpha1/backendtrafficpolicy_types.go b/api/v1alpha1/backendtrafficpolicy_types.go index 2bdf5b553..3cf4ad767 100644 --- a/api/v1alpha1/backendtrafficpolicy_types.go +++ b/api/v1alpha1/backendtrafficpolicy_types.go @@ -2,8 +2,6 @@ package v1alpha1 import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - - gatewayv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2" ) // +kubebuilder:object:root=true @@ -12,8 +10,8 @@ type BackendTrafficPolicy struct { metav1.TypeMeta `json:",inline"` metav1.ObjectMeta `json:"metadata,omitempty"` - Spec BackendTrafficPolicySpec `json:"spec,omitempty"` - Status gatewayv1alpha2.PolicyStatus `json:"status,omitempty"` + Spec BackendTrafficPolicySpec `json:"spec,omitempty"` + Status PolicyStatus `json:"status,omitempty"` } type BackendTrafficPolicySpec struct { @@ -27,7 +25,7 @@ type BackendTrafficPolicySpec struct { // +listMapKey=name // +kubebuilder:validation:MinItems=1 // +kubebuilder:validation:MaxItems=16 - TargetRefs []gatewayv1alpha2.LocalPolicyTargetReferenceWithSectionName `json:"targetRefs"` + TargetRefs []BackendPolicyTargetReferenceWithSectionName `json:"targetRefs"` // LoadBalancer represents the load balancer configuration for Kubernetes Service. // The default strategy is round robin. LoadBalancer *LoadBalancer `json:"loadbalancer,omitempty" yaml:"loadbalancer,omitempty"` diff --git a/api/v1alpha1/policies_type.go b/api/v1alpha1/policies_type.go new file mode 100644 index 000000000..527226e4f --- /dev/null +++ b/api/v1alpha1/policies_type.go @@ -0,0 +1,8 @@ +package v1alpha1 + +import gatewayv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2" + +type PolicyStatus gatewayv1alpha2.PolicyStatus + +// +kubebuilder:validation:XValidation:rule="self.kind == 'Service' && self.group == \"\"" +type BackendPolicyTargetReferenceWithSectionName gatewayv1alpha2.LocalPolicyTargetReferenceWithSectionName diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index 04ff10e12..8f4a5b117 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -25,6 +25,7 @@ import ( "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" runtime "k8s.io/apimachinery/pkg/runtime" + apisv1 "sigs.k8s.io/gateway-api/apis/v1" "sigs.k8s.io/gateway-api/apis/v1alpha2" ) @@ -68,6 +69,27 @@ func (in *AdminKeyValueFrom) DeepCopy() *AdminKeyValueFrom { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *BackendPolicyTargetReferenceWithSectionName) DeepCopyInto(out *BackendPolicyTargetReferenceWithSectionName) { + *out = *in + out.LocalPolicyTargetReference = in.LocalPolicyTargetReference + if in.SectionName != nil { + in, out := &in.SectionName, &out.SectionName + *out = new(apisv1.SectionName) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new BackendPolicyTargetReferenceWithSectionName. +func (in *BackendPolicyTargetReferenceWithSectionName) DeepCopy() *BackendPolicyTargetReferenceWithSectionName { + if in == nil { + return nil + } + out := new(BackendPolicyTargetReferenceWithSectionName) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *BackendTrafficPolicy) DeepCopyInto(out *BackendTrafficPolicy) { *out = *in @@ -132,7 +154,7 @@ func (in *BackendTrafficPolicySpec) DeepCopyInto(out *BackendTrafficPolicySpec) *out = *in if in.TargetRefs != nil { in, out := &in.TargetRefs, &out.TargetRefs - *out = make([]v1alpha2.LocalPolicyTargetReferenceWithSectionName, len(*in)) + *out = make([]BackendPolicyTargetReferenceWithSectionName, len(*in)) for i := range *in { (*in)[i].DeepCopyInto(&(*out)[i]) } @@ -688,6 +710,28 @@ func (in *PluginConfigSpec) DeepCopy() *PluginConfigSpec { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *PolicyStatus) DeepCopyInto(out *PolicyStatus) { + *out = *in + if in.Ancestors != nil { + in, out := &in.Ancestors, &out.Ancestors + *out = make([]v1alpha2.PolicyAncestorStatus, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PolicyStatus. +func (in *PolicyStatus) DeepCopy() *PolicyStatus { + if in == nil { + return nil + } + out := new(PolicyStatus) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *SecretKeySelector) DeepCopyInto(out *SecretKeySelector) { *out = *in diff --git a/config/crd/bases/gateway.apisix.io_backendtrafficpolicies.yaml b/config/crd/bases/gateway.apisix.io_backendtrafficpolicies.yaml index 3e325bf7f..2d3130080 100644 --- a/config/crd/bases/gateway.apisix.io_backendtrafficpolicies.yaml +++ b/config/crd/bases/gateway.apisix.io_backendtrafficpolicies.yaml @@ -153,6 +153,8 @@ spec: - kind - name type: object + x-kubernetes-validations: + - rule: self.kind == 'Service' && self.group == "" maxItems: 16 minItems: 1 type: array diff --git a/internal/controller/consumer_controller.go b/internal/controller/consumer_controller.go index 1d5054aca..2ef6e30e2 100644 --- a/internal/controller/consumer_controller.go +++ b/internal/controller/consumer_controller.go @@ -136,7 +136,7 @@ func (r *ConsumerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c } var statusErr error - tctx := provider.NewDefaultTranslateContext() + tctx := provider.NewDefaultTranslateContext(ctx) if err := r.processSpec(ctx, tctx, consumer); err != nil { r.Log.Error(err, "failed to process consumer spec", "consumer", consumer) diff --git a/internal/controller/httproute_controller.go b/internal/controller/httproute_controller.go index afe21a884..bc33dd79b 100644 --- a/internal/controller/httproute_controller.go +++ b/internal/controller/httproute_controller.go @@ -112,7 +112,7 @@ func (r *HTTPRouteReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( return ctrl.Result{}, nil } - tctx := provider.NewDefaultTranslateContext() + tctx := provider.NewDefaultTranslateContext(ctx) if err := r.processHTTPRoute(tctx, hr); err != nil { acceptStatus.status = false @@ -249,7 +249,7 @@ func (r *HTTPRouteReconciler) processHTTPRouteBackendRefs(tctx *provider.Transla } var service corev1.Service - if err := r.Get(context.TODO(), client.ObjectKey{ + if err := r.Get(tctx, client.ObjectKey{ Namespace: namespace, Name: name, }, &service); err != nil { @@ -268,9 +268,13 @@ func (r *HTTPRouteReconciler) processHTTPRouteBackendRefs(tctx *provider.Transla terr = fmt.Errorf("port %d not found in service %s", *backend.Port, name) continue } + tctx.Services[client.ObjectKey{ + Namespace: namespace, + Name: name, + }] = &service endpointSliceList := new(discoveryv1.EndpointSliceList) - if err := r.List(context.TODO(), endpointSliceList, + if err := r.List(tctx, endpointSliceList, client.InNamespace(namespace), client.MatchingLabels{ discoveryv1.LabelServiceName: name, diff --git a/internal/controller/indexer/indexer.go b/internal/controller/indexer/indexer.go index 5b20ea121..f07024dd4 100644 --- a/internal/controller/indexer/indexer.go +++ b/internal/controller/indexer/indexer.go @@ -5,6 +5,8 @@ import ( "github.com/api7/api7-ingress-controller/api/v1alpha1" networkingv1 "k8s.io/api/networking/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" gatewayv1 "sigs.k8s.io/gateway-api/apis/v1" @@ -19,6 +21,7 @@ const ( SecretIndexRef = "secretRefs" IngressClassRef = "ingressClassRef" ConsumerGatewayRef = "consumerGatewayRef" + PolicyTargetRefs = "targetRefs" ) func SetupIndexer(mgr ctrl.Manager) error { @@ -227,6 +230,18 @@ func IngressSecretIndexFunc(rawObj client.Object) []string { return secrets } +func GenIndexKeyWithGK(group, kind, namespace, name string) string { + gvk := schema.GroupKind{ + Group: group, + Kind: kind, + } + nsName := types.NamespacedName{ + Namespace: namespace, + Name: name, + } + return gvk.String() + "/" + nsName.String() +} + func GenIndexKey(namespace, name string) string { return client.ObjectKey{ Namespace: namespace, @@ -292,3 +307,19 @@ func GatewayParametersRefIndexFunc(rawObj client.Object) []string { } return nil } + +func BackendTrafficPolicyIndexFunc(rawObj client.Object) []string { + btp := rawObj.(*v1alpha1.BackendTrafficPolicy) + keys := make([]string, 0, len(btp.Spec.TargetRefs)) + for _, ref := range btp.Spec.TargetRefs { + keys = append(keys, + GenIndexKeyWithGK( + v1alpha1.GroupVersion.Group, + string(ref.Kind), + btp.GetNamespace(), + string(ref.Name), + ), + ) + } + return keys +} diff --git a/internal/controller/ingress_controller.go b/internal/controller/ingress_controller.go index 2e1636eed..ac22a0ea7 100644 --- a/internal/controller/ingress_controller.go +++ b/internal/controller/ingress_controller.go @@ -93,16 +93,16 @@ func (r *IngressReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct r.Log.Info("reconciling ingress", "ingress", ingress.Name) // create a translate context - tctx := provider.NewDefaultTranslateContext() + tctx := provider.NewDefaultTranslateContext(ctx) // process TLS configuration - if err := r.processTLS(ctx, tctx, ingress); err != nil { + if err := r.processTLS(tctx, ingress); err != nil { r.Log.Error(err, "failed to process TLS configuration", "ingress", ingress.Name) return ctrl.Result{}, err } // process backend services - if err := r.processBackends(ctx, tctx, ingress); err != nil { + if err := r.processBackends(tctx, ingress); err != nil { r.Log.Error(err, "failed to process backend services", "ingress", ingress.Name) return ctrl.Result{}, err } @@ -295,14 +295,14 @@ func (r *IngressReconciler) listIngressesBySecret(ctx context.Context, obj clien } // processTLS process the TLS configuration of the ingress -func (r *IngressReconciler) processTLS(ctx context.Context, tctx *provider.TranslateContext, ingress *networkingv1.Ingress) error { +func (r *IngressReconciler) processTLS(tctx *provider.TranslateContext, ingress *networkingv1.Ingress) error { for _, tls := range ingress.Spec.TLS { if tls.SecretName == "" { continue } secret := corev1.Secret{} - if err := r.Get(ctx, client.ObjectKey{ + if err := r.Get(tctx, client.ObjectKey{ Namespace: ingress.Namespace, Name: tls.SecretName, }, &secret); err != nil { @@ -323,7 +323,7 @@ func (r *IngressReconciler) processTLS(ctx context.Context, tctx *provider.Trans } // processBackends process the backend services of the ingress -func (r *IngressReconciler) processBackends(ctx context.Context, tctx *provider.TranslateContext, ingress *networkingv1.Ingress) error { +func (r *IngressReconciler) processBackends(tctx *provider.TranslateContext, ingress *networkingv1.Ingress) error { var terr error // process all the backend services in the rules @@ -336,7 +336,7 @@ func (r *IngressReconciler) processBackends(ctx context.Context, tctx *provider. continue } service := path.Backend.Service - if err := r.processBackendService(ctx, tctx, ingress.Namespace, service); err != nil { + if err := r.processBackendService(tctx, ingress.Namespace, service); err != nil { terr = err } } @@ -345,10 +345,10 @@ func (r *IngressReconciler) processBackends(ctx context.Context, tctx *provider. } // processBackendService process a single backend service -func (r *IngressReconciler) processBackendService(ctx context.Context, tctx *provider.TranslateContext, namespace string, backendService *networkingv1.IngressServiceBackend) error { +func (r *IngressReconciler) processBackendService(tctx *provider.TranslateContext, namespace string, backendService *networkingv1.IngressServiceBackend) error { // get the service var service corev1.Service - if err := r.Get(ctx, client.ObjectKey{ + if err := r.Get(tctx, client.ObjectKey{ Namespace: namespace, Name: backendService.Name, }, &service); err != nil { @@ -385,7 +385,7 @@ func (r *IngressReconciler) processBackendService(ctx context.Context, tctx *pro // get the endpoint slices endpointSliceList := &discoveryv1.EndpointSliceList{} - if err := r.List(ctx, endpointSliceList, + if err := r.List(tctx, endpointSliceList, client.InNamespace(namespace), client.MatchingLabels{ discoveryv1.LabelServiceName: backendService.Name, diff --git a/internal/controller/policies.go b/internal/controller/policies.go new file mode 100644 index 000000000..16d4dcb81 --- /dev/null +++ b/internal/controller/policies.go @@ -0,0 +1,119 @@ +package controller + +import ( + "fmt" + + gatewayv1 "sigs.k8s.io/gateway-api/apis/v1" + gatewayv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2" + + "github.com/api7/api7-ingress-controller/api/v1alpha1" + "github.com/api7/api7-ingress-controller/internal/controller/config" + "github.com/api7/api7-ingress-controller/internal/controller/indexer" + "github.com/api7/api7-ingress-controller/internal/provider" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +type PolicyTargetKey struct { + NsName types.NamespacedName + GroupKind schema.GroupKind +} + +func (p PolicyTargetKey) String() string { + return p.NsName.String() + "/" + p.GroupKind.String() +} + +func ProcessBackendTrafficPolicy(c client.Client, tctx *provider.TranslateContext) { + conflicts := map[string]v1alpha1.BackendTrafficPolicy{} + for _, service := range tctx.Services { + backendTrafficPolicyList := &v1alpha1.BackendTrafficPolicyList{} + if err := c.List(tctx, backendTrafficPolicyList, + client.MatchingFields{ + indexer.PolicyTargetRefs: indexer.GenIndexKeyWithGK("", "Service", service.Namespace, service.Name), + }, + ); err != nil { + if client.IgnoreNotFound(err) == nil { + continue + } + return + } + if len(backendTrafficPolicyList.Items) == 0 { + continue + } + + portNameExist := make(map[string]bool, len(service.Spec.Ports)) + for _, port := range service.Spec.Ports { + portNameExist[port.Name] = true + } + for _, policy := range backendTrafficPolicyList.Items { + targetRefs := policy.Spec.TargetRefs + updated := false + for _, targetRef := range targetRefs { + sectionName := targetRef.SectionName + key := PolicyTargetKey{ + NsName: types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, + GroupKind: schema.GroupKind{Group: "", Kind: "Service"}, + } + condition := NewPolicyCondition(policy.Generation, true, "Policy has been accepted") + if sectionName != nil && !portNameExist[string(*sectionName)] { + condition = NewPolicyCondition(policy.Generation, false, fmt.Sprintf("SectionName %s not found in Service %s/%s", *sectionName, service.Namespace, service.Name)) + goto record_status + } + if p, ok := conflicts[key.String()]; ok && (p.Name == policy.Name && p.Namespace == policy.Namespace) { + condition = NewPolicyConflictCondition(policy.Generation, fmt.Sprintf("Unable to target Service %s/%s, because it conflicts with another BackendTrafficPolicy", service.Namespace, service.Name)) + goto record_status + } + conflicts[key.String()] = policy + record_status: + if ok := SetAncestors(&policy.Status, tctx.ParentRefs, condition); ok { + updated = true + } + } + if _, ok := tctx.BackendTrafficPolicies[types.NamespacedName{ + Name: policy.Name, + Namespace: policy.Namespace, + }]; ok { + continue + } + + if updated { + tctx.StatusUpdaters = append(tctx.StatusUpdaters, policy.DeepCopy()) + } + + tctx.BackendTrafficPolicies[types.NamespacedName{ + Name: policy.Name, + Namespace: policy.Namespace, + }] = policy.DeepCopy() + } + } +} +func SetAncestors(status *v1alpha1.PolicyStatus, parentRefs []gatewayv1.ParentReference, condition metav1.Condition) bool { + updated := false + for _, parent := range parentRefs { + ancestorStatus := gatewayv1alpha2.PolicyAncestorStatus{ + AncestorRef: parent, + Conditions: []metav1.Condition{condition}, + ControllerName: gatewayv1alpha2.GatewayController(config.ControllerConfig.ControllerName), + } + if SetAncestorStatus(status, ancestorStatus) { + updated = true + } + } + return updated +} + +func SetAncestorStatus(status *v1alpha1.PolicyStatus, ancestorStatus gatewayv1alpha2.PolicyAncestorStatus) bool { + for _, c := range status.Ancestors { + if c.AncestorRef == ancestorStatus.AncestorRef { + if c.Conditions[0].ObservedGeneration < ancestorStatus.Conditions[0].ObservedGeneration { + c.Conditions = ancestorStatus.Conditions + return true + } + return false + } + } + status.Ancestors = append(status.Ancestors, ancestorStatus) + return true +} diff --git a/internal/controller/status.go b/internal/controller/status.go index d60e2616b..cae419995 100644 --- a/internal/controller/status.go +++ b/internal/controller/status.go @@ -1,8 +1,12 @@ package controller import ( + "github.com/api7/api7-ingress-controller/internal/provider" + "github.com/go-logr/logr" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/client" + gatewayv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2" ) const ( @@ -44,3 +48,43 @@ func VerifyConditions(conditions *[]metav1.Condition, newCondition metav1.Condit } return true } + +func NewPolicyCondition(observedGeneration int64, status bool, message string) metav1.Condition { + conditionStatus := metav1.ConditionTrue + reason := string(gatewayv1alpha2.PolicyReasonAccepted) + if !status { + conditionStatus = metav1.ConditionFalse + reason = string(gatewayv1alpha2.PolicyReasonInvalid) + } + + return metav1.Condition{ + Type: string(gatewayv1alpha2.PolicyConditionAccepted), + Reason: reason, + Status: conditionStatus, + Message: message, + ObservedGeneration: observedGeneration, + } +} + +func NewPolicyConflictCondition(observedGeneration int64, message string) metav1.Condition { + return metav1.Condition{ + Type: string(gatewayv1alpha2.PolicyConditionAccepted), + Reason: string(gatewayv1alpha2.PolicyReasonConflicted), + Status: metav1.ConditionFalse, + Message: message, + ObservedGeneration: observedGeneration, + } +} + +func UpdateStatus( + c client.Client, + log logr.Logger, + tctx *provider.TranslateContext, +) { + for _, obj := range tctx.StatusUpdaters { + if err := c.Status().Update(tctx, obj); err != nil { + log.Error(err, "failed to update status", "object", obj) + continue + } + } +} diff --git a/internal/provider/adc/translator/httproute.go b/internal/provider/adc/translator/httproute.go index 8dea67a97..37dc9687a 100644 --- a/internal/provider/adc/translator/httproute.go +++ b/internal/provider/adc/translator/httproute.go @@ -290,15 +290,7 @@ func (t *Translator) TranslateHTTPRoute(tctx *provider.TranslateContext, httpRou upNodes := t.translateBackendRef(tctx, backend.BackendRef) upstream.Nodes = append(upstream.Nodes, upNodes...) } - if len(upstream.Nodes) == 0 { - upstream.Nodes = adctypes.UpstreamNodes{ - { - Host: "0.0.0.0", - Port: 80, - Weight: 1, - }, - } - } + t.attachBackendTrafficPolicyToUpstream(nil, upstream) // todo: support multiple backends service := adctypes.NewDefaultService() diff --git a/internal/provider/adc/translator/ingress.go b/internal/provider/adc/translator/ingress.go index 19cc09034..fa0ca853c 100644 --- a/internal/provider/adc/translator/ingress.go +++ b/internal/provider/adc/translator/ingress.go @@ -109,6 +109,8 @@ func (t *Translator) TranslateIngress(tctx *provider.TranslateContext, obj *netw Name: backendService.Name, }] + t.attachBackendTrafficPolicyToUpstream(nil, upstream) + // get the service port configuration var servicePort int32 = 0 var servicePortName string diff --git a/internal/provider/adc/translator/policies.go b/internal/provider/adc/translator/policies.go new file mode 100644 index 000000000..07024f566 --- /dev/null +++ b/internal/provider/adc/translator/policies.go @@ -0,0 +1,60 @@ +package translator + +import ( + "github.com/api7/api7-ingress-controller/api/adc" + "github.com/api7/api7-ingress-controller/api/v1alpha1" + "k8s.io/apimachinery/pkg/types" + + adctypes "github.com/api7/api7-ingress-controller/api/adc" +) + +func (t *Translator) AttachBackendTrafficPolicyToUpstream(policies map[types.NamespacedName]*v1alpha1.BackendTrafficPolicy, upstream *adctypes.Upstream) { + if len(policies) == 0 { + return + } + for _, policy := range policies { + t.attachBackendTrafficPolicyToUpstream(policy, upstream) + } + +} + +func (t *Translator) attachBackendTrafficPolicyToUpstream(policy *v1alpha1.BackendTrafficPolicy, upstream *adctypes.Upstream) { + if policy == nil { + return + } + upstream.UpstreamHost = string(policy.Spec.Host) + upstream.Scheme = policy.Spec.Scheme + if policy.Spec.Retries != nil { + upstream.Retries = new(int64) + *upstream.Retries = int64(*policy.Spec.Retries) + } + if policy.Spec.Timeout != nil { + var ( + connect *int64 + read *int64 + send *int64 + ) + if policy.Spec.Timeout.Connect.Duration > 0 { + connect = new(int64) + *connect = policy.Spec.Timeout.Connect.Duration.Milliseconds() + } + if policy.Spec.Timeout.Read.Duration > 0 { + read = new(int64) + *read = policy.Spec.Timeout.Read.Duration.Milliseconds() + } + if policy.Spec.Timeout.Send.Duration > 0 { + send = new(int64) + *send = policy.Spec.Timeout.Send.Duration.Milliseconds() + } + upstream.Timeout = &adctypes.Timeout{ + Connect: connect, + Read: read, + Send: send, + } + } + if policy.Spec.LoadBalancer != nil { + upstream.Type = adc.UpstreamType(policy.Spec.LoadBalancer.Type) + upstream.HashOn = policy.Spec.LoadBalancer.HashOn + upstream.Key = policy.Spec.LoadBalancer.Key + } +} diff --git a/internal/provider/provider.go b/internal/provider/provider.go index a57675f27..395d7eaa7 100644 --- a/internal/provider/provider.go +++ b/internal/provider/provider.go @@ -18,21 +18,28 @@ type Provider interface { } type TranslateContext struct { - BackendRefs []gatewayv1.BackendRef - GatewayTLSConfig []gatewayv1.GatewayTLSConfig - GatewayProxy *v1alpha1.GatewayProxy - Credentials []v1alpha1.Credential - EndpointSlices map[types.NamespacedName][]discoveryv1.EndpointSlice - Secrets map[types.NamespacedName]*corev1.Secret - PluginConfigs map[types.NamespacedName]*v1alpha1.PluginConfig - Services map[types.NamespacedName]*corev1.Service + context.Context + ParentRefs []gatewayv1.ParentReference + BackendRefs []gatewayv1.BackendRef + GatewayTLSConfig []gatewayv1.GatewayTLSConfig + GatewayProxy *v1alpha1.GatewayProxy + Credentials []v1alpha1.Credential + EndpointSlices map[types.NamespacedName][]discoveryv1.EndpointSlice + Secrets map[types.NamespacedName]*corev1.Secret + PluginConfigs map[types.NamespacedName]*v1alpha1.PluginConfig + Services map[types.NamespacedName]*corev1.Service + BackendTrafficPolicies map[types.NamespacedName]*v1alpha1.BackendTrafficPolicy + + StatusUpdaters []client.Object } -func NewDefaultTranslateContext() *TranslateContext { +func NewDefaultTranslateContext(ctx context.Context) *TranslateContext { return &TranslateContext{ - EndpointSlices: make(map[types.NamespacedName][]discoveryv1.EndpointSlice), - Secrets: make(map[types.NamespacedName]*corev1.Secret), - PluginConfigs: make(map[types.NamespacedName]*v1alpha1.PluginConfig), - Services: make(map[types.NamespacedName]*corev1.Service), + Context: ctx, + EndpointSlices: make(map[types.NamespacedName][]discoveryv1.EndpointSlice), + Secrets: make(map[types.NamespacedName]*corev1.Secret), + PluginConfigs: make(map[types.NamespacedName]*v1alpha1.PluginConfig), + Services: make(map[types.NamespacedName]*corev1.Service), + BackendTrafficPolicies: make(map[types.NamespacedName]*v1alpha1.BackendTrafficPolicy), } } From 4d4f3574d2644d0d83f99bc3c2a63d01a873b916 Mon Sep 17 00:00:00 2001 From: rongxin Date: Wed, 16 Apr 2025 10:32:09 +0800 Subject: [PATCH 02/11] feat: support backendtrafficpolicy for httproute --- api/adc/types.go | 6 +-- api/v1alpha1/backendtrafficpolicy_types.go | 7 ++- ...eway.apisix.io_backendtrafficpolicies.yaml | 3 ++ internal/controller/httproute_controller.go | 49 +++++++++++++++++++ internal/controller/indexer/indexer.go | 11 ++++- internal/controller/policies.go | 11 +++-- internal/manager/run.go | 5 ++ internal/provider/adc/translator/httproute.go | 7 +++ internal/provider/adc/translator/policies.go | 42 +++++++--------- 9 files changed, 106 insertions(+), 35 deletions(-) diff --git a/api/adc/types.go b/api/adc/types.go index 2ad73a998..2c9bede84 100644 --- a/api/adc/types.go +++ b/api/adc/types.go @@ -128,9 +128,9 @@ type Route struct { } type Timeout struct { - Connect *int64 `json:"connect,omitempty"` - Read *int64 `json:"read,omitempty"` - Send *int64 `json:"send,omitempty"` + Connect int64 `json:"connect"` + Read int64 `json:"read"` + Send int64 `json:"send"` } type StreamRoute struct { diff --git a/api/v1alpha1/backendtrafficpolicy_types.go b/api/v1alpha1/backendtrafficpolicy_types.go index 3cf4ad767..6477bd38f 100644 --- a/api/v1alpha1/backendtrafficpolicy_types.go +++ b/api/v1alpha1/backendtrafficpolicy_types.go @@ -72,9 +72,12 @@ type LoadBalancer struct { } type Timeout struct { + // +kubebuilder:default="60s" Connect metav1.Duration `json:"connect,omitempty" yaml:"connect,omitempty"` - Send metav1.Duration `json:"send,omitempty" yaml:"send,omitempty"` - Read metav1.Duration `json:"read,omitempty" yaml:"read,omitempty"` + // +kubebuilder:default="60s" + Send metav1.Duration `json:"send,omitempty" yaml:"send,omitempty"` + // +kubebuilder:default="60s" + Read metav1.Duration `json:"read,omitempty" yaml:"read,omitempty"` } // +kubebuilder:object:root=true diff --git a/config/crd/bases/gateway.apisix.io_backendtrafficpolicies.yaml b/config/crd/bases/gateway.apisix.io_backendtrafficpolicies.yaml index 2d3130080..a76601cd5 100644 --- a/config/crd/bases/gateway.apisix.io_backendtrafficpolicies.yaml +++ b/config/crd/bases/gateway.apisix.io_backendtrafficpolicies.yaml @@ -168,10 +168,13 @@ spec: upstream. properties: connect: + default: 60s type: string read: + default: 60s type: string send: + default: 60s type: string type: object upstream_host: diff --git a/internal/controller/httproute_controller.go b/internal/controller/httproute_controller.go index bc33dd79b..21d4a2bc1 100644 --- a/internal/controller/httproute_controller.go +++ b/internal/controller/httproute_controller.go @@ -6,6 +6,7 @@ import ( "strings" "github.com/go-logr/logr" + "go.uber.org/zap" corev1 "k8s.io/api/core/v1" discoveryv1 "k8s.io/api/discovery/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -23,6 +24,7 @@ import ( "github.com/api7/api7-ingress-controller/api/v1alpha1" "github.com/api7/api7-ingress-controller/internal/controller/indexer" "github.com/api7/api7-ingress-controller/internal/provider" + "github.com/api7/gopkg/pkg/log" ) // HTTPRouteReconciler reconciles a GatewayClass object. @@ -65,6 +67,9 @@ func (r *HTTPRouteReconciler) SetupWithManager(mgr ctrl.Manager) error { }, ), ). + Watches(&v1alpha1.BackendTrafficPolicy{}, + handler.EnqueueRequestsFromMapFunc(r.listHTTPRoutesForBackendTrafficPolicy), + ). Complete(r) } @@ -126,6 +131,8 @@ func (r *HTTPRouteReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( } } + ProcessBackendTrafficPolicy(r.Client, tctx) + if err := r.Provider.Update(ctx, tctx, hr); err != nil { acceptStatus.status = false acceptStatus.msg = err.Error() @@ -208,6 +215,48 @@ func (r *HTTPRouteReconciler) listHTTPRoutesByExtensionRef(ctx context.Context, return requests } +func (r *HTTPRouteReconciler) listHTTPRoutesForBackendTrafficPolicy(ctx context.Context, obj client.Object) []reconcile.Request { + policy, ok := obj.(*v1alpha1.BackendTrafficPolicy) + if !ok { + r.Log.Error(fmt.Errorf("unexpected object type"), "failed to convert object to BackendTrafficPolicy") + return nil + } + + httprouteList := []gatewayv1.HTTPRoute{} + for _, targetRef := range policy.Spec.TargetRefs { + service := &corev1.Service{} + if err := r.Get(ctx, client.ObjectKey{ + Namespace: policy.Namespace, + Name: string(targetRef.Name), + }, service); err != nil { + if client.IgnoreNotFound(err) != nil { + r.Log.Error(err, "failed to get service", "namespace", policy.Namespace, "name", targetRef.Name) + } + continue + } + hrList := &gatewayv1.HTTPRouteList{} + if err := r.List(ctx, hrList, client.MatchingFields{ + indexer.ServiceIndexRef: indexer.GenIndexKey(policy.Namespace, string(targetRef.Name)), + }); err != nil { + r.Log.Error(err, "failed to list httproutes by service reference", "service", targetRef.Name) + return nil + } + httprouteList = append(httprouteList, hrList.Items...) + } + + requests := make([]reconcile.Request, 0, len(httprouteList)) + for _, hr := range httprouteList { + requests = append(requests, reconcile.Request{ + NamespacedName: client.ObjectKey{ + Namespace: hr.Namespace, + Name: hr.Name, + }, + }) + } + log.Errorw("list httproutes for backend traffic policy", zap.Any("httproutes", requests)) + return requests +} + func (r *HTTPRouteReconciler) listHTTPRoutesForGateway(ctx context.Context, obj client.Object) []reconcile.Request { gateway, ok := obj.(*gatewayv1.Gateway) if !ok { diff --git a/internal/controller/indexer/indexer.go b/internal/controller/indexer/indexer.go index f07024dd4..9275ac6bc 100644 --- a/internal/controller/indexer/indexer.go +++ b/internal/controller/indexer/indexer.go @@ -131,6 +131,15 @@ func setupHTTPRouteIndexer(mgr ctrl.Manager) error { ); err != nil { return err } + + if err := mgr.GetFieldIndexer().IndexField( + context.Background(), + &v1alpha1.BackendTrafficPolicy{}, + PolicyTargetRefs, + BackendTrafficPolicyIndexFunc, + ); err != nil { + return err + } return nil } @@ -314,7 +323,7 @@ func BackendTrafficPolicyIndexFunc(rawObj client.Object) []string { for _, ref := range btp.Spec.TargetRefs { keys = append(keys, GenIndexKeyWithGK( - v1alpha1.GroupVersion.Group, + string(ref.Group), string(ref.Kind), btp.GetNamespace(), string(ref.Name), diff --git a/internal/controller/policies.go b/internal/controller/policies.go index 16d4dcb81..7a9e870df 100644 --- a/internal/controller/policies.go +++ b/internal/controller/policies.go @@ -10,6 +10,7 @@ import ( "github.com/api7/api7-ingress-controller/internal/controller/config" "github.com/api7/api7-ingress-controller/internal/controller/indexer" "github.com/api7/api7-ingress-controller/internal/provider" + "github.com/go-logr/logr" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" @@ -25,7 +26,9 @@ func (p PolicyTargetKey) String() string { return p.NsName.String() + "/" + p.GroupKind.String() } -func ProcessBackendTrafficPolicy(c client.Client, tctx *provider.TranslateContext) { +func ProcessBackendTrafficPolicy(c client.Client, + log logr.Logger, + tctx *provider.TranslateContext) { conflicts := map[string]v1alpha1.BackendTrafficPolicy{} for _, service := range tctx.Services { backendTrafficPolicyList := &v1alpha1.BackendTrafficPolicyList{} @@ -34,10 +37,8 @@ func ProcessBackendTrafficPolicy(c client.Client, tctx *provider.TranslateContex indexer.PolicyTargetRefs: indexer.GenIndexKeyWithGK("", "Service", service.Namespace, service.Name), }, ); err != nil { - if client.IgnoreNotFound(err) == nil { - continue - } - return + log.Error(err, "failed to list BackendTrafficPolicy for Service") + continue } if len(backendTrafficPolicyList.Items) == 0 { continue diff --git a/internal/manager/run.go b/internal/manager/run.go index 317608a0f..f66607596 100644 --- a/internal/manager/run.go +++ b/internal/manager/run.go @@ -4,11 +4,13 @@ import ( "context" "crypto/tls" "os" + "time" "github.com/go-logr/logr" "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" clientgoscheme "k8s.io/client-go/kubernetes/scheme" + "k8s.io/utils/ptr" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/healthz" "sigs.k8s.io/controller-runtime/pkg/metrics/filters" @@ -100,6 +102,9 @@ func Run(ctx context.Context, logger logr.Logger) error { LeaderElection: true, LeaderElectionID: cfg.LeaderElectionID, LeaderElectionNamespace: namespace, + LeaseDuration: ptr.To(time.Second * 15), + RenewDeadline: ptr.To(time.Second * 10), + RetryPeriod: ptr.To(time.Second * 5), // LeaderElectionReleaseOnCancel defines if the leader should step down voluntarily // when the Manager ends. This requires the binary to immediately end when the // Manager is stopped, otherwise, this setting is unsafe. Setting this significantly diff --git a/internal/provider/adc/translator/httproute.go b/internal/provider/adc/translator/httproute.go index 37dc9687a..dfe076f0d 100644 --- a/internal/provider/adc/translator/httproute.go +++ b/internal/provider/adc/translator/httproute.go @@ -288,6 +288,13 @@ func (t *Translator) TranslateHTTPRoute(tctx *provider.TranslateContext, httpRou backend.Namespace = &namespace } upNodes := t.translateBackendRef(tctx, backend.BackendRef) + for _, po := range tctx.BackendTrafficPolicies { + log.Errorw("backend traffic policy", + zap.String("policy", po.Name), + zap.String("namespace", po.Namespace)) + } + + t.AttachBackendTrafficPolicyToUpstream(backend.BackendRef, tctx.BackendTrafficPolicies, upstream) upstream.Nodes = append(upstream.Nodes, upNodes...) } t.attachBackendTrafficPolicyToUpstream(nil, upstream) diff --git a/internal/provider/adc/translator/policies.go b/internal/provider/adc/translator/policies.go index 07024f566..90b579fb7 100644 --- a/internal/provider/adc/translator/policies.go +++ b/internal/provider/adc/translator/policies.go @@ -4,18 +4,29 @@ import ( "github.com/api7/api7-ingress-controller/api/adc" "github.com/api7/api7-ingress-controller/api/v1alpha1" "k8s.io/apimachinery/pkg/types" + gatewayv1 "sigs.k8s.io/gateway-api/apis/v1" adctypes "github.com/api7/api7-ingress-controller/api/adc" ) -func (t *Translator) AttachBackendTrafficPolicyToUpstream(policies map[types.NamespacedName]*v1alpha1.BackendTrafficPolicy, upstream *adctypes.Upstream) { +func (t *Translator) AttachBackendTrafficPolicyToUpstream(ref gatewayv1.BackendRef, policies map[types.NamespacedName]*v1alpha1.BackendTrafficPolicy, upstream *adctypes.Upstream) { if len(policies) == 0 { return } - for _, policy := range policies { - t.attachBackendTrafficPolicyToUpstream(policy, upstream) + var policy *v1alpha1.BackendTrafficPolicy + for _, po := range policies { + for _, targetRef := range po.Spec.TargetRefs { + if ref.Name == targetRef.Name && + (ref.Namespace != nil && string(*ref.Namespace) == po.Namespace) { + policy = po + break + } + } } - + if policy == nil { + return + } + t.attachBackendTrafficPolicyToUpstream(policy, upstream) } func (t *Translator) attachBackendTrafficPolicyToUpstream(policy *v1alpha1.BackendTrafficPolicy, upstream *adctypes.Upstream) { @@ -29,27 +40,10 @@ func (t *Translator) attachBackendTrafficPolicyToUpstream(policy *v1alpha1.Backe *upstream.Retries = int64(*policy.Spec.Retries) } if policy.Spec.Timeout != nil { - var ( - connect *int64 - read *int64 - send *int64 - ) - if policy.Spec.Timeout.Connect.Duration > 0 { - connect = new(int64) - *connect = policy.Spec.Timeout.Connect.Duration.Milliseconds() - } - if policy.Spec.Timeout.Read.Duration > 0 { - read = new(int64) - *read = policy.Spec.Timeout.Read.Duration.Milliseconds() - } - if policy.Spec.Timeout.Send.Duration > 0 { - send = new(int64) - *send = policy.Spec.Timeout.Send.Duration.Milliseconds() - } upstream.Timeout = &adctypes.Timeout{ - Connect: connect, - Read: read, - Send: send, + Connect: policy.Spec.Timeout.Connect.Duration.Milliseconds(), + Read: policy.Spec.Timeout.Read.Duration.Milliseconds(), + Send: policy.Spec.Timeout.Send.Duration.Milliseconds(), } } if policy.Spec.LoadBalancer != nil { From 97e302b7516bb7d2460368d470f592345bbf23ad Mon Sep 17 00:00:00 2001 From: rongxin Date: Thu, 17 Apr 2025 14:58:04 +0800 Subject: [PATCH 03/11] feat: support backendtrafficpolicy for httproute --- api/adc/types.go | 6 +- api/v1alpha1/backendtrafficpolicy_types.go | 6 + ...eway.apisix.io_backendtrafficpolicies.yaml | 3 + internal/controller/gateway_controller.go | 18 ++- internal/controller/httproute_controller.go | 96 +++++++++++++- internal/controller/source.go | 121 ++++++++++++++++++ internal/provider/adc/adc.go | 3 +- internal/provider/adc/translator/httproute.go | 6 - internal/provider/adc/translator/policies.go | 6 +- 9 files changed, 247 insertions(+), 18 deletions(-) create mode 100644 internal/controller/source.go diff --git a/api/adc/types.go b/api/adc/types.go index 38869c2c6..c83dac52d 100644 --- a/api/adc/types.go +++ b/api/adc/types.go @@ -126,9 +126,9 @@ type Route struct { } type Timeout struct { - Connect int64 `json:"connect"` - Read int64 `json:"read"` - Send int64 `json:"send"` + Connect int `json:"connect"` + Read int `json:"read"` + Send int `json:"send"` } type StreamRoute struct { diff --git a/api/v1alpha1/backendtrafficpolicy_types.go b/api/v1alpha1/backendtrafficpolicy_types.go index 3d438349d..538efebaf 100644 --- a/api/v1alpha1/backendtrafficpolicy_types.go +++ b/api/v1alpha1/backendtrafficpolicy_types.go @@ -69,10 +69,16 @@ type LoadBalancer struct { type Timeout struct { // +kubebuilder:default="60s" + // +kubebuilder:validation:Pattern=`^[0-9]+s$` + // +kubebuilder:validation:Type=string Connect metav1.Duration `json:"connect,omitempty" yaml:"connect,omitempty"` // +kubebuilder:default="60s" + // +kubebuilder:validation:Pattern=`^[0-9]+s$` + // +kubebuilder:validation:Type=string Send metav1.Duration `json:"send,omitempty" yaml:"send,omitempty"` // +kubebuilder:default="60s" + // +kubebuilder:validation:Pattern=`^[0-9]+s$` + // +kubebuilder:validation:Type=string Read metav1.Duration `json:"read,omitempty" yaml:"read,omitempty"` } diff --git a/config/crd/bases/gateway.apisix.io_backendtrafficpolicies.yaml b/config/crd/bases/gateway.apisix.io_backendtrafficpolicies.yaml index 99e2fdea4..7771eacb7 100644 --- a/config/crd/bases/gateway.apisix.io_backendtrafficpolicies.yaml +++ b/config/crd/bases/gateway.apisix.io_backendtrafficpolicies.yaml @@ -164,12 +164,15 @@ spec: properties: connect: default: 60s + pattern: ^[0-9]+s$ type: string read: default: 60s + pattern: ^[0-9]+s$ type: string send: default: 60s + pattern: ^[0-9]+s$ type: string type: object upstream_host: diff --git a/internal/controller/gateway_controller.go b/internal/controller/gateway_controller.go index 47e42fa3c..d491e64f0 100644 --- a/internal/controller/gateway_controller.go +++ b/internal/controller/gateway_controller.go @@ -76,6 +76,7 @@ func (r *GatewayReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct if err := r.Provider.Delete(ctx, gateway); err != nil { return ctrl.Result{}, err } + return ctrl.Result{}, nil } return ctrl.Result{}, err } @@ -222,6 +223,9 @@ func (r *GatewayReconciler) listGatewaysForGatewayProxy(ctx context.Context, obj recs := make([]reconcile.Request, 0, len(gatewayList.Items)) for _, gateway := range gatewayList.Items { + if !r.checkGatewayClass(&gateway) { + continue + } recs = append(recs, reconcile.Request{ NamespacedName: client.ObjectKey{ Namespace: gateway.GetNamespace(), @@ -232,7 +236,7 @@ func (r *GatewayReconciler) listGatewaysForGatewayProxy(ctx context.Context, obj return recs } -func (r *GatewayReconciler) listGatewaysForHTTPRoute(_ context.Context, obj client.Object) []reconcile.Request { +func (r *GatewayReconciler) listGatewaysForHTTPRoute(ctx context.Context, obj client.Object) []reconcile.Request { httpRoute, ok := obj.(*gatewayv1.HTTPRoute) if !ok { r.Log.Error( @@ -256,6 +260,18 @@ func (r *GatewayReconciler) listGatewaysForHTTPRoute(_ context.Context, obj clie gatewayNamespace = string(*parentRef.Namespace) } + gateway := new(gatewayv1.Gateway) + if err := r.Get(ctx, client.ObjectKey{ + Namespace: gatewayNamespace, + Name: string(parentRef.Name), + }, gateway); err != nil { + continue + } + + if !r.checkGatewayClass(gateway) { + continue + } + recs = append(recs, reconcile.Request{ NamespacedName: client.ObjectKey{ Namespace: gatewayNamespace, diff --git a/internal/controller/httproute_controller.go b/internal/controller/httproute_controller.go index 21d4a2bc1..0ff9af733 100644 --- a/internal/controller/httproute_controller.go +++ b/internal/controller/httproute_controller.go @@ -6,7 +6,6 @@ import ( "strings" "github.com/go-logr/logr" - "go.uber.org/zap" corev1 "k8s.io/api/core/v1" discoveryv1 "k8s.io/api/discovery/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -19,12 +18,12 @@ import ( "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/controller-runtime/pkg/source" gatewayv1 "sigs.k8s.io/gateway-api/apis/v1" "github.com/api7/api7-ingress-controller/api/v1alpha1" "github.com/api7/api7-ingress-controller/internal/controller/indexer" "github.com/api7/api7-ingress-controller/internal/provider" - "github.com/api7/gopkg/pkg/log" ) // HTTPRouteReconciler reconciles a GatewayClass object. @@ -35,10 +34,14 @@ type HTTPRouteReconciler struct { //nolint:revive Log logr.Logger Provider provider.Provider + + genericEvent chan event.GenericEvent } // SetupWithManager sets up the controller with the Manager. func (r *HTTPRouteReconciler) SetupWithManager(mgr ctrl.Manager) error { + r.genericEvent = make(chan event.GenericEvent, 100) + return ctrl.NewControllerManagedBy(mgr). For(&gatewayv1.HTTPRoute{}). WithEventFilter(predicate.GenerationChangedPredicate{}). @@ -69,10 +72,91 @@ func (r *HTTPRouteReconciler) SetupWithManager(mgr ctrl.Manager) error { ). Watches(&v1alpha1.BackendTrafficPolicy{}, handler.EnqueueRequestsFromMapFunc(r.listHTTPRoutesForBackendTrafficPolicy), + builder.WithPredicates( + predicate.Funcs{ + GenericFunc: func(e event.GenericEvent) bool { + return false + }, + DeleteFunc: func(e event.DeleteEvent) bool { + return true + }, + CreateFunc: func(e event.CreateEvent) bool { + return true + }, + UpdateFunc: func(e event.UpdateEvent) bool { + oldObj, ok := e.ObjectOld.(*v1alpha1.BackendTrafficPolicy) + newObj, ok2 := e.ObjectNew.(*v1alpha1.BackendTrafficPolicy) + if !ok || !ok2 { + return false + } + oldRefs := oldObj.Spec.TargetRefs + newRefs := newObj.Spec.TargetRefs + + // 将旧引用转换为 Map + oldRefMap := make(map[string]v1alpha1.BackendPolicyTargetReferenceWithSectionName) + for _, ref := range oldRefs { + key := fmt.Sprintf("%s/%s/%s", ref.Group, ref.Kind, ref.Name) + oldRefMap[key] = ref + } + + for _, ref := range newRefs { + key := fmt.Sprintf("%s/%s/%s", ref.Group, ref.Kind, ref.Name) + delete(oldRefMap, key) + } + if len(oldRefMap) > 0 { + targetRefs := make([]v1alpha1.BackendPolicyTargetReferenceWithSectionName, 0, len(oldRefs)) + for _, ref := range oldRefMap { + targetRefs = append(targetRefs, ref) + } + dump := oldObj.DeepCopy() + dump.Spec.TargetRefs = targetRefs + r.genericEvent <- event.GenericEvent{ + Object: dump, + } + } + return true + }, + }, + ), + ). + WatchesRawSource( + source.Channel( + r.genericEvent, + handler.EnqueueRequestsFromMapFunc(r.listHTTPRouteForGenericEvent), + ), ). Complete(r) } +func (r *HTTPRouteReconciler) listHTTPRouteForGenericEvent(ctx context.Context, obj client.Object) []reconcile.Request { + requests := []reconcile.Request{} + switch v := obj.(type) { + case *v1alpha1.BackendTrafficPolicy: + httprouteAll := []gatewayv1.HTTPRoute{} + for _, ref := range v.Spec.TargetRefs { + httprouteList := &gatewayv1.HTTPRouteList{} + if err := r.List(ctx, httprouteList, client.MatchingFields{ + indexer.ServiceIndexRef: indexer.GenIndexKey(v.GetNamespace(), string(ref.Name)), + }); err != nil { + r.Log.Error(err, "failed to list HTTPRoutes for BackendTrafficPolicy", "namespace", v.GetNamespace(), "ref", ref.Name) + return nil + } + httprouteAll = append(httprouteAll, httprouteList.Items...) + } + for _, hr := range httprouteAll { + requests = append(requests, reconcile.Request{ + NamespacedName: client.ObjectKey{ + Namespace: hr.Namespace, + Name: hr.Name, + }, + }) + } + default: + r.Log.Error(fmt.Errorf("unexpected object type"), "failed to convert object to BackendTrafficPolicy") + } + return requests +} + func (r *HTTPRouteReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { hr := new(gatewayv1.HTTPRoute) if err := r.Get(ctx, req.NamespacedName, hr); err != nil { @@ -131,7 +215,7 @@ func (r *HTTPRouteReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( } } - ProcessBackendTrafficPolicy(r.Client, tctx) + ProcessBackendTrafficPolicy(r.Client, r.Log, tctx) if err := r.Provider.Update(ctx, tctx, hr); err != nil { acceptStatus.status = false @@ -156,6 +240,7 @@ func (r *HTTPRouteReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( if err := r.Status().Update(ctx, hr); err != nil { return ctrl.Result{}, err } + UpdateStatus(r.Client, r.Log, tctx) return ctrl.Result{}, nil } @@ -253,7 +338,10 @@ func (r *HTTPRouteReconciler) listHTTPRoutesForBackendTrafficPolicy(ctx context. }, }) } - log.Errorw("list httproutes for backend traffic policy", zap.Any("httproutes", requests)) + if !policy.GetDeletionTimestamp().IsZero() { + // If the policy is deleted, we need to list all HTTPRoutes that reference this policy + // and add them to the requests. + } return requests } diff --git a/internal/controller/source.go b/internal/controller/source.go new file mode 100644 index 000000000..1fea5de40 --- /dev/null +++ b/internal/controller/source.go @@ -0,0 +1,121 @@ +package controller + +/* +import ( + "context" + "fmt" + "strings" + "time" + + "github.com/api7/api7-ingress-controller/api/v1alpha1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/util/workqueue" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + gatewayv1 "sigs.k8s.io/gateway-api/apis/v1" +) + +type TargetRefsChangedPredicate struct { + predicate.Funcs + DeletedRefs map[string]struct{} // 存储被删除的引用 +} + +// Update 事件处理 +func (t *TargetRefsChangedPredicate) Update(e event.UpdateEvent) bool { + oldPolicy, okOld := e.ObjectOld.(*v1alpha1.BackendTrafficPolicy) + newPolicy, okNew := e.ObjectNew.(*v1alpha1.BackendTrafficPolicy) + if !okOld || !okNew { + return false + } + + // 计算被删除的引用 + oldRefs := oldPolicy.Spec.TargetRefs + newRefs := newPolicy.Spec.TargetRefs + + // 将旧引用转换为 Map + oldRefMap := make(map[string]struct{}) + for _, ref := range oldRefs { + key := fmt.Sprintf("%s/%s", ref.Kind, ref.Name) + oldRefMap[key] = struct{}{} + } + + // 找出被删除的引用 + t.DeletedRefs = make(map[string]struct{}) + for _, ref := range newRefs { + key := fmt.Sprintf("%s/%s", ref.Kind, ref.Name) + delete(oldRefMap, key) + } + for key := range oldRefMap { + t.DeletedRefs[key] = struct{}{} + } + + return len(t.DeletedRefs) > 0 +} + +type DeletedRefEventSource struct { + Client client.Client + Predicate *TargetRefsChangedPredicate +} + +// Start 实现 Source 接口 +func (s *DeletedRefEventSource) Start( + ctx context.Context, + handler handler.EventHandler, + queue workqueue.RateLimitingInterface, + predicates ...predicate.Predicate, +) error { + // 监听 BackendTrafficPolicy 的 Update 事件(已通过 Predicate 过滤) + // 此处假设 Predicate 已捕获到被删除的引用 + go func() { + for { + select { + case <-ctx.Done(): + return + default: + if len(s.Predicate.DeletedRefs) == 0 { + time.Sleep(1 * time.Second) + continue + } + + // 遍历被删除的引用,查找关联的 HTTPRoute + for refKey := range s.Predicate.DeletedRefs { + parts := strings.Split(refKey, "/") + if len(parts) != 2 { + continue + } + kind, name := parts[0], parts[1] + + // 查找关联的 HTTPRoute + var routes gatewayv1.HTTPRouteList + if err := s.Client.List( + context.Background(), + &routes, + client.MatchingFields{"targetRefs": refKey}, + ); err != nil { + continue + } + + // 生成调和请求 + for _, route := range routes.Items { + req := reconcile.Request{ + NamespacedName: types.NamespacedName{ + Name: route.Name, + Namespace: route.Namespace, + }, + } + handler.Generic(ctx, event.GenericEvent{Object: &route}, queue) + } + } + + // 清空已处理的引用 + s.Predicate.DeletedRefs = make(map[string]struct{}) + time.Sleep(5 * time.Second) // 避免高频触发 + } + } + }() + return nil +} +*/ diff --git a/internal/provider/adc/adc.go b/internal/provider/adc/adc.go index 285de1a1a..ab05e077e 100644 --- a/internal/provider/adc/adc.go +++ b/internal/provider/adc/adc.go @@ -7,6 +7,7 @@ import ( "errors" "os" "os/exec" + "runtime/debug" "go.uber.org/zap" networkingv1 "k8s.io/api/networking/v1" @@ -90,7 +91,7 @@ func (d *adcClient) Update(ctx context.Context, tctx *provider.TranslateContext, } func (d *adcClient) Delete(ctx context.Context, obj client.Object) error { - log.Debugw("deleting object", zap.Any("object", obj)) + log.Debugw("deleting object", zap.Any("object", obj), zap.String("stack", string(debug.Stack()))) var resourceTypes []string var labels map[string]string diff --git a/internal/provider/adc/translator/httproute.go b/internal/provider/adc/translator/httproute.go index 3222e5487..812f908ef 100644 --- a/internal/provider/adc/translator/httproute.go +++ b/internal/provider/adc/translator/httproute.go @@ -287,12 +287,6 @@ func (t *Translator) TranslateHTTPRoute(tctx *provider.TranslateContext, httpRou backend.Namespace = &namespace } upNodes := t.translateBackendRef(tctx, backend.BackendRef) - for _, po := range tctx.BackendTrafficPolicies { - log.Errorw("backend traffic policy", - zap.String("policy", po.Name), - zap.String("namespace", po.Namespace)) - } - t.AttachBackendTrafficPolicyToUpstream(backend.BackendRef, tctx.BackendTrafficPolicies, upstream) upstream.Nodes = append(upstream.Nodes, upNodes...) } diff --git a/internal/provider/adc/translator/policies.go b/internal/provider/adc/translator/policies.go index 1ed0c0081..44dbaa6b0 100644 --- a/internal/provider/adc/translator/policies.go +++ b/internal/provider/adc/translator/policies.go @@ -41,9 +41,9 @@ func (t *Translator) attachBackendTrafficPolicyToUpstream(policy *v1alpha1.Backe } if policy.Spec.Timeout != nil { upstream.Timeout = &adctypes.Timeout{ - Connect: policy.Spec.Timeout.Connect.Duration.Seconds(), - Read: policy.Spec.Timeout.Read.Duration.Seconds(), - Send: policy.Spec.Timeout.Send.Duration.Seconds(), + Connect: int(policy.Spec.Timeout.Connect.Duration.Seconds()), + Read: int(policy.Spec.Timeout.Read.Duration.Seconds()), + Send: int(policy.Spec.Timeout.Send.Duration.Seconds()), } } if policy.Spec.LoadBalancer != nil { From 0da0b4aa42be5fa1764cc6994ee9e13818e430b1 Mon Sep 17 00:00:00 2001 From: rongxin Date: Fri, 18 Apr 2025 08:47:12 +0800 Subject: [PATCH 04/11] manager status and test --- api/adc/types.go | 10 +- api/v1alpha1/backendtrafficpolicy_types.go | 6 +- ...eway.apisix.io_backendtrafficpolicies.yaml | 6 +- internal/controller/httproute_controller.go | 47 ++++--- internal/controller/indexer/indexer.go | 10 +- internal/controller/policies.go | 115 ++++++++++-------- internal/controller/status.go | 7 +- internal/provider/adc/translator/policies.go | 1 + internal/provider/provider.go | 2 +- test/e2e/crds/consumer.go | 48 ++------ test/e2e/scaffold/k8s.go | 38 ++++++ 11 files changed, 158 insertions(+), 132 deletions(-) diff --git a/api/adc/types.go b/api/adc/types.go index c83dac52d..1c561fd19 100644 --- a/api/adc/types.go +++ b/api/adc/types.go @@ -149,7 +149,7 @@ type Upstream struct { HashOn string `json:"hash_on,omitempty" yaml:"hash_on,omitempty"` Key string `json:"key,omitempty" yaml:"key,omitempty"` Nodes UpstreamNodes `json:"nodes" yaml:"nodes"` - PassHost *PassHost `json:"pass_host,omitempty" yaml:"pass_host,omitempty"` + PassHost string `json:"pass_host,omitempty" yaml:"pass_host,omitempty"` Retries *int64 `json:"retries,omitempty" yaml:"retries,omitempty"` RetryTimeout *float64 `json:"retry_timeout,omitempty" yaml:"retry_timeout,omitempty"` Scheme string `json:"scheme,omitempty" yaml:"scheme,omitempty"` @@ -202,14 +202,6 @@ const ( Trace Method = "TRACE" ) -type PassHost string - -const ( - Node PassHost = "node" - Pass PassHost = "pass" - Rewrite PassHost = "rewrite" -) - type Scheme string const ( diff --git a/api/v1alpha1/backendtrafficpolicy_types.go b/api/v1alpha1/backendtrafficpolicy_types.go index 538efebaf..9e2f1d756 100644 --- a/api/v1alpha1/backendtrafficpolicy_types.go +++ b/api/v1alpha1/backendtrafficpolicy_types.go @@ -44,11 +44,11 @@ type BackendTrafficPolicySpec struct { // // +kubebuilder:validation:Enum=pass;node;rewrite; // +kubebuilder:default=pass - PassHost string `json:"pass_host,omitempty" yaml:"pass_host,omitempty"` + PassHost string `json:"passHost,omitempty" yaml:"passHost,omitempty"` // Specifies the host of the Upstream request. This is only valid if - // the pass_host is set to rewrite - Host Hostname `json:"upstream_host,omitempty" yaml:"upstream_host,omitempty"` + // the passHost is set to rewrite + Host Hostname `json:"upstreamHost,omitempty" yaml:"upstreamHost,omitempty"` } // LoadBalancer describes the load balancing parameters. diff --git a/config/crd/bases/gateway.apisix.io_backendtrafficpolicies.yaml b/config/crd/bases/gateway.apisix.io_backendtrafficpolicies.yaml index 7771eacb7..f26471b96 100644 --- a/config/crd/bases/gateway.apisix.io_backendtrafficpolicies.yaml +++ b/config/crd/bases/gateway.apisix.io_backendtrafficpolicies.yaml @@ -70,7 +70,7 @@ spec: type: object x-kubernetes-validations: - rule: '!(has(self.key) && self.type != ''chash'')' - pass_host: + passHost: default: pass description: |- Configures the host when the request is forwarded to the upstream. @@ -175,10 +175,10 @@ spec: pattern: ^[0-9]+s$ type: string type: object - upstream_host: + upstreamHost: description: |- Specifies the host of the Upstream request. This is only valid if - the pass_host is set to rewrite + the passHost is set to rewrite maxLength: 253 minLength: 1 pattern: ^(\*\.)?[a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*$ diff --git a/internal/controller/httproute_controller.go b/internal/controller/httproute_controller.go index 0ff9af733..ad3254686 100644 --- a/internal/controller/httproute_controller.go +++ b/internal/controller/httproute_controller.go @@ -129,6 +129,7 @@ func (r *HTTPRouteReconciler) SetupWithManager(mgr ctrl.Manager) error { } func (r *HTTPRouteReconciler) listHTTPRouteForGenericEvent(ctx context.Context, obj client.Object) []reconcile.Request { + var namespacedNameMap = make(map[types.NamespacedName]struct{}) requests := []reconcile.Request{} switch v := obj.(type) { case *v1alpha1.BackendTrafficPolicy: @@ -144,12 +145,19 @@ func (r *HTTPRouteReconciler) listHTTPRouteForGenericEvent(ctx context.Context, httprouteAll = append(httprouteAll, httprouteList.Items...) } for _, hr := range httprouteAll { - requests = append(requests, reconcile.Request{ - NamespacedName: client.ObjectKey{ - Namespace: hr.Namespace, - Name: hr.Name, - }, - }) + key := types.NamespacedName{ + Namespace: hr.Namespace, + Name: hr.Name, + } + if _, ok := namespacedNameMap[key]; !ok { + namespacedNameMap[key] = struct{}{} + requests = append(requests, reconcile.Request{ + NamespacedName: client.ObjectKey{ + Namespace: hr.Namespace, + Name: hr.Name, + }, + }) + } } default: r.Log.Error(fmt.Errorf("unexpected object type"), "failed to convert object to BackendTrafficPolicy") @@ -203,6 +211,8 @@ func (r *HTTPRouteReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( tctx := provider.NewDefaultTranslateContext(ctx) + tctx.RouteParentRefs = hr.Spec.ParentRefs + if err := r.processHTTPRoute(tctx, hr); err != nil { acceptStatus.status = false acceptStatus.msg = err.Error() @@ -328,19 +338,22 @@ func (r *HTTPRouteReconciler) listHTTPRoutesForBackendTrafficPolicy(ctx context. } httprouteList = append(httprouteList, hrList.Items...) } - + var namespacedNameMap = make(map[types.NamespacedName]struct{}) requests := make([]reconcile.Request, 0, len(httprouteList)) for _, hr := range httprouteList { - requests = append(requests, reconcile.Request{ - NamespacedName: client.ObjectKey{ - Namespace: hr.Namespace, - Name: hr.Name, - }, - }) - } - if !policy.GetDeletionTimestamp().IsZero() { - // If the policy is deleted, we need to list all HTTPRoutes that reference this policy - // and add them to the requests. + key := types.NamespacedName{ + Namespace: hr.Namespace, + Name: hr.Name, + } + if _, ok := namespacedNameMap[key]; !ok { + namespacedNameMap[key] = struct{}{} + requests = append(requests, reconcile.Request{ + NamespacedName: client.ObjectKey{ + Namespace: hr.Namespace, + Name: hr.Name, + }, + }) + } } return requests } diff --git a/internal/controller/indexer/indexer.go b/internal/controller/indexer/indexer.go index 42d809fb9..90310b07e 100644 --- a/internal/controller/indexer/indexer.go +++ b/internal/controller/indexer/indexer.go @@ -37,11 +37,9 @@ func SetupIndexer(mgr ctrl.Manager) error { if err := setupConsumerIndexer(mgr); err != nil { return err } - /* - if err := setupBackendTrafficPolicyIndexer(mgr); err != nil { - return err - } - */ + if err := setupBackendTrafficPolicyIndexer(mgr); err != nil { + return err + } return nil } @@ -192,7 +190,7 @@ func setupIngressIndexer(mgr ctrl.Manager) error { return nil } -func SetupBackendTrafficPolicyIndexer(mgr ctrl.Manager) error { +func setupBackendTrafficPolicyIndexer(mgr ctrl.Manager) error { if err := mgr.GetFieldIndexer().IndexField( context.Background(), &v1alpha1.BackendTrafficPolicy{}, diff --git a/internal/controller/policies.go b/internal/controller/policies.go index ee1770fa8..0ac10bfc9 100644 --- a/internal/controller/policies.go +++ b/internal/controller/policies.go @@ -3,6 +3,7 @@ package controller import ( "fmt" + "k8s.io/utils/ptr" gatewayv1 "sigs.k8s.io/gateway-api/apis/v1" gatewayv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2" @@ -11,6 +12,7 @@ import ( "github.com/api7/api7-ingress-controller/internal/controller/indexer" "github.com/api7/api7-ingress-controller/internal/provider" "github.com/go-logr/logr" + "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" @@ -26,10 +28,14 @@ func (p PolicyTargetKey) String() string { return p.NsName.String() + "/" + p.GroupKind.String() } -func ProcessBackendTrafficPolicy(c client.Client, +func ProcessBackendTrafficPolicy( + c client.Client, log logr.Logger, - tctx *provider.TranslateContext) { - conflicts := map[string]v1alpha1.BackendTrafficPolicy{} + tctx *provider.TranslateContext, +) { + conflicts := map[string]*v1alpha1.BackendTrafficPolicy{} + servicePortNameMap := map[string]bool{} + policyMap := map[types.NamespacedName]*v1alpha1.BackendTrafficPolicy{} for _, service := range tctx.Services { backendTrafficPolicyList := &v1alpha1.BackendTrafficPolicyList{} if err := c.List(tctx, backendTrafficPolicyList, @@ -43,58 +49,61 @@ func ProcessBackendTrafficPolicy(c client.Client, if len(backendTrafficPolicyList.Items) == 0 { continue } - - portNameExist := make(map[string]bool, len(service.Spec.Ports)) for _, port := range service.Spec.Ports { - portNameExist[port.Name] = true + key := fmt.Sprintf("%s/%s/%s", service.Namespace, service.Name, port.Name) + servicePortNameMap[key] = true + } + + for _, p := range backendTrafficPolicyList.Items { + policyMap[types.NamespacedName{ + Name: p.Name, + Namespace: p.Namespace, + }] = p.DeepCopy() } - for _, policy := range backendTrafficPolicyList.Items { - targetRefs := policy.Spec.TargetRefs - updated := false - for _, targetRef := range targetRefs { - sectionName := targetRef.SectionName - key := PolicyTargetKey{ - NsName: types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, - GroupKind: schema.GroupKind{Group: "", Kind: "Service"}, - } - condition := NewPolicyCondition(policy.Generation, true, "Policy has been accepted") - if sectionName != nil && !portNameExist[string(*sectionName)] { - condition = NewPolicyCondition(policy.Generation, false, fmt.Sprintf("SectionName %s not found in Service %s/%s", *sectionName, service.Namespace, service.Name)) - processPolicyStatus(&policy, tctx, condition, &updated) - continue - } - if p, ok := conflicts[key.String()]; ok && (p.Name == policy.Name && p.Namespace == policy.Namespace) { - condition = NewPolicyConflictCondition(policy.Generation, fmt.Sprintf("Unable to target Service %s/%s, because it conflicts with another BackendTrafficPolicy", service.Namespace, service.Name)) - processPolicyStatus(&policy, tctx, condition, &updated) - continue - } - conflicts[key.String()] = policy - processPolicyStatus(&policy, tctx, condition, &updated) + } + + for _, p := range policyMap { + policy := p.DeepCopy() + targetRefs := policy.Spec.TargetRefs + updated := false + for _, targetRef := range targetRefs { + sectionName := targetRef.SectionName + key := PolicyTargetKey{ + NsName: types.NamespacedName{Namespace: p.GetNamespace(), Name: string(targetRef.Name)}, + GroupKind: schema.GroupKind{Group: "", Kind: "Service"}, } - if _, ok := tctx.BackendTrafficPolicies[types.NamespacedName{ - Name: policy.Name, - Namespace: policy.Namespace, - }]; ok { + condition := NewPolicyCondition(policy.Generation, true, "Policy has been accepted") + if sectionName != nil && !servicePortNameMap[fmt.Sprintf("%s/%s/%s", policy.Namespace, string(targetRef.Name), *sectionName)] { + condition = NewPolicyCondition(policy.Generation, false, fmt.Sprintf("No section name %s found in Service %s/%s", *sectionName, policy.Namespace, targetRef.Name)) + processPolicyStatus(policy, tctx, condition, &updated) continue } - - if updated { - tctx.StatusUpdaters = append(tctx.StatusUpdaters, policy.DeepCopy()) + if _, ok := conflicts[key.String()]; ok { + condition = NewPolicyConflictCondition(policy.Generation, fmt.Sprintf("Unable to target Service %s/%s, because it conflicts with another BackendTrafficPolicy", policy.Namespace, targetRef.Name)) + processPolicyStatus(policy, tctx, condition, &updated) + continue } - - tctx.BackendTrafficPolicies[types.NamespacedName{ - Name: policy.Name, - Namespace: policy.Namespace, - }] = policy.DeepCopy() + conflicts[key.String()] = policy + processPolicyStatus(policy, tctx, condition, &updated) + } + if updated { + tctx.StatusUpdaters = append(tctx.StatusUpdaters, policy.DeepCopy()) } } + for _, policy := range conflicts { + tctx.BackendTrafficPolicies[types.NamespacedName{ + Name: policy.Name, + Namespace: policy.Namespace, + }] = policy + } } func processPolicyStatus(policy *v1alpha1.BackendTrafficPolicy, tctx *provider.TranslateContext, condition metav1.Condition, - updated *bool) { - if ok := SetAncestors(&policy.Status, tctx.ParentRefs, condition); ok { + updated *bool, +) { + if ok := SetAncestors(&policy.Status, tctx.RouteParentRefs, condition); ok { *updated = true } } @@ -115,17 +124,21 @@ func SetAncestors(status *v1alpha1.PolicyStatus, parentRefs []gatewayv1.ParentRe } func SetAncestorStatus(status *v1alpha1.PolicyStatus, ancestorStatus gatewayv1alpha2.PolicyAncestorStatus) bool { + if len(ancestorStatus.Conditions) == 0 { + return false + } + condition := ancestorStatus.Conditions[0] for _, c := range status.Ancestors { - if c.AncestorRef == ancestorStatus.AncestorRef { - if len(c.Conditions) == 0 || len(ancestorStatus.Conditions) == 0 { - c.Conditions = ancestorStatus.Conditions - return true - } - if c.Conditions[0].ObservedGeneration < ancestorStatus.Conditions[0].ObservedGeneration { - c.Conditions = ancestorStatus.Conditions - return true + if c.AncestorRef.Name == ancestorStatus.AncestorRef.Name && + ptr.Equal(c.AncestorRef.Namespace, ancestorStatus.AncestorRef.Namespace) && + ptr.Equal(c.AncestorRef.Group, ancestorStatus.AncestorRef.Group) && + ptr.Equal(c.AncestorRef.Kind, ancestorStatus.AncestorRef.Kind) && + c.ControllerName == ancestorStatus.ControllerName { + if !VerifyConditions(&c.Conditions, condition) { + return false } - return false + meta.SetStatusCondition(&c.Conditions, condition) + return true } } status.Ancestors = append(status.Ancestors, ancestorStatus) diff --git a/internal/controller/status.go b/internal/controller/status.go index cae419995..05b0d5139 100644 --- a/internal/controller/status.go +++ b/internal/controller/status.go @@ -63,6 +63,7 @@ func NewPolicyCondition(observedGeneration int64, status bool, message string) m Status: conditionStatus, Message: message, ObservedGeneration: observedGeneration, + LastTransitionTime: metav1.Now(), } } @@ -73,6 +74,7 @@ func NewPolicyConflictCondition(observedGeneration int64, message string) metav1 Status: metav1.ConditionFalse, Message: message, ObservedGeneration: observedGeneration, + LastTransitionTime: metav1.Now(), } } @@ -82,9 +84,6 @@ func UpdateStatus( tctx *provider.TranslateContext, ) { for _, obj := range tctx.StatusUpdaters { - if err := c.Status().Update(tctx, obj); err != nil { - log.Error(err, "failed to update status", "object", obj) - continue - } + _ = c.Status().Update(tctx, obj) } } diff --git a/internal/provider/adc/translator/policies.go b/internal/provider/adc/translator/policies.go index 44dbaa6b0..a71e76bd1 100644 --- a/internal/provider/adc/translator/policies.go +++ b/internal/provider/adc/translator/policies.go @@ -33,6 +33,7 @@ func (t *Translator) attachBackendTrafficPolicyToUpstream(policy *v1alpha1.Backe if policy == nil { return } + upstream.PassHost = policy.Spec.PassHost upstream.UpstreamHost = string(policy.Spec.Host) upstream.Scheme = policy.Spec.Scheme if policy.Spec.Retries != nil { diff --git a/internal/provider/provider.go b/internal/provider/provider.go index 395d7eaa7..fa05e07e1 100644 --- a/internal/provider/provider.go +++ b/internal/provider/provider.go @@ -19,7 +19,7 @@ type Provider interface { type TranslateContext struct { context.Context - ParentRefs []gatewayv1.ParentReference + RouteParentRefs []gatewayv1.ParentReference BackendRefs []gatewayv1.BackendRef GatewayTLSConfig []gatewayv1.GatewayTLSConfig GatewayProxy *v1alpha1.GatewayProxy diff --git a/test/e2e/crds/consumer.go b/test/e2e/crds/consumer.go index cb0d0b40f..fc7405b27 100644 --- a/test/e2e/crds/consumer.go +++ b/test/e2e/crds/consumer.go @@ -1,7 +1,6 @@ package gatewayapi import ( - "fmt" "time" . "github.com/onsi/ginkgo/v2" @@ -75,40 +74,6 @@ spec: port: 80 ` - var beforeEachHTTP = func() { - By("create GatewayClass") - gatewayClassName := fmt.Sprintf("api7-%d", time.Now().Unix()) - gatewayString := fmt.Sprintf(defaultGatewayClass, gatewayClassName, s.GetControllerName()) - err := s.CreateResourceFromStringWithNamespace(gatewayString, "") - Expect(err).NotTo(HaveOccurred(), "creating GatewayClass") - time.Sleep(5 * time.Second) - - By("check GatewayClass condition") - gcyaml, err := s.GetResourceYaml("GatewayClass", gatewayClassName) - Expect(err).NotTo(HaveOccurred(), "getting GatewayClass yaml") - Expect(gcyaml).To(ContainSubstring(`status: "True"`), "checking GatewayClass condition status") - Expect(gcyaml).To( - ContainSubstring("message: the gatewayclass has been accepted by the api7-ingress-controller"), - "checking GatewayClass condition message", - ) - - By("create Gateway") - err = s.CreateResourceFromString(fmt.Sprintf(defaultGateway, gatewayClassName)) - Expect(err).NotTo(HaveOccurred(), "creating Gateway") - time.Sleep(5 * time.Second) - - By("check Gateway condition") - gwyaml, err := s.GetResourceYaml("Gateway", "api7ee") - Expect(err).NotTo(HaveOccurred(), "getting Gateway yaml") - Expect(gwyaml).To(ContainSubstring(`status: "True"`), "checking Gateway condition status") - Expect(gwyaml).To( - ContainSubstring("message: the gateway has been accepted by the api7-ingress-controller"), - "checking Gateway condition message", - ) - - s.ResourceApplied("httproute", "httpbin", defaultHTTPRoute, 1) - } - Context("Consumer plugins", func() { var limitCountConsumer = ` apiVersion: gateway.apisix.io/v1alpha1 @@ -147,7 +112,9 @@ spec: key: sample-key2 ` - BeforeEach(beforeEachHTTP) + BeforeEach(func() { + s.ApplyDefaultGatewayResource(defaultGatewayClass, defaultGateway, defaultHTTPRoute) + }) It("limit-count plugin", func() { s.ResourceApplied("Consumer", "consumer-sample", limitCountConsumer, 1) @@ -227,7 +194,10 @@ spec: config: key: consumer-key ` - BeforeEach(beforeEachHTTP) + + BeforeEach(func() { + s.ApplyDefaultGatewayResource(defaultGatewayClass, defaultGateway, defaultHTTPRoute) + }) It("Create/Update/Delete", func() { s.ResourceApplied("Consumer", "consumer-sample", defaultCredential, 1) @@ -338,8 +308,10 @@ spec: config: key: sample-key2 ` - BeforeEach(beforeEachHTTP) + BeforeEach(func() { + s.ApplyDefaultGatewayResource(defaultGatewayClass, defaultGateway, defaultHTTPRoute) + }) It("Create/Update/Delete", func() { err := s.CreateResourceFromString(keyAuthSecret) Expect(err).NotTo(HaveOccurred(), "creating key-auth secret") diff --git a/test/e2e/scaffold/k8s.go b/test/e2e/scaffold/k8s.go index e6cb5a3ad..4ba4ba28c 100644 --- a/test/e2e/scaffold/k8s.go +++ b/test/e2e/scaffold/k8s.go @@ -242,3 +242,41 @@ func (s *Scaffold) ResourceApplied(resourType, resourceName, resourceRaw string, ) time.Sleep(1 * time.Second) } + +func (s *Scaffold) ApplyDefaultGatewayResource( + defaultGatewayClass string, + defaultGateway string, + defaultHTTPRoute string, +) { + By("create GatewayClass") + gatewayClassName := fmt.Sprintf("api7-%d", time.Now().Unix()) + gatewayString := fmt.Sprintf(defaultGatewayClass, gatewayClassName, s.GetControllerName()) + err := s.CreateResourceFromStringWithNamespace(gatewayString, "") + Expect(err).NotTo(HaveOccurred(), "creating GatewayClass") + time.Sleep(5 * time.Second) + + By("check GatewayClass condition") + gcyaml, err := s.GetResourceYaml("GatewayClass", gatewayClassName) + Expect(err).NotTo(HaveOccurred(), "getting GatewayClass yaml") + Expect(gcyaml).To(ContainSubstring(`status: "True"`), "checking GatewayClass condition status") + Expect(gcyaml).To( + ContainSubstring("message: the gatewayclass has been accepted by the api7-ingress-controller"), + "checking GatewayClass condition message", + ) + + By("create Gateway") + err = s.CreateResourceFromString(fmt.Sprintf(defaultGateway, gatewayClassName)) + Expect(err).NotTo(HaveOccurred(), "creating Gateway") + time.Sleep(5 * time.Second) + + By("check Gateway condition") + gwyaml, err := s.GetResourceYaml("Gateway", "api7ee") + Expect(err).NotTo(HaveOccurred(), "getting Gateway yaml") + Expect(gwyaml).To(ContainSubstring(`status: "True"`), "checking Gateway condition status") + Expect(gwyaml).To( + ContainSubstring("message: the gateway has been accepted by the api7-ingress-controller"), + "checking Gateway condition message", + ) + + s.ResourceApplied("httproute", "httpbin", defaultHTTPRoute, 1) +} From 338d8e4591d988298578eb7041c3673e22b947f7 Mon Sep 17 00:00:00 2001 From: rongxin Date: Fri, 18 Apr 2025 08:57:53 +0800 Subject: [PATCH 05/11] remove annotation --- internal/controller/httproute_controller.go | 1 - 1 file changed, 1 deletion(-) diff --git a/internal/controller/httproute_controller.go b/internal/controller/httproute_controller.go index 7387e85d9..361735c45 100644 --- a/internal/controller/httproute_controller.go +++ b/internal/controller/httproute_controller.go @@ -92,7 +92,6 @@ func (r *HTTPRouteReconciler) SetupWithManager(mgr ctrl.Manager) error { oldRefs := oldObj.Spec.TargetRefs newRefs := newObj.Spec.TargetRefs - // 将旧引用转换为 Map oldRefMap := make(map[string]v1alpha1.BackendPolicyTargetReferenceWithSectionName) for _, ref := range oldRefs { key := fmt.Sprintf("%s/%s/%s", ref.Group, ref.Kind, ref.Name) From 48686cabdca5646bc169e567d320a9932fe5fa82 Mon Sep 17 00:00:00 2001 From: rongxin Date: Fri, 18 Apr 2025 09:02:16 +0800 Subject: [PATCH 06/11] remove file --- internal/controller/source.go | 121 ---------------------------------- 1 file changed, 121 deletions(-) delete mode 100644 internal/controller/source.go diff --git a/internal/controller/source.go b/internal/controller/source.go deleted file mode 100644 index 1fea5de40..000000000 --- a/internal/controller/source.go +++ /dev/null @@ -1,121 +0,0 @@ -package controller - -/* -import ( - "context" - "fmt" - "strings" - "time" - - "github.com/api7/api7-ingress-controller/api/v1alpha1" - "k8s.io/apimachinery/pkg/types" - "k8s.io/client-go/util/workqueue" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/event" - "sigs.k8s.io/controller-runtime/pkg/handler" - "sigs.k8s.io/controller-runtime/pkg/predicate" - "sigs.k8s.io/controller-runtime/pkg/reconcile" - gatewayv1 "sigs.k8s.io/gateway-api/apis/v1" -) - -type TargetRefsChangedPredicate struct { - predicate.Funcs - DeletedRefs map[string]struct{} // 存储被删除的引用 -} - -// Update 事件处理 -func (t *TargetRefsChangedPredicate) Update(e event.UpdateEvent) bool { - oldPolicy, okOld := e.ObjectOld.(*v1alpha1.BackendTrafficPolicy) - newPolicy, okNew := e.ObjectNew.(*v1alpha1.BackendTrafficPolicy) - if !okOld || !okNew { - return false - } - - // 计算被删除的引用 - oldRefs := oldPolicy.Spec.TargetRefs - newRefs := newPolicy.Spec.TargetRefs - - // 将旧引用转换为 Map - oldRefMap := make(map[string]struct{}) - for _, ref := range oldRefs { - key := fmt.Sprintf("%s/%s", ref.Kind, ref.Name) - oldRefMap[key] = struct{}{} - } - - // 找出被删除的引用 - t.DeletedRefs = make(map[string]struct{}) - for _, ref := range newRefs { - key := fmt.Sprintf("%s/%s", ref.Kind, ref.Name) - delete(oldRefMap, key) - } - for key := range oldRefMap { - t.DeletedRefs[key] = struct{}{} - } - - return len(t.DeletedRefs) > 0 -} - -type DeletedRefEventSource struct { - Client client.Client - Predicate *TargetRefsChangedPredicate -} - -// Start 实现 Source 接口 -func (s *DeletedRefEventSource) Start( - ctx context.Context, - handler handler.EventHandler, - queue workqueue.RateLimitingInterface, - predicates ...predicate.Predicate, -) error { - // 监听 BackendTrafficPolicy 的 Update 事件(已通过 Predicate 过滤) - // 此处假设 Predicate 已捕获到被删除的引用 - go func() { - for { - select { - case <-ctx.Done(): - return - default: - if len(s.Predicate.DeletedRefs) == 0 { - time.Sleep(1 * time.Second) - continue - } - - // 遍历被删除的引用,查找关联的 HTTPRoute - for refKey := range s.Predicate.DeletedRefs { - parts := strings.Split(refKey, "/") - if len(parts) != 2 { - continue - } - kind, name := parts[0], parts[1] - - // 查找关联的 HTTPRoute - var routes gatewayv1.HTTPRouteList - if err := s.Client.List( - context.Background(), - &routes, - client.MatchingFields{"targetRefs": refKey}, - ); err != nil { - continue - } - - // 生成调和请求 - for _, route := range routes.Items { - req := reconcile.Request{ - NamespacedName: types.NamespacedName{ - Name: route.Name, - Namespace: route.Namespace, - }, - } - handler.Generic(ctx, event.GenericEvent{Object: &route}, queue) - } - } - - // 清空已处理的引用 - s.Predicate.DeletedRefs = make(map[string]struct{}) - time.Sleep(5 * time.Second) // 避免高频触发 - } - } - }() - return nil -} -*/ From 6e3547da9bde557865bb90885c1d93e5d9f0c915 Mon Sep 17 00:00:00 2001 From: rongxin Date: Fri, 18 Apr 2025 09:03:00 +0800 Subject: [PATCH 07/11] add test --- test/e2e/crds/backendtrafficpolicy.go | 124 ++++++++++++++++++++++++++ 1 file changed, 124 insertions(+) create mode 100644 test/e2e/crds/backendtrafficpolicy.go diff --git a/test/e2e/crds/backendtrafficpolicy.go b/test/e2e/crds/backendtrafficpolicy.go new file mode 100644 index 000000000..f017584de --- /dev/null +++ b/test/e2e/crds/backendtrafficpolicy.go @@ -0,0 +1,124 @@ +package gatewayapi + +import ( + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + "github.com/api7/api7-ingress-controller/test/e2e/scaffold" +) + +var _ = Describe("Test BackendTrafficPolicy", func() { + s := scaffold.NewDefaultScaffold() + + var defaultGatewayClass = ` +apiVersion: gateway.networking.k8s.io/v1 +kind: GatewayClass +metadata: + name: %s +spec: + controllerName: %s +` + + var defaultGateway = ` +apiVersion: gateway.networking.k8s.io/v1 +kind: Gateway +metadata: + name: api7ee +spec: + gatewayClassName: %s + listeners: + - name: http1 + protocol: HTTP + port: 80 +` + + var defaultHTTPRoute = ` +apiVersion: gateway.networking.k8s.io/v1 +kind: HTTPRoute +metadata: + name: httpbin +spec: + parentRefs: + - name: api7ee + hostnames: + - "httpbin.org" + rules: + - matches: + - path: + type: Exact + value: /get + - path: + type: Exact + value: /headers + backendRefs: + - name: httpbin-service-e2e-test + port: 80 +` + Context("Rewrite Upstream Host", func() { + var createUpstreamHost = ` +apiVersion: gateway.apisix.io/v1alpha1 +kind: BackendTrafficPolicy +metadata: + name: httpbin + namespace: default +spec: + targetRefs: + - name: httpbin-service-e2e-test + kind: Service + group: "" + passHost: rewrite + upstreamHost: httpbin.example.com +` + + var updateUpstreamHost = ` +apiVersion: gateway.apisix.io/v1alpha1 +kind: BackendTrafficPolicy +metadata: + name: httpbin + namespace: default +spec: + targetRefs: + - name: httpbin-service-e2e-test + kind: Service + group: "" + passHost: rewrite + upstreamHost: httpbin.update.example.com +` + + BeforeEach(func() { + s.ApplyDefaultGatewayResource(defaultGatewayClass, defaultGateway, defaultHTTPRoute) + }) + It("should rewrite upstream host", func() { + s.ResourceApplied("BackendTrafficPolicy", "httpbin", createUpstreamHost, 1) + s.NewAPISIXClient(). + GET("/headers"). + WithHost("httpbin.org"). + Expect(). + Status(200). + Body().Contains("httpbin.example.com") + + s.ResourceApplied("BackendTrafficPolicy", "httpbin", updateUpstreamHost, 2) + s.NewAPISIXClient(). + GET("/headers"). + WithHost("httpbin.org"). + Expect(). + Status(200). + Body().Contains("httpbin.update.example.com") + + err := s.DeleteResourceFromString(createUpstreamHost) + Expect(err).NotTo(HaveOccurred(), "deleting BackendTrafficPolicy") + time.Sleep(5 * time.Second) + + s.NewAPISIXClient(). + GET("/headers"). + WithHost("httpbin.org"). + Expect(). + Status(200). + Body(). + NotContains("httpbin.update.example.com"). + NotContains("httpbin.example.com") + }) + }) +}) From 930c6b822c007b22f4d36c95fb63e884a4496241 Mon Sep 17 00:00:00 2001 From: rongxin Date: Fri, 18 Apr 2025 10:15:35 +0800 Subject: [PATCH 08/11] fix test --- internal/controller/indexer/indexer.go | 9 --------- 1 file changed, 9 deletions(-) diff --git a/internal/controller/indexer/indexer.go b/internal/controller/indexer/indexer.go index 90310b07e..f245e2374 100644 --- a/internal/controller/indexer/indexer.go +++ b/internal/controller/indexer/indexer.go @@ -134,15 +134,6 @@ func setupHTTPRouteIndexer(mgr ctrl.Manager) error { ); err != nil { return err } - - if err := mgr.GetFieldIndexer().IndexField( - context.Background(), - &v1alpha1.BackendTrafficPolicy{}, - PolicyTargetRefs, - BackendTrafficPolicyIndexFunc, - ); err != nil { - return err - } return nil } From 7123bf8fe53ebac10296b9780cdb0c7b7ef63616 Mon Sep 17 00:00:00 2001 From: rongxin Date: Fri, 18 Apr 2025 11:06:48 +0800 Subject: [PATCH 09/11] merge --- test/e2e/crds/backendtrafficpolicy.go | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/test/e2e/crds/backendtrafficpolicy.go b/test/e2e/crds/backendtrafficpolicy.go index f017584de..741a0a18d 100644 --- a/test/e2e/crds/backendtrafficpolicy.go +++ b/test/e2e/crds/backendtrafficpolicy.go @@ -12,6 +12,23 @@ import ( var _ = Describe("Test BackendTrafficPolicy", func() { s := scaffold.NewDefaultScaffold() + var defaultGatewayProxy = ` +apiVersion: gateway.apisix.io/v1alpha1 +kind: GatewayProxy +metadata: + name: api7-proxy-config +spec: + provider: + type: ControlPlane + controlPlane: + endpoints: + - %s + auth: + type: AdminKey + adminKey: + value: "%s" +` + var defaultGatewayClass = ` apiVersion: gateway.networking.k8s.io/v1 kind: GatewayClass @@ -88,7 +105,7 @@ spec: ` BeforeEach(func() { - s.ApplyDefaultGatewayResource(defaultGatewayClass, defaultGateway, defaultHTTPRoute) + s.ApplyDefaultGatewayResource(defaultGatewayProxy, defaultGatewayClass, defaultGateway, defaultHTTPRoute) }) It("should rewrite upstream host", func() { s.ResourceApplied("BackendTrafficPolicy", "httpbin", createUpstreamHost, 1) From 800209d6b67599bbe625a61789385b769b7948a0 Mon Sep 17 00:00:00 2001 From: rongxin Date: Fri, 18 Apr 2025 11:43:37 +0800 Subject: [PATCH 10/11] merge --- test/e2e/scaffold/k8s.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/test/e2e/scaffold/k8s.go b/test/e2e/scaffold/k8s.go index 4ba4ba28c..ea44e8dfa 100644 --- a/test/e2e/scaffold/k8s.go +++ b/test/e2e/scaffold/k8s.go @@ -24,6 +24,7 @@ import ( "github.com/api7/api7-ingress-controller/pkg/dashboard" apisix "github.com/api7/api7-ingress-controller/pkg/dashboard" + "github.com/api7/api7-ingress-controller/test/e2e/framework" "github.com/gruntwork-io/terratest/modules/k8s" "github.com/gruntwork-io/terratest/modules/retry" "github.com/gruntwork-io/terratest/modules/testing" @@ -244,14 +245,21 @@ func (s *Scaffold) ResourceApplied(resourType, resourceName, resourceRaw string, } func (s *Scaffold) ApplyDefaultGatewayResource( + defaultGatewayProxy string, defaultGatewayClass string, defaultGateway string, defaultHTTPRoute string, ) { + By("create GatewayProxy") + gatewayProxy := fmt.Sprintf(defaultGatewayProxy, framework.DashboardTLSEndpoint, s.AdminKey()) + err := s.CreateResourceFromString(gatewayProxy) + Expect(err).NotTo(HaveOccurred(), "creating GatewayProxy") + time.Sleep(5 * time.Second) + By("create GatewayClass") gatewayClassName := fmt.Sprintf("api7-%d", time.Now().Unix()) gatewayString := fmt.Sprintf(defaultGatewayClass, gatewayClassName, s.GetControllerName()) - err := s.CreateResourceFromStringWithNamespace(gatewayString, "") + err = s.CreateResourceFromStringWithNamespace(gatewayString, "") Expect(err).NotTo(HaveOccurred(), "creating GatewayClass") time.Sleep(5 * time.Second) From a658c8c18d20eb66392d73b7ffd374e9fb3bed41 Mon Sep 17 00:00:00 2001 From: rongxin Date: Fri, 18 Apr 2025 13:35:25 +0800 Subject: [PATCH 11/11] fix test --- test/e2e/crds/backendtrafficpolicy.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/test/e2e/crds/backendtrafficpolicy.go b/test/e2e/crds/backendtrafficpolicy.go index 741a0a18d..90c82bf07 100644 --- a/test/e2e/crds/backendtrafficpolicy.go +++ b/test/e2e/crds/backendtrafficpolicy.go @@ -49,6 +49,11 @@ spec: - name: http1 protocol: HTTP port: 80 + infrastructure: + parametersRef: + group: gateway.apisix.io + kind: GatewayProxy + name: api7-proxy-config ` var defaultHTTPRoute = ` @@ -79,7 +84,6 @@ apiVersion: gateway.apisix.io/v1alpha1 kind: BackendTrafficPolicy metadata: name: httpbin - namespace: default spec: targetRefs: - name: httpbin-service-e2e-test @@ -94,7 +98,6 @@ apiVersion: gateway.apisix.io/v1alpha1 kind: BackendTrafficPolicy metadata: name: httpbin - namespace: default spec: targetRefs: - name: httpbin-service-e2e-test