Skip to content

Commit a54da5a

Browse files
AlinsRanronething
andauthored
fix: support filter endpoint when translate backend ref. (#2451) (#198)
Signed-off-by: ashing <[email protected]> Co-authored-by: Ashing Zheng <[email protected]>
1 parent d4d9bfd commit a54da5a

File tree

4 files changed

+29
-8
lines changed

4 files changed

+29
-8
lines changed

internal/provider/adc/config.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626

2727
"github.com/api7/gopkg/pkg/log"
2828
"go.uber.org/zap"
29+
discoveryv1 "k8s.io/api/discovery/v1"
2930
k8stypes "k8s.io/apimachinery/pkg/types"
3031
"k8s.io/utils/ptr"
3132
gatewayv1 "sigs.k8s.io/gateway-api/apis/v1"
@@ -95,12 +96,18 @@ func (d *adcClient) getConfigsForGatewayProxy(tctx *provider.TranslateContext, g
9596
if endpoint == nil {
9697
return nil, nil
9798
}
98-
upstreamNodes, err := d.translator.TranslateBackendRef(tctx, gatewayv1.BackendRef{
99+
upstreamNodes, err := d.translator.TranslateBackendRefWithFilter(tctx, gatewayv1.BackendRef{
99100
BackendObjectReference: gatewayv1.BackendObjectReference{
100101
Name: gatewayv1.ObjectName(provider.ControlPlane.Service.Name),
101102
Namespace: (*gatewayv1.Namespace)(&gatewayProxy.Namespace),
102103
Port: ptr.To(gatewayv1.PortNumber(provider.ControlPlane.Service.Port)),
103104
},
105+
}, func(endpoint *discoveryv1.Endpoint) bool {
106+
if endpoint.Conditions.Terminating != nil && *endpoint.Conditions.Terminating {
107+
log.Debugw("skip terminating endpoint", zap.Any("endpoint", endpoint))
108+
return false
109+
}
110+
return true
104111
})
105112
if err != nil {
106113
return nil, err

internal/provider/adc/translator/apisixroute.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -339,5 +339,5 @@ func (t *Translator) translateApisixRouteBackendResolveGranularityEndpoint(tctx
339339
},
340340
Weight: &weight,
341341
}
342-
return t.translateBackendRef(tctx, backendRef)
342+
return t.translateBackendRef(tctx, backendRef, DefaultEndpointFilter)
343343
}

internal/provider/adc/translator/httproute.go

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -283,7 +283,7 @@ func (t *Translator) fillHTTPRoutePolicies(routes []*adctypes.Route, policies []
283283
}
284284
}
285285

286-
func (t *Translator) translateEndpointSlice(portName *string, weight int, endpointSlices []discoveryv1.EndpointSlice) adctypes.UpstreamNodes {
286+
func (t *Translator) translateEndpointSlice(portName *string, weight int, endpointSlices []discoveryv1.EndpointSlice, endpointFilter func(*discoveryv1.Endpoint) bool) adctypes.UpstreamNodes {
287287
var nodes adctypes.UpstreamNodes
288288
if len(endpointSlices) == 0 {
289289
return nodes
@@ -294,6 +294,9 @@ func (t *Translator) translateEndpointSlice(portName *string, weight int, endpoi
294294
continue
295295
}
296296
for _, endpoint := range endpointSlice.Endpoints {
297+
if endpointFilter != nil && !endpointFilter(&endpoint) {
298+
continue
299+
}
297300
for _, addr := range endpoint.Addresses {
298301
node := adctypes.UpstreamNode{
299302
Host: addr,
@@ -312,11 +315,19 @@ func (t *Translator) translateEndpointSlice(portName *string, weight int, endpoi
312315
return nodes
313316
}
314317

315-
func (t *Translator) TranslateBackendRef(tctx *provider.TranslateContext, ref gatewayv1.BackendRef) (adctypes.UpstreamNodes, error) {
316-
return t.translateBackendRef(tctx, ref)
318+
func DefaultEndpointFilter(endpoint *discoveryv1.Endpoint) bool {
319+
if endpoint.Conditions.Ready != nil && !*endpoint.Conditions.Ready {
320+
log.Debugw("skip not ready endpoint", zap.Any("endpoint", endpoint))
321+
return false
322+
}
323+
return true
324+
}
325+
326+
func (t *Translator) TranslateBackendRefWithFilter(tctx *provider.TranslateContext, ref gatewayv1.BackendRef, endpointFilter func(*discoveryv1.Endpoint) bool) (adctypes.UpstreamNodes, error) {
327+
return t.translateBackendRef(tctx, ref, endpointFilter)
317328
}
318329

319-
func (t *Translator) translateBackendRef(tctx *provider.TranslateContext, ref gatewayv1.BackendRef) (adctypes.UpstreamNodes, error) {
330+
func (t *Translator) translateBackendRef(tctx *provider.TranslateContext, ref gatewayv1.BackendRef, endpointFilter func(*discoveryv1.Endpoint) bool) (adctypes.UpstreamNodes, error) {
320331
if ref.Kind != nil && *ref.Kind != "Service" {
321332
return adctypes.UpstreamNodes{}, fmt.Errorf("kind %s is not supported", *ref.Kind)
322333
}
@@ -363,7 +374,7 @@ func (t *Translator) translateBackendRef(tctx *provider.TranslateContext, ref ga
363374
}
364375

365376
endpointSlices := tctx.EndpointSlices[key]
366-
return t.translateEndpointSlice(portName, weight, endpointSlices), nil
377+
return t.translateEndpointSlice(portName, weight, endpointSlices, endpointFilter), nil
367378
}
368379

369380
// calculateHTTPRoutePriority calculates the priority of the HTTP route.
@@ -475,7 +486,7 @@ func (t *Translator) TranslateHTTPRoute(tctx *provider.TranslateContext, httpRou
475486
namespace := gatewayv1.Namespace(httpRoute.Namespace)
476487
backend.Namespace = &namespace
477488
}
478-
upNodes, err := t.translateBackendRef(tctx, backend.BackendRef)
489+
upNodes, err := t.translateBackendRef(tctx, backend.BackendRef, DefaultEndpointFilter)
479490
if err != nil {
480491
backendErr = err
481492
continue

internal/provider/adc/translator/ingress.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,9 @@ func (t *Translator) translateEndpointSliceForIngress(weight int, endpointSlices
234234
continue
235235
}
236236
for _, endpoint := range endpointSlice.Endpoints {
237+
if !DefaultEndpointFilter(&endpoint) {
238+
continue
239+
}
237240
for _, addr := range endpoint.Addresses {
238241
node := adctypes.UpstreamNode{
239242
Host: addr,

0 commit comments

Comments
 (0)