Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion internal/provider/adc/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"github.com/api7/gopkg/pkg/log"
"go.uber.org/zap"
discoveryv1 "k8s.io/api/discovery/v1"
k8stypes "k8s.io/apimachinery/pkg/types"
"k8s.io/utils/ptr"
gatewayv1 "sigs.k8s.io/gateway-api/apis/v1"
Expand Down Expand Up @@ -95,12 +96,18 @@ func (d *adcClient) getConfigsForGatewayProxy(tctx *provider.TranslateContext, g
if endpoint == nil {
return nil, nil
}
upstreamNodes, err := d.translator.TranslateBackendRef(tctx, gatewayv1.BackendRef{
upstreamNodes, err := d.translator.TranslateBackendRefWithFilter(tctx, gatewayv1.BackendRef{
BackendObjectReference: gatewayv1.BackendObjectReference{
Name: gatewayv1.ObjectName(provider.ControlPlane.Service.Name),
Namespace: (*gatewayv1.Namespace)(&gatewayProxy.Namespace),
Port: ptr.To(gatewayv1.PortNumber(provider.ControlPlane.Service.Port)),
},
}, func(endpoint *discoveryv1.Endpoint) bool {
if endpoint.Conditions.Terminating != nil && *endpoint.Conditions.Terminating {
log.Debugw("skip terminating endpoint", zap.Any("endpoint", endpoint))
return false
}
return true
})
if err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion internal/provider/adc/translator/apisixroute.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,5 +339,5 @@ func (t *Translator) translateApisixRouteBackendResolveGranularityEndpoint(tctx
},
Weight: &weight,
}
return t.translateBackendRef(tctx, backendRef)
return t.translateBackendRef(tctx, backendRef, DefaultEndpointFilter)
}
23 changes: 17 additions & 6 deletions internal/provider/adc/translator/httproute.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ func (t *Translator) fillHTTPRoutePolicies(routes []*adctypes.Route, policies []
}
}

func (t *Translator) translateEndpointSlice(portName *string, weight int, endpointSlices []discoveryv1.EndpointSlice) adctypes.UpstreamNodes {
func (t *Translator) translateEndpointSlice(portName *string, weight int, endpointSlices []discoveryv1.EndpointSlice, endpointFilter func(*discoveryv1.Endpoint) bool) adctypes.UpstreamNodes {
var nodes adctypes.UpstreamNodes
if len(endpointSlices) == 0 {
return nodes
Expand All @@ -294,6 +294,9 @@ func (t *Translator) translateEndpointSlice(portName *string, weight int, endpoi
continue
}
for _, endpoint := range endpointSlice.Endpoints {
if endpointFilter != nil && !endpointFilter(&endpoint) {
continue
}
for _, addr := range endpoint.Addresses {
node := adctypes.UpstreamNode{
Host: addr,
Expand All @@ -312,11 +315,19 @@ func (t *Translator) translateEndpointSlice(portName *string, weight int, endpoi
return nodes
}

func (t *Translator) TranslateBackendRef(tctx *provider.TranslateContext, ref gatewayv1.BackendRef) (adctypes.UpstreamNodes, error) {
return t.translateBackendRef(tctx, ref)
func DefaultEndpointFilter(endpoint *discoveryv1.Endpoint) bool {
if endpoint.Conditions.Ready != nil && !*endpoint.Conditions.Ready {
log.Debugw("skip not ready endpoint", zap.Any("endpoint", endpoint))
return false
}
return true
}

func (t *Translator) TranslateBackendRefWithFilter(tctx *provider.TranslateContext, ref gatewayv1.BackendRef, endpointFilter func(*discoveryv1.Endpoint) bool) (adctypes.UpstreamNodes, error) {
return t.translateBackendRef(tctx, ref, endpointFilter)
}

func (t *Translator) translateBackendRef(tctx *provider.TranslateContext, ref gatewayv1.BackendRef) (adctypes.UpstreamNodes, error) {
func (t *Translator) translateBackendRef(tctx *provider.TranslateContext, ref gatewayv1.BackendRef, endpointFilter func(*discoveryv1.Endpoint) bool) (adctypes.UpstreamNodes, error) {
if ref.Kind != nil && *ref.Kind != "Service" {
return adctypes.UpstreamNodes{}, fmt.Errorf("kind %s is not supported", *ref.Kind)
}
Expand Down Expand Up @@ -363,7 +374,7 @@ func (t *Translator) translateBackendRef(tctx *provider.TranslateContext, ref ga
}

endpointSlices := tctx.EndpointSlices[key]
return t.translateEndpointSlice(portName, weight, endpointSlices), nil
return t.translateEndpointSlice(portName, weight, endpointSlices, endpointFilter), nil
}

// calculateHTTPRoutePriority calculates the priority of the HTTP route.
Expand Down Expand Up @@ -475,7 +486,7 @@ func (t *Translator) TranslateHTTPRoute(tctx *provider.TranslateContext, httpRou
namespace := gatewayv1.Namespace(httpRoute.Namespace)
backend.Namespace = &namespace
}
upNodes, err := t.translateBackendRef(tctx, backend.BackendRef)
upNodes, err := t.translateBackendRef(tctx, backend.BackendRef, DefaultEndpointFilter)
if err != nil {
backendErr = err
continue
Expand Down
3 changes: 3 additions & 0 deletions internal/provider/adc/translator/ingress.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,9 @@ func (t *Translator) translateEndpointSliceForIngress(weight int, endpointSlices
continue
}
for _, endpoint := range endpointSlice.Endpoints {
if !DefaultEndpointFilter(&endpoint) {
continue
}
for _, addr := range endpoint.Addresses {
node := adctypes.UpstreamNode{
Host: addr,
Expand Down
Loading