Skip to content

Commit d7a96ec

Browse files
committed
fix: remove duplicate logic
Signed-off-by: Ashing Zheng <[email protected]>
1 parent 33d3cac commit d7a96ec

File tree

5 files changed

+118
-179
lines changed

5 files changed

+118
-179
lines changed

internal/controller/apisixroute_controller.go

Lines changed: 8 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -356,44 +356,15 @@ func (r *ApisixRouteReconciler) validateBackends(ctx context.Context, tc *provid
356356
}
357357
tc.Services[serviceNN] = &service
358358

359-
// Conditionally collect EndpointSlice or Endpoints based on cluster API support
360-
if r.supportsEndpointSlice {
361-
var endpoints discoveryv1.EndpointSliceList
362-
if err := r.List(ctx, &endpoints,
363-
client.InNamespace(service.Namespace),
364-
client.MatchingLabels{
365-
discoveryv1.LabelServiceName: service.Name,
366-
},
367-
); err != nil {
368-
return types.ReasonError{
369-
Reason: string(apiv2.ConditionReasonInvalidSpec),
370-
Message: fmt.Sprintf("failed to list endpoint slices: %v", err),
371-
}
372-
}
359+
// backend.subset specifies a subset of upstream nodes.
360+
// It specifies that the target pod's label should be a superset of the subset labels of the ApisixUpstream of the serviceName
361+
subsetLabels := r.getSubsetLabels(tc, serviceNN, backend)
373362

374-
// backend.subset specifies a subset of upstream nodes.
375-
// It specifies that the target pod's label should be a superset of the subset labels of the ApisixUpstream of the serviceName
376-
subsetLabels := r.getSubsetLabels(tc, serviceNN, backend)
377-
tc.EndpointSlices[serviceNN] = r.filterEndpointSlicesBySubsetLabels(ctx, endpoints.Items, subsetLabels)
378-
} else {
379-
// Fallback to Endpoints API for Kubernetes 1.18 compatibility
380-
var ep corev1.Endpoints
381-
if err := r.Get(ctx, serviceNN, &ep); err != nil {
382-
if client.IgnoreNotFound(err) != nil {
383-
return types.ReasonError{
384-
Reason: string(apiv2.ConditionReasonInvalidSpec),
385-
Message: fmt.Sprintf("failed to get endpoints: %v", err),
386-
}
387-
}
388-
// If endpoints not found, create empty EndpointSlice list
389-
tc.EndpointSlices[serviceNN] = []discoveryv1.EndpointSlice{}
390-
} else {
391-
// Convert Endpoints to EndpointSlice format for internal consistency
392-
convertedEndpointSlices := pkgutils.ConvertEndpointsToEndpointSlice(&ep)
393-
394-
// Apply subset filtering to converted EndpointSlices
395-
subsetLabels := r.getSubsetLabels(tc, serviceNN, backend)
396-
tc.EndpointSlices[serviceNN] = r.filterEndpointSlicesBySubsetLabels(ctx, convertedEndpointSlices, subsetLabels)
363+
// Collect endpoints with EndpointSlice support and subset filtering
364+
if err := collectEndpointsWithEndpointSliceSupport(ctx, r.Client, tc, serviceNN, r.supportsEndpointSlice, subsetLabels); err != nil {
365+
return types.ReasonError{
366+
Reason: string(apiv2.ConditionReasonInvalidSpec),
367+
Message: err.Error(),
397368
}
398369
}
399370
}
@@ -665,41 +636,3 @@ func (r *ApisixRouteReconciler) getSubsetLabels(tctx *provider.TranslateContext,
665636

666637
return nil
667638
}
668-
669-
func (r *ApisixRouteReconciler) filterEndpointSlicesBySubsetLabels(ctx context.Context, in []discoveryv1.EndpointSlice, labels map[string]string) []discoveryv1.EndpointSlice {
670-
if len(labels) == 0 {
671-
return in
672-
}
673-
674-
for i := range in {
675-
in[i] = r.filterEndpointSliceByTargetPod(ctx, in[i], labels)
676-
}
677-
678-
return utils.Filter(in, func(v discoveryv1.EndpointSlice) bool {
679-
return len(v.Endpoints) > 0
680-
})
681-
}
682-
683-
// filterEndpointSliceByTargetPod filters item.Endpoints which is not a subset of labels
684-
func (r *ApisixRouteReconciler) filterEndpointSliceByTargetPod(ctx context.Context, item discoveryv1.EndpointSlice, labels map[string]string) discoveryv1.EndpointSlice {
685-
item.Endpoints = utils.Filter(item.Endpoints, func(v discoveryv1.Endpoint) bool {
686-
if v.TargetRef == nil || v.TargetRef.Kind != KindPod {
687-
return true
688-
}
689-
690-
var (
691-
pod corev1.Pod
692-
podNN = k8stypes.NamespacedName{
693-
Namespace: v.TargetRef.Namespace,
694-
Name: v.TargetRef.Name,
695-
}
696-
)
697-
if err := r.Get(ctx, podNN, &pod); err != nil {
698-
return false
699-
}
700-
701-
return utils.IsSubsetOf(labels, pod.GetLabels())
702-
})
703-
704-
return item
705-
}

internal/controller/gatewayproxy_controller.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -109,10 +109,11 @@ func (r *GatewayProxyController) Reconcile(ctx context.Context, req ctrl.Request
109109
if providerService == nil {
110110
tctx.EndpointSlices[req.NamespacedName] = nil
111111
} else {
112-
if err := addProviderEndpointsToTranslateContextWithEndpointSliceSupport(tctx, r.Client, types.NamespacedName{
112+
serviceNN := types.NamespacedName{
113113
Namespace: gp.Namespace,
114114
Name: providerService.Name,
115-
}, r.supportsEndpointSlice); err != nil {
115+
}
116+
if err := collectEndpointsWithEndpointSliceSupport(tctx, r.Client, tctx, serviceNN, r.supportsEndpointSlice, nil); err != nil {
116117
return reconcile.Result{}, err
117118
}
118119
}

internal/controller/httproute_controller.go

Lines changed: 5 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -570,36 +570,11 @@ func (r *HTTPRouteReconciler) processHTTPRouteBackendRefs(tctx *provider.Transla
570570
}
571571
tctx.Services[targetNN] = &service
572572

573-
// Conditionally collect EndpointSlice or Endpoints based on cluster API support
574-
if r.supportsEndpointSlice {
575-
endpointSliceList := new(discoveryv1.EndpointSliceList)
576-
if err := r.List(tctx, endpointSliceList,
577-
client.InNamespace(targetNN.Namespace),
578-
client.MatchingLabels{
579-
discoveryv1.LabelServiceName: targetNN.Name,
580-
},
581-
); err != nil {
582-
r.Log.Error(err, "failed to list endpoint slices", "Service", targetNN)
583-
terr = err
584-
continue
585-
}
586-
tctx.EndpointSlices[targetNN] = endpointSliceList.Items
587-
} else {
588-
// Fallback to Endpoints API for Kubernetes 1.18 compatibility
589-
var endpoints corev1.Endpoints
590-
if err := r.Get(tctx, targetNN, &endpoints); err != nil {
591-
if client.IgnoreNotFound(err) != nil {
592-
r.Log.Error(err, "failed to get endpoints", "Service", targetNN)
593-
terr = err
594-
continue
595-
}
596-
// If endpoints not found, create empty EndpointSlice list
597-
tctx.EndpointSlices[targetNN] = []discoveryv1.EndpointSlice{}
598-
} else {
599-
// Convert Endpoints to EndpointSlice format for internal consistency
600-
convertedEndpointSlices := pkgutils.ConvertEndpointsToEndpointSlice(&endpoints)
601-
tctx.EndpointSlices[targetNN] = convertedEndpointSlices
602-
}
573+
// Collect endpoints with EndpointSlice support
574+
if err := collectEndpointsWithEndpointSliceSupport(tctx, r.Client, tctx, targetNN, r.supportsEndpointSlice, nil); err != nil {
575+
r.Log.Error(err, "failed to collect endpoints", "Service", targetNN)
576+
terr = err
577+
continue
603578
}
604579
}
605580
return terr

internal/controller/ingress_controller.go

Lines changed: 4 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -606,37 +606,10 @@ func (r *IngressReconciler) processBackendService(tctx *provider.TranslateContex
606606
return err
607607
}
608608

609-
// Conditionally get EndpointSlice or Endpoints based on cluster API support
610-
if r.supportsEndpointSlice {
611-
// get the endpoint slices
612-
endpointSliceList := &discoveryv1.EndpointSliceList{}
613-
if err := r.List(tctx, endpointSliceList,
614-
client.InNamespace(namespace),
615-
client.MatchingLabels{
616-
discoveryv1.LabelServiceName: backendService.Name,
617-
},
618-
); err != nil {
619-
r.Log.Error(err, "failed to list endpoint slices", "namespace", namespace, "name", backendService.Name)
620-
return err
621-
}
622-
623-
// save the endpoint slices to the translate context
624-
tctx.EndpointSlices[serviceNS] = endpointSliceList.Items
625-
} else {
626-
// Fallback to Endpoints API for Kubernetes 1.18 compatibility
627-
var endpoints corev1.Endpoints
628-
if err := r.Get(tctx, serviceNS, &endpoints); err != nil {
629-
if client.IgnoreNotFound(err) != nil {
630-
r.Log.Error(err, "failed to get endpoints", "namespace", namespace, "name", backendService.Name)
631-
return err
632-
}
633-
// If endpoints not found, create empty EndpointSlice list
634-
tctx.EndpointSlices[serviceNS] = []discoveryv1.EndpointSlice{}
635-
} else {
636-
// Convert Endpoints to EndpointSlice format for internal consistency
637-
convertedEndpointSlices := pkgutils.ConvertEndpointsToEndpointSlice(&endpoints)
638-
tctx.EndpointSlices[serviceNS] = convertedEndpointSlices
639-
}
609+
// Collect endpoints with EndpointSlice support
610+
if err := collectEndpointsWithEndpointSliceSupport(tctx, r.Client, tctx, serviceNS, r.supportsEndpointSlice, nil); err != nil {
611+
r.Log.Error(err, "failed to collect endpoints", "namespace", namespace, "name", backendService.Name)
612+
return err
640613
}
641614
tctx.Services[serviceNS] = &service
642615
return nil

internal/controller/utils.go

Lines changed: 98 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -957,10 +957,11 @@ func ProcessGatewayProxy(r client.Client, log logr.Logger, tctx *provider.Transl
957957
}
958958

959959
if cp.Service != nil {
960-
if err := addProviderEndpointsToTranslateContext(tctx, r, k8stypes.NamespacedName{
960+
serviceNN := k8stypes.NamespacedName{
961961
Namespace: gatewayProxy.GetNamespace(),
962962
Name: cp.Service.Name,
963-
}); err != nil {
963+
}
964+
if err := addProviderEndpointsToTranslateContext(tctx, r, serviceNN); err != nil {
964965
return err
965966
}
966967
}
@@ -1357,10 +1358,11 @@ func ProcessIngressClassParameters(tctx *provider.TranslateContext, c client.Cli
13571358

13581359
// process control plane provider service
13591360
if cp.Service != nil {
1360-
if err := addProviderEndpointsToTranslateContext(tctx, c, client.ObjectKey{
1361+
serviceNN := k8stypes.NamespacedName{
13611362
Namespace: gatewayProxy.GetNamespace(),
13621363
Name: cp.Service.Name,
1363-
}); err != nil {
1364+
}
1365+
if err := addProviderEndpointsToTranslateContext(tctx, c, serviceNN); err != nil {
13641366
return err
13651367
}
13661368
}
@@ -1419,10 +1421,6 @@ func distinctRequests(requests []reconcile.Request) []reconcile.Request {
14191421
}
14201422

14211423
func addProviderEndpointsToTranslateContext(tctx *provider.TranslateContext, c client.Client, serviceNN k8stypes.NamespacedName) error {
1422-
return addProviderEndpointsToTranslateContextWithEndpointSliceSupport(tctx, c, serviceNN, true)
1423-
}
1424-
1425-
func addProviderEndpointsToTranslateContextWithEndpointSliceSupport(tctx *provider.TranslateContext, c client.Client, serviceNN k8stypes.NamespacedName, supportsEndpointSlice bool) error {
14261424
log.Debugw("to process provider endpoints by provider.service", zap.Any("service", serviceNN))
14271425
var (
14281426
service corev1.Service
@@ -1433,39 +1431,7 @@ func addProviderEndpointsToTranslateContextWithEndpointSliceSupport(tctx *provid
14331431
}
14341432
tctx.Services[serviceNN] = &service
14351433

1436-
// Conditionally get EndpointSlice or Endpoints based on cluster API support
1437-
if supportsEndpointSlice {
1438-
// get es
1439-
var (
1440-
esList discoveryv1.EndpointSliceList
1441-
)
1442-
if err := c.List(tctx, &esList,
1443-
client.InNamespace(serviceNN.Namespace),
1444-
client.MatchingLabels{
1445-
discoveryv1.LabelServiceName: serviceNN.Name,
1446-
}); err != nil {
1447-
log.Errorw("failed to get endpoints for GatewayProxy provider", zap.Error(err), zap.Any("endpoints", serviceNN))
1448-
return err
1449-
}
1450-
tctx.EndpointSlices[serviceNN] = esList.Items
1451-
} else {
1452-
// Fallback to Endpoints API for Kubernetes 1.18 compatibility
1453-
var endpoints corev1.Endpoints
1454-
if err := c.Get(tctx, serviceNN, &endpoints); err != nil {
1455-
if client.IgnoreNotFound(err) != nil {
1456-
log.Errorw("failed to get endpoints for GatewayProxy provider", zap.Error(err), zap.Any("endpoints", serviceNN))
1457-
return err
1458-
}
1459-
// If endpoints not found, create empty EndpointSlice list
1460-
tctx.EndpointSlices[serviceNN] = []discoveryv1.EndpointSlice{}
1461-
} else {
1462-
// Convert Endpoints to EndpointSlice format for internal consistency
1463-
convertedEndpointSlices := pkgutils.ConvertEndpointsToEndpointSlice(&endpoints)
1464-
tctx.EndpointSlices[serviceNN] = convertedEndpointSlices
1465-
}
1466-
}
1467-
1468-
return nil
1434+
return collectEndpointsWithEndpointSliceSupport(tctx, c, tctx, serviceNN, true, nil)
14691435
}
14701436

14711437
func TypePredicate[T client.Object]() func(obj client.Object) bool {
@@ -1518,3 +1484,94 @@ func watchEndpointSliceOrEndpoints(bdr *ctrl.Builder, supportsEndpointSlice bool
15181484
return bdr.Watches(&corev1.Endpoints{}, handler.EnqueueRequestsFromMapFunc(endpointsMapFunc))
15191485
}
15201486
}
1487+
1488+
// collectEndpointsWithEndpointSliceSupport collects endpoints and adds them to the translate context
1489+
// It handles both EndpointSlice (K8s 1.19+) and Endpoints (K8s 1.18) APIs with automatic fallback
1490+
func collectEndpointsWithEndpointSliceSupport(
1491+
ctx context.Context,
1492+
c client.Client,
1493+
tctx *provider.TranslateContext,
1494+
serviceNN k8stypes.NamespacedName,
1495+
supportsEndpointSlice bool,
1496+
subsetLabels map[string]string,
1497+
) error {
1498+
if supportsEndpointSlice {
1499+
var endpoints discoveryv1.EndpointSliceList
1500+
if err := c.List(ctx, &endpoints,
1501+
client.InNamespace(serviceNN.Namespace),
1502+
client.MatchingLabels{
1503+
discoveryv1.LabelServiceName: serviceNN.Name,
1504+
},
1505+
); err != nil {
1506+
return fmt.Errorf("failed to list endpoint slices: %v", err)
1507+
}
1508+
1509+
if len(subsetLabels) == 0 {
1510+
tctx.EndpointSlices[serviceNN] = endpoints.Items
1511+
} else {
1512+
// Apply subset filtering
1513+
tctx.EndpointSlices[serviceNN] = filterEndpointSlicesBySubsetLabels(ctx, c, endpoints.Items, subsetLabels)
1514+
}
1515+
} else {
1516+
// Fallback to Endpoints API for Kubernetes 1.18 compatibility
1517+
var ep corev1.Endpoints
1518+
if err := c.Get(ctx, serviceNN, &ep); err != nil {
1519+
if client.IgnoreNotFound(err) != nil {
1520+
return fmt.Errorf("failed to get endpoints: %v", err)
1521+
}
1522+
// If endpoints not found, create empty EndpointSlice list
1523+
tctx.EndpointSlices[serviceNN] = []discoveryv1.EndpointSlice{}
1524+
} else {
1525+
// Convert Endpoints to EndpointSlice format for internal consistency
1526+
convertedEndpointSlices := pkgutils.ConvertEndpointsToEndpointSlice(&ep)
1527+
1528+
if len(subsetLabels) == 0 {
1529+
tctx.EndpointSlices[serviceNN] = convertedEndpointSlices
1530+
} else {
1531+
// Apply subset filtering to converted EndpointSlices
1532+
tctx.EndpointSlices[serviceNN] = filterEndpointSlicesBySubsetLabels(ctx, c, convertedEndpointSlices, subsetLabels)
1533+
}
1534+
}
1535+
}
1536+
1537+
return nil
1538+
}
1539+
1540+
// filterEndpointSlicesBySubsetLabels filters EndpointSlices by subset labels
1541+
func filterEndpointSlicesBySubsetLabels(ctx context.Context, c client.Client, endpointSlices []discoveryv1.EndpointSlice, labels map[string]string) []discoveryv1.EndpointSlice {
1542+
if len(labels) == 0 {
1543+
return endpointSlices
1544+
}
1545+
1546+
for i := range endpointSlices {
1547+
endpointSlices[i] = filterEndpointSliceByTargetPod(ctx, c, endpointSlices[i], labels)
1548+
}
1549+
1550+
return utils.Filter(endpointSlices, func(v discoveryv1.EndpointSlice) bool {
1551+
return len(v.Endpoints) > 0
1552+
})
1553+
}
1554+
1555+
// filterEndpointSliceByTargetPod filters item.Endpoints which is not a subset of labels
1556+
func filterEndpointSliceByTargetPod(ctx context.Context, c client.Client, item discoveryv1.EndpointSlice, labels map[string]string) discoveryv1.EndpointSlice {
1557+
item.Endpoints = utils.Filter(item.Endpoints, func(v discoveryv1.Endpoint) bool {
1558+
if v.TargetRef == nil || v.TargetRef.Kind != KindPod {
1559+
return true
1560+
}
1561+
1562+
var (
1563+
pod corev1.Pod
1564+
podNN = k8stypes.NamespacedName{
1565+
Namespace: v.TargetRef.Namespace,
1566+
Name: v.TargetRef.Name,
1567+
}
1568+
)
1569+
if err := c.Get(ctx, podNN, &pod); err != nil {
1570+
return false
1571+
}
1572+
1573+
return utils.IsSubsetOf(labels, pod.GetLabels())
1574+
})
1575+
1576+
return item
1577+
}

0 commit comments

Comments
 (0)