Skip to content

Commit 97e302b

Browse files
committed
feat: support backendtrafficpolicy for httproute
1 parent 1aa1050 commit 97e302b

File tree

9 files changed

+247
-18
lines changed

9 files changed

+247
-18
lines changed

api/adc/types.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -126,9 +126,9 @@ type Route struct {
126126
}
127127

128128
type Timeout struct {
129-
Connect int64 `json:"connect"`
130-
Read int64 `json:"read"`
131-
Send int64 `json:"send"`
129+
Connect int `json:"connect"`
130+
Read int `json:"read"`
131+
Send int `json:"send"`
132132
}
133133

134134
type StreamRoute struct {

api/v1alpha1/backendtrafficpolicy_types.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,10 +69,16 @@ type LoadBalancer struct {
6969

7070
type Timeout struct {
7171
// +kubebuilder:default="60s"
72+
// +kubebuilder:validation:Pattern=`^[0-9]+s$`
73+
// +kubebuilder:validation:Type=string
7274
Connect metav1.Duration `json:"connect,omitempty" yaml:"connect,omitempty"`
7375
// +kubebuilder:default="60s"
76+
// +kubebuilder:validation:Pattern=`^[0-9]+s$`
77+
// +kubebuilder:validation:Type=string
7478
Send metav1.Duration `json:"send,omitempty" yaml:"send,omitempty"`
7579
// +kubebuilder:default="60s"
80+
// +kubebuilder:validation:Pattern=`^[0-9]+s$`
81+
// +kubebuilder:validation:Type=string
7682
Read metav1.Duration `json:"read,omitempty" yaml:"read,omitempty"`
7783
}
7884

config/crd/bases/gateway.apisix.io_backendtrafficpolicies.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,12 +164,15 @@ spec:
164164
properties:
165165
connect:
166166
default: 60s
167+
pattern: ^[0-9]+s$
167168
type: string
168169
read:
169170
default: 60s
171+
pattern: ^[0-9]+s$
170172
type: string
171173
send:
172174
default: 60s
175+
pattern: ^[0-9]+s$
173176
type: string
174177
type: object
175178
upstream_host:

internal/controller/gateway_controller.go

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ func (r *GatewayReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
7676
if err := r.Provider.Delete(ctx, gateway); err != nil {
7777
return ctrl.Result{}, err
7878
}
79+
return ctrl.Result{}, nil
7980
}
8081
return ctrl.Result{}, err
8182
}
@@ -222,6 +223,9 @@ func (r *GatewayReconciler) listGatewaysForGatewayProxy(ctx context.Context, obj
222223

223224
recs := make([]reconcile.Request, 0, len(gatewayList.Items))
224225
for _, gateway := range gatewayList.Items {
226+
if !r.checkGatewayClass(&gateway) {
227+
continue
228+
}
225229
recs = append(recs, reconcile.Request{
226230
NamespacedName: client.ObjectKey{
227231
Namespace: gateway.GetNamespace(),
@@ -232,7 +236,7 @@ func (r *GatewayReconciler) listGatewaysForGatewayProxy(ctx context.Context, obj
232236
return recs
233237
}
234238

235-
func (r *GatewayReconciler) listGatewaysForHTTPRoute(_ context.Context, obj client.Object) []reconcile.Request {
239+
func (r *GatewayReconciler) listGatewaysForHTTPRoute(ctx context.Context, obj client.Object) []reconcile.Request {
236240
httpRoute, ok := obj.(*gatewayv1.HTTPRoute)
237241
if !ok {
238242
r.Log.Error(
@@ -256,6 +260,18 @@ func (r *GatewayReconciler) listGatewaysForHTTPRoute(_ context.Context, obj clie
256260
gatewayNamespace = string(*parentRef.Namespace)
257261
}
258262

263+
gateway := new(gatewayv1.Gateway)
264+
if err := r.Get(ctx, client.ObjectKey{
265+
Namespace: gatewayNamespace,
266+
Name: string(parentRef.Name),
267+
}, gateway); err != nil {
268+
continue
269+
}
270+
271+
if !r.checkGatewayClass(gateway) {
272+
continue
273+
}
274+
259275
recs = append(recs, reconcile.Request{
260276
NamespacedName: client.ObjectKey{
261277
Namespace: gatewayNamespace,

internal/controller/httproute_controller.go

Lines changed: 92 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import (
66
"strings"
77

88
"github.com/go-logr/logr"
9-
"go.uber.org/zap"
109
corev1 "k8s.io/api/core/v1"
1110
discoveryv1 "k8s.io/api/discovery/v1"
1211
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -19,12 +18,12 @@ import (
1918
"sigs.k8s.io/controller-runtime/pkg/handler"
2019
"sigs.k8s.io/controller-runtime/pkg/predicate"
2120
"sigs.k8s.io/controller-runtime/pkg/reconcile"
21+
"sigs.k8s.io/controller-runtime/pkg/source"
2222
gatewayv1 "sigs.k8s.io/gateway-api/apis/v1"
2323

2424
"github.com/api7/api7-ingress-controller/api/v1alpha1"
2525
"github.com/api7/api7-ingress-controller/internal/controller/indexer"
2626
"github.com/api7/api7-ingress-controller/internal/provider"
27-
"github.com/api7/gopkg/pkg/log"
2827
)
2928

3029
// HTTPRouteReconciler reconciles a GatewayClass object.
@@ -35,10 +34,14 @@ type HTTPRouteReconciler struct { //nolint:revive
3534
Log logr.Logger
3635

3736
Provider provider.Provider
37+
38+
genericEvent chan event.GenericEvent
3839
}
3940

4041
// SetupWithManager sets up the controller with the Manager.
4142
func (r *HTTPRouteReconciler) SetupWithManager(mgr ctrl.Manager) error {
43+
r.genericEvent = make(chan event.GenericEvent, 100)
44+
4245
return ctrl.NewControllerManagedBy(mgr).
4346
For(&gatewayv1.HTTPRoute{}).
4447
WithEventFilter(predicate.GenerationChangedPredicate{}).
@@ -69,10 +72,91 @@ func (r *HTTPRouteReconciler) SetupWithManager(mgr ctrl.Manager) error {
6972
).
7073
Watches(&v1alpha1.BackendTrafficPolicy{},
7174
handler.EnqueueRequestsFromMapFunc(r.listHTTPRoutesForBackendTrafficPolicy),
75+
builder.WithPredicates(
76+
predicate.Funcs{
77+
GenericFunc: func(e event.GenericEvent) bool {
78+
return false
79+
},
80+
DeleteFunc: func(e event.DeleteEvent) bool {
81+
return true
82+
},
83+
CreateFunc: func(e event.CreateEvent) bool {
84+
return true
85+
},
86+
UpdateFunc: func(e event.UpdateEvent) bool {
87+
oldObj, ok := e.ObjectOld.(*v1alpha1.BackendTrafficPolicy)
88+
newObj, ok2 := e.ObjectNew.(*v1alpha1.BackendTrafficPolicy)
89+
if !ok || !ok2 {
90+
return false
91+
}
92+
oldRefs := oldObj.Spec.TargetRefs
93+
newRefs := newObj.Spec.TargetRefs
94+
95+
// 将旧引用转换为 Map
96+
oldRefMap := make(map[string]v1alpha1.BackendPolicyTargetReferenceWithSectionName)
97+
for _, ref := range oldRefs {
98+
key := fmt.Sprintf("%s/%s/%s", ref.Group, ref.Kind, ref.Name)
99+
oldRefMap[key] = ref
100+
}
101+
102+
for _, ref := range newRefs {
103+
key := fmt.Sprintf("%s/%s/%s", ref.Group, ref.Kind, ref.Name)
104+
delete(oldRefMap, key)
105+
}
106+
if len(oldRefMap) > 0 {
107+
targetRefs := make([]v1alpha1.BackendPolicyTargetReferenceWithSectionName, 0, len(oldRefs))
108+
for _, ref := range oldRefMap {
109+
targetRefs = append(targetRefs, ref)
110+
}
111+
dump := oldObj.DeepCopy()
112+
dump.Spec.TargetRefs = targetRefs
113+
r.genericEvent <- event.GenericEvent{
114+
Object: dump,
115+
}
116+
}
117+
return true
118+
},
119+
},
120+
),
121+
).
122+
WatchesRawSource(
123+
source.Channel(
124+
r.genericEvent,
125+
handler.EnqueueRequestsFromMapFunc(r.listHTTPRouteForGenericEvent),
126+
),
72127
).
73128
Complete(r)
74129
}
75130

131+
func (r *HTTPRouteReconciler) listHTTPRouteForGenericEvent(ctx context.Context, obj client.Object) []reconcile.Request {
132+
requests := []reconcile.Request{}
133+
switch v := obj.(type) {
134+
case *v1alpha1.BackendTrafficPolicy:
135+
httprouteAll := []gatewayv1.HTTPRoute{}
136+
for _, ref := range v.Spec.TargetRefs {
137+
httprouteList := &gatewayv1.HTTPRouteList{}
138+
if err := r.List(ctx, httprouteList, client.MatchingFields{
139+
indexer.ServiceIndexRef: indexer.GenIndexKey(v.GetNamespace(), string(ref.Name)),
140+
}); err != nil {
141+
r.Log.Error(err, "failed to list HTTPRoutes for BackendTrafficPolicy", "namespace", v.GetNamespace(), "ref", ref.Name)
142+
return nil
143+
}
144+
httprouteAll = append(httprouteAll, httprouteList.Items...)
145+
}
146+
for _, hr := range httprouteAll {
147+
requests = append(requests, reconcile.Request{
148+
NamespacedName: client.ObjectKey{
149+
Namespace: hr.Namespace,
150+
Name: hr.Name,
151+
},
152+
})
153+
}
154+
default:
155+
r.Log.Error(fmt.Errorf("unexpected object type"), "failed to convert object to BackendTrafficPolicy")
156+
}
157+
return requests
158+
}
159+
76160
func (r *HTTPRouteReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
77161
hr := new(gatewayv1.HTTPRoute)
78162
if err := r.Get(ctx, req.NamespacedName, hr); err != nil {
@@ -131,7 +215,7 @@ func (r *HTTPRouteReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
131215
}
132216
}
133217

134-
ProcessBackendTrafficPolicy(r.Client, tctx)
218+
ProcessBackendTrafficPolicy(r.Client, r.Log, tctx)
135219

136220
if err := r.Provider.Update(ctx, tctx, hr); err != nil {
137221
acceptStatus.status = false
@@ -156,6 +240,7 @@ func (r *HTTPRouteReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
156240
if err := r.Status().Update(ctx, hr); err != nil {
157241
return ctrl.Result{}, err
158242
}
243+
UpdateStatus(r.Client, r.Log, tctx)
159244
return ctrl.Result{}, nil
160245
}
161246

@@ -253,7 +338,10 @@ func (r *HTTPRouteReconciler) listHTTPRoutesForBackendTrafficPolicy(ctx context.
253338
},
254339
})
255340
}
256-
log.Errorw("list httproutes for backend traffic policy", zap.Any("httproutes", requests))
341+
if !policy.GetDeletionTimestamp().IsZero() {
342+
// If the policy is deleted, we need to list all HTTPRoutes that reference this policy
343+
// and add them to the requests.
344+
}
257345
return requests
258346
}
259347

internal/controller/source.go

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
package controller
2+
3+
/*
4+
import (
5+
"context"
6+
"fmt"
7+
"strings"
8+
"time"
9+
10+
"github.com/api7/api7-ingress-controller/api/v1alpha1"
11+
"k8s.io/apimachinery/pkg/types"
12+
"k8s.io/client-go/util/workqueue"
13+
"sigs.k8s.io/controller-runtime/pkg/client"
14+
"sigs.k8s.io/controller-runtime/pkg/event"
15+
"sigs.k8s.io/controller-runtime/pkg/handler"
16+
"sigs.k8s.io/controller-runtime/pkg/predicate"
17+
"sigs.k8s.io/controller-runtime/pkg/reconcile"
18+
gatewayv1 "sigs.k8s.io/gateway-api/apis/v1"
19+
)
20+
21+
type TargetRefsChangedPredicate struct {
22+
predicate.Funcs
23+
DeletedRefs map[string]struct{} // 存储被删除的引用
24+
}
25+
26+
// Update 事件处理
27+
func (t *TargetRefsChangedPredicate) Update(e event.UpdateEvent) bool {
28+
oldPolicy, okOld := e.ObjectOld.(*v1alpha1.BackendTrafficPolicy)
29+
newPolicy, okNew := e.ObjectNew.(*v1alpha1.BackendTrafficPolicy)
30+
if !okOld || !okNew {
31+
return false
32+
}
33+
34+
// 计算被删除的引用
35+
oldRefs := oldPolicy.Spec.TargetRefs
36+
newRefs := newPolicy.Spec.TargetRefs
37+
38+
// 将旧引用转换为 Map
39+
oldRefMap := make(map[string]struct{})
40+
for _, ref := range oldRefs {
41+
key := fmt.Sprintf("%s/%s", ref.Kind, ref.Name)
42+
oldRefMap[key] = struct{}{}
43+
}
44+
45+
// 找出被删除的引用
46+
t.DeletedRefs = make(map[string]struct{})
47+
for _, ref := range newRefs {
48+
key := fmt.Sprintf("%s/%s", ref.Kind, ref.Name)
49+
delete(oldRefMap, key)
50+
}
51+
for key := range oldRefMap {
52+
t.DeletedRefs[key] = struct{}{}
53+
}
54+
55+
return len(t.DeletedRefs) > 0
56+
}
57+
58+
type DeletedRefEventSource struct {
59+
Client client.Client
60+
Predicate *TargetRefsChangedPredicate
61+
}
62+
63+
// Start 实现 Source 接口
64+
func (s *DeletedRefEventSource) Start(
65+
ctx context.Context,
66+
handler handler.EventHandler,
67+
queue workqueue.RateLimitingInterface,
68+
predicates ...predicate.Predicate,
69+
) error {
70+
// 监听 BackendTrafficPolicy 的 Update 事件(已通过 Predicate 过滤)
71+
// 此处假设 Predicate 已捕获到被删除的引用
72+
go func() {
73+
for {
74+
select {
75+
case <-ctx.Done():
76+
return
77+
default:
78+
if len(s.Predicate.DeletedRefs) == 0 {
79+
time.Sleep(1 * time.Second)
80+
continue
81+
}
82+
83+
// 遍历被删除的引用,查找关联的 HTTPRoute
84+
for refKey := range s.Predicate.DeletedRefs {
85+
parts := strings.Split(refKey, "/")
86+
if len(parts) != 2 {
87+
continue
88+
}
89+
kind, name := parts[0], parts[1]
90+
91+
// 查找关联的 HTTPRoute
92+
var routes gatewayv1.HTTPRouteList
93+
if err := s.Client.List(
94+
context.Background(),
95+
&routes,
96+
client.MatchingFields{"targetRefs": refKey},
97+
); err != nil {
98+
continue
99+
}
100+
101+
// 生成调和请求
102+
for _, route := range routes.Items {
103+
req := reconcile.Request{
104+
NamespacedName: types.NamespacedName{
105+
Name: route.Name,
106+
Namespace: route.Namespace,
107+
},
108+
}
109+
handler.Generic(ctx, event.GenericEvent{Object: &route}, queue)
110+
}
111+
}
112+
113+
// 清空已处理的引用
114+
s.Predicate.DeletedRefs = make(map[string]struct{})
115+
time.Sleep(5 * time.Second) // 避免高频触发
116+
}
117+
}
118+
}()
119+
return nil
120+
}
121+
*/

internal/provider/adc/adc.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"errors"
88
"os"
99
"os/exec"
10+
"runtime/debug"
1011

1112
"go.uber.org/zap"
1213
networkingv1 "k8s.io/api/networking/v1"
@@ -90,7 +91,7 @@ func (d *adcClient) Update(ctx context.Context, tctx *provider.TranslateContext,
9091
}
9192

9293
func (d *adcClient) Delete(ctx context.Context, obj client.Object) error {
93-
log.Debugw("deleting object", zap.Any("object", obj))
94+
log.Debugw("deleting object", zap.Any("object", obj), zap.String("stack", string(debug.Stack())))
9495

9596
var resourceTypes []string
9697
var labels map[string]string

0 commit comments

Comments
 (0)