Skip to content

Commit 4d4f357

Browse files
committed
feat: support backendtrafficpolicy for httproute
1 parent 6066fb7 commit 4d4f357

File tree

9 files changed

+106
-35
lines changed

9 files changed

+106
-35
lines changed

api/adc/types.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -128,9 +128,9 @@ type Route struct {
128128
}
129129

130130
type Timeout struct {
131-
Connect *int64 `json:"connect,omitempty"`
132-
Read *int64 `json:"read,omitempty"`
133-
Send *int64 `json:"send,omitempty"`
131+
Connect int64 `json:"connect"`
132+
Read int64 `json:"read"`
133+
Send int64 `json:"send"`
134134
}
135135

136136
type StreamRoute struct {

api/v1alpha1/backendtrafficpolicy_types.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,9 +72,12 @@ type LoadBalancer struct {
7272
}
7373

7474
type Timeout struct {
75+
// +kubebuilder:default="60s"
7576
Connect metav1.Duration `json:"connect,omitempty" yaml:"connect,omitempty"`
76-
Send metav1.Duration `json:"send,omitempty" yaml:"send,omitempty"`
77-
Read metav1.Duration `json:"read,omitempty" yaml:"read,omitempty"`
77+
// +kubebuilder:default="60s"
78+
Send metav1.Duration `json:"send,omitempty" yaml:"send,omitempty"`
79+
// +kubebuilder:default="60s"
80+
Read metav1.Duration `json:"read,omitempty" yaml:"read,omitempty"`
7881
}
7982

8083
// +kubebuilder:object:root=true

config/crd/bases/gateway.apisix.io_backendtrafficpolicies.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,10 +168,13 @@ spec:
168168
upstream.
169169
properties:
170170
connect:
171+
default: 60s
171172
type: string
172173
read:
174+
default: 60s
173175
type: string
174176
send:
177+
default: 60s
175178
type: string
176179
type: object
177180
upstream_host:

internal/controller/httproute_controller.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"strings"
77

88
"github.com/go-logr/logr"
9+
"go.uber.org/zap"
910
corev1 "k8s.io/api/core/v1"
1011
discoveryv1 "k8s.io/api/discovery/v1"
1112
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -23,6 +24,7 @@ import (
2324
"github.com/api7/api7-ingress-controller/api/v1alpha1"
2425
"github.com/api7/api7-ingress-controller/internal/controller/indexer"
2526
"github.com/api7/api7-ingress-controller/internal/provider"
27+
"github.com/api7/gopkg/pkg/log"
2628
)
2729

2830
// HTTPRouteReconciler reconciles a GatewayClass object.
@@ -65,6 +67,9 @@ func (r *HTTPRouteReconciler) SetupWithManager(mgr ctrl.Manager) error {
6567
},
6668
),
6769
).
70+
Watches(&v1alpha1.BackendTrafficPolicy{},
71+
handler.EnqueueRequestsFromMapFunc(r.listHTTPRoutesForBackendTrafficPolicy),
72+
).
6873
Complete(r)
6974
}
7075

@@ -126,6 +131,8 @@ func (r *HTTPRouteReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
126131
}
127132
}
128133

134+
ProcessBackendTrafficPolicy(r.Client, tctx)
135+
129136
if err := r.Provider.Update(ctx, tctx, hr); err != nil {
130137
acceptStatus.status = false
131138
acceptStatus.msg = err.Error()
@@ -208,6 +215,48 @@ func (r *HTTPRouteReconciler) listHTTPRoutesByExtensionRef(ctx context.Context,
208215
return requests
209216
}
210217

218+
func (r *HTTPRouteReconciler) listHTTPRoutesForBackendTrafficPolicy(ctx context.Context, obj client.Object) []reconcile.Request {
219+
policy, ok := obj.(*v1alpha1.BackendTrafficPolicy)
220+
if !ok {
221+
r.Log.Error(fmt.Errorf("unexpected object type"), "failed to convert object to BackendTrafficPolicy")
222+
return nil
223+
}
224+
225+
httprouteList := []gatewayv1.HTTPRoute{}
226+
for _, targetRef := range policy.Spec.TargetRefs {
227+
service := &corev1.Service{}
228+
if err := r.Get(ctx, client.ObjectKey{
229+
Namespace: policy.Namespace,
230+
Name: string(targetRef.Name),
231+
}, service); err != nil {
232+
if client.IgnoreNotFound(err) != nil {
233+
r.Log.Error(err, "failed to get service", "namespace", policy.Namespace, "name", targetRef.Name)
234+
}
235+
continue
236+
}
237+
hrList := &gatewayv1.HTTPRouteList{}
238+
if err := r.List(ctx, hrList, client.MatchingFields{
239+
indexer.ServiceIndexRef: indexer.GenIndexKey(policy.Namespace, string(targetRef.Name)),
240+
}); err != nil {
241+
r.Log.Error(err, "failed to list httproutes by service reference", "service", targetRef.Name)
242+
return nil
243+
}
244+
httprouteList = append(httprouteList, hrList.Items...)
245+
}
246+
247+
requests := make([]reconcile.Request, 0, len(httprouteList))
248+
for _, hr := range httprouteList {
249+
requests = append(requests, reconcile.Request{
250+
NamespacedName: client.ObjectKey{
251+
Namespace: hr.Namespace,
252+
Name: hr.Name,
253+
},
254+
})
255+
}
256+
log.Errorw("list httproutes for backend traffic policy", zap.Any("httproutes", requests))
257+
return requests
258+
}
259+
211260
func (r *HTTPRouteReconciler) listHTTPRoutesForGateway(ctx context.Context, obj client.Object) []reconcile.Request {
212261
gateway, ok := obj.(*gatewayv1.Gateway)
213262
if !ok {

internal/controller/indexer/indexer.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,15 @@ func setupHTTPRouteIndexer(mgr ctrl.Manager) error {
131131
); err != nil {
132132
return err
133133
}
134+
135+
if err := mgr.GetFieldIndexer().IndexField(
136+
context.Background(),
137+
&v1alpha1.BackendTrafficPolicy{},
138+
PolicyTargetRefs,
139+
BackendTrafficPolicyIndexFunc,
140+
); err != nil {
141+
return err
142+
}
134143
return nil
135144
}
136145

@@ -314,7 +323,7 @@ func BackendTrafficPolicyIndexFunc(rawObj client.Object) []string {
314323
for _, ref := range btp.Spec.TargetRefs {
315324
keys = append(keys,
316325
GenIndexKeyWithGK(
317-
v1alpha1.GroupVersion.Group,
326+
string(ref.Group),
318327
string(ref.Kind),
319328
btp.GetNamespace(),
320329
string(ref.Name),

internal/controller/policies.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"github.com/api7/api7-ingress-controller/internal/controller/config"
1111
"github.com/api7/api7-ingress-controller/internal/controller/indexer"
1212
"github.com/api7/api7-ingress-controller/internal/provider"
13+
"github.com/go-logr/logr"
1314
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1415
"k8s.io/apimachinery/pkg/runtime/schema"
1516
"k8s.io/apimachinery/pkg/types"
@@ -25,7 +26,9 @@ func (p PolicyTargetKey) String() string {
2526
return p.NsName.String() + "/" + p.GroupKind.String()
2627
}
2728

28-
func ProcessBackendTrafficPolicy(c client.Client, tctx *provider.TranslateContext) {
29+
func ProcessBackendTrafficPolicy(c client.Client,
30+
log logr.Logger,
31+
tctx *provider.TranslateContext) {
2932
conflicts := map[string]v1alpha1.BackendTrafficPolicy{}
3033
for _, service := range tctx.Services {
3134
backendTrafficPolicyList := &v1alpha1.BackendTrafficPolicyList{}
@@ -34,10 +37,8 @@ func ProcessBackendTrafficPolicy(c client.Client, tctx *provider.TranslateContex
3437
indexer.PolicyTargetRefs: indexer.GenIndexKeyWithGK("", "Service", service.Namespace, service.Name),
3538
},
3639
); err != nil {
37-
if client.IgnoreNotFound(err) == nil {
38-
continue
39-
}
40-
return
40+
log.Error(err, "failed to list BackendTrafficPolicy for Service")
41+
continue
4142
}
4243
if len(backendTrafficPolicyList.Items) == 0 {
4344
continue

internal/manager/run.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,13 @@ import (
44
"context"
55
"crypto/tls"
66
"os"
7+
"time"
78

89
"github.com/go-logr/logr"
910
"k8s.io/apimachinery/pkg/runtime"
1011
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
1112
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
13+
"k8s.io/utils/ptr"
1214
ctrl "sigs.k8s.io/controller-runtime"
1315
"sigs.k8s.io/controller-runtime/pkg/healthz"
1416
"sigs.k8s.io/controller-runtime/pkg/metrics/filters"
@@ -100,6 +102,9 @@ func Run(ctx context.Context, logger logr.Logger) error {
100102
LeaderElection: true,
101103
LeaderElectionID: cfg.LeaderElectionID,
102104
LeaderElectionNamespace: namespace,
105+
LeaseDuration: ptr.To(time.Second * 15),
106+
RenewDeadline: ptr.To(time.Second * 10),
107+
RetryPeriod: ptr.To(time.Second * 5),
103108
// LeaderElectionReleaseOnCancel defines if the leader should step down voluntarily
104109
// when the Manager ends. This requires the binary to immediately end when the
105110
// Manager is stopped, otherwise, this setting is unsafe. Setting this significantly

internal/provider/adc/translator/httproute.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -288,6 +288,13 @@ func (t *Translator) TranslateHTTPRoute(tctx *provider.TranslateContext, httpRou
288288
backend.Namespace = &namespace
289289
}
290290
upNodes := t.translateBackendRef(tctx, backend.BackendRef)
291+
for _, po := range tctx.BackendTrafficPolicies {
292+
log.Errorw("backend traffic policy",
293+
zap.String("policy", po.Name),
294+
zap.String("namespace", po.Namespace))
295+
}
296+
297+
t.AttachBackendTrafficPolicyToUpstream(backend.BackendRef, tctx.BackendTrafficPolicies, upstream)
291298
upstream.Nodes = append(upstream.Nodes, upNodes...)
292299
}
293300
t.attachBackendTrafficPolicyToUpstream(nil, upstream)

internal/provider/adc/translator/policies.go

Lines changed: 18 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -4,18 +4,29 @@ import (
44
"github.com/api7/api7-ingress-controller/api/adc"
55
"github.com/api7/api7-ingress-controller/api/v1alpha1"
66
"k8s.io/apimachinery/pkg/types"
7+
gatewayv1 "sigs.k8s.io/gateway-api/apis/v1"
78

89
adctypes "github.com/api7/api7-ingress-controller/api/adc"
910
)
1011

11-
func (t *Translator) AttachBackendTrafficPolicyToUpstream(policies map[types.NamespacedName]*v1alpha1.BackendTrafficPolicy, upstream *adctypes.Upstream) {
12+
func (t *Translator) AttachBackendTrafficPolicyToUpstream(ref gatewayv1.BackendRef, policies map[types.NamespacedName]*v1alpha1.BackendTrafficPolicy, upstream *adctypes.Upstream) {
1213
if len(policies) == 0 {
1314
return
1415
}
15-
for _, policy := range policies {
16-
t.attachBackendTrafficPolicyToUpstream(policy, upstream)
16+
var policy *v1alpha1.BackendTrafficPolicy
17+
for _, po := range policies {
18+
for _, targetRef := range po.Spec.TargetRefs {
19+
if ref.Name == targetRef.Name &&
20+
(ref.Namespace != nil && string(*ref.Namespace) == po.Namespace) {
21+
policy = po
22+
break
23+
}
24+
}
1725
}
18-
26+
if policy == nil {
27+
return
28+
}
29+
t.attachBackendTrafficPolicyToUpstream(policy, upstream)
1930
}
2031

2132
func (t *Translator) attachBackendTrafficPolicyToUpstream(policy *v1alpha1.BackendTrafficPolicy, upstream *adctypes.Upstream) {
@@ -29,27 +40,10 @@ func (t *Translator) attachBackendTrafficPolicyToUpstream(policy *v1alpha1.Backe
2940
*upstream.Retries = int64(*policy.Spec.Retries)
3041
}
3142
if policy.Spec.Timeout != nil {
32-
var (
33-
connect *int64
34-
read *int64
35-
send *int64
36-
)
37-
if policy.Spec.Timeout.Connect.Duration > 0 {
38-
connect = new(int64)
39-
*connect = policy.Spec.Timeout.Connect.Duration.Milliseconds()
40-
}
41-
if policy.Spec.Timeout.Read.Duration > 0 {
42-
read = new(int64)
43-
*read = policy.Spec.Timeout.Read.Duration.Milliseconds()
44-
}
45-
if policy.Spec.Timeout.Send.Duration > 0 {
46-
send = new(int64)
47-
*send = policy.Spec.Timeout.Send.Duration.Milliseconds()
48-
}
4943
upstream.Timeout = &adctypes.Timeout{
50-
Connect: connect,
51-
Read: read,
52-
Send: send,
44+
Connect: policy.Spec.Timeout.Connect.Duration.Milliseconds(),
45+
Read: policy.Spec.Timeout.Read.Duration.Milliseconds(),
46+
Send: policy.Spec.Timeout.Send.Duration.Milliseconds(),
5347
}
5448
}
5549
if policy.Spec.LoadBalancer != nil {

0 commit comments

Comments
 (0)