Skip to content

Commit 6ca4414

Browse files
authored
fix: remove duplicate logic (#221)
Signed-off-by: Ashing Zheng <[email protected]>
1 parent 7be81e9 commit 6ca4414

File tree

7 files changed

+134
-188
lines changed

7 files changed

+134
-188
lines changed

internal/controller/apisixroute_controller.go

Lines changed: 8 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -373,44 +373,15 @@ func (r *ApisixRouteReconciler) validateBackends(ctx context.Context, tc *provid
373373
}
374374
tc.Services[serviceNN] = &service
375375

376-
// Conditionally collect EndpointSlice or Endpoints based on cluster API support
377-
if r.supportsEndpointSlice {
378-
var endpoints discoveryv1.EndpointSliceList
379-
if err := r.List(ctx, &endpoints,
380-
client.InNamespace(service.Namespace),
381-
client.MatchingLabels{
382-
discoveryv1.LabelServiceName: service.Name,
383-
},
384-
); err != nil {
385-
return types.ReasonError{
386-
Reason: string(apiv2.ConditionReasonInvalidSpec),
387-
Message: fmt.Sprintf("failed to list endpoint slices: %v", err),
388-
}
389-
}
376+
// backend.subset specifies a subset of upstream nodes.
377+
// It specifies that the target pod's label should be a superset of the subset labels of the ApisixUpstream of the serviceName
378+
subsetLabels := r.getSubsetLabels(tc, serviceNN, backend)
390379

391-
// backend.subset specifies a subset of upstream nodes.
392-
// It specifies that the target pod's label should be a superset of the subset labels of the ApisixUpstream of the serviceName
393-
subsetLabels := r.getSubsetLabels(tc, serviceNN, backend)
394-
tc.EndpointSlices[serviceNN] = r.filterEndpointSlicesBySubsetLabels(ctx, endpoints.Items, subsetLabels)
395-
} else {
396-
// Fallback to Endpoints API for Kubernetes 1.18 compatibility
397-
var ep corev1.Endpoints
398-
if err := r.Get(ctx, serviceNN, &ep); err != nil {
399-
if client.IgnoreNotFound(err) != nil {
400-
return types.ReasonError{
401-
Reason: string(apiv2.ConditionReasonInvalidSpec),
402-
Message: fmt.Sprintf("failed to get endpoints: %v", err),
403-
}
404-
}
405-
// If endpoints not found, create empty EndpointSlice list
406-
tc.EndpointSlices[serviceNN] = []discoveryv1.EndpointSlice{}
407-
} else {
408-
// Convert Endpoints to EndpointSlice format for internal consistency
409-
convertedEndpointSlices := pkgutils.ConvertEndpointsToEndpointSlice(&ep)
410-
411-
// Apply subset filtering to converted EndpointSlices
412-
subsetLabels := r.getSubsetLabels(tc, serviceNN, backend)
413-
tc.EndpointSlices[serviceNN] = r.filterEndpointSlicesBySubsetLabels(ctx, convertedEndpointSlices, subsetLabels)
380+
// Collect endpoints with EndpointSlice support and subset filtering
381+
if err := resolveServiceEndpoints(ctx, r.Client, tc, serviceNN, r.supportsEndpointSlice, subsetLabels); err != nil {
382+
return types.ReasonError{
383+
Reason: string(apiv2.ConditionReasonInvalidSpec),
384+
Message: err.Error(),
414385
}
415386
}
416387
}
@@ -691,41 +662,3 @@ func (r *ApisixRouteReconciler) getSubsetLabels(tctx *provider.TranslateContext,
691662

692663
return nil
693664
}
694-
695-
func (r *ApisixRouteReconciler) filterEndpointSlicesBySubsetLabels(ctx context.Context, in []discoveryv1.EndpointSlice, labels map[string]string) []discoveryv1.EndpointSlice {
696-
if len(labels) == 0 {
697-
return in
698-
}
699-
700-
for i := range in {
701-
in[i] = r.filterEndpointSliceByTargetPod(ctx, in[i], labels)
702-
}
703-
704-
return utils.Filter(in, func(v discoveryv1.EndpointSlice) bool {
705-
return len(v.Endpoints) > 0
706-
})
707-
}
708-
709-
// filterEndpointSliceByTargetPod filters item.Endpoints which is not a subset of labels
710-
func (r *ApisixRouteReconciler) filterEndpointSliceByTargetPod(ctx context.Context, item discoveryv1.EndpointSlice, labels map[string]string) discoveryv1.EndpointSlice {
711-
item.Endpoints = utils.Filter(item.Endpoints, func(v discoveryv1.Endpoint) bool {
712-
if v.TargetRef == nil || v.TargetRef.Kind != KindPod {
713-
return true
714-
}
715-
716-
var (
717-
pod corev1.Pod
718-
podNN = k8stypes.NamespacedName{
719-
Namespace: v.TargetRef.Namespace,
720-
Name: v.TargetRef.Name,
721-
}
722-
)
723-
if err := r.Get(ctx, podNN, &pod); err != nil {
724-
return false
725-
}
726-
727-
return utils.IsSubsetOf(labels, pod.GetLabels())
728-
})
729-
730-
return item
731-
}

internal/controller/gatewayproxy_controller.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -114,10 +114,16 @@ func (r *GatewayProxyController) Reconcile(ctx context.Context, req ctrl.Request
114114
if providerService == nil {
115115
tctx.EndpointSlices[req.NamespacedName] = nil
116116
} else {
117-
if err := addProviderEndpointsToTranslateContextWithEndpointSliceSupport(tctx, r.Client, k8stypes.NamespacedName{
117+
serviceNN := k8stypes.NamespacedName{
118118
Namespace: gp.Namespace,
119119
Name: providerService.Name,
120-
}, r.supportsEndpointSlice); err != nil {
120+
}
121+
service := &corev1.Service{}
122+
if err := r.Get(ctx, serviceNN, service); err != nil {
123+
return reconcile.Result{}, err
124+
}
125+
tctx.Services[serviceNN] = service
126+
if err := resolveServiceEndpoints(tctx, r.Client, tctx, serviceNN, r.supportsEndpointSlice, nil); err != nil {
121127
return reconcile.Result{}, err
122128
}
123129
}

internal/controller/httproute_controller.go

Lines changed: 5 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -570,36 +570,11 @@ func (r *HTTPRouteReconciler) processHTTPRouteBackendRefs(tctx *provider.Transla
570570
}
571571
tctx.Services[targetNN] = &service
572572

573-
// Conditionally collect EndpointSlice or Endpoints based on cluster API support
574-
if r.supportsEndpointSlice {
575-
endpointSliceList := new(discoveryv1.EndpointSliceList)
576-
if err := r.List(tctx, endpointSliceList,
577-
client.InNamespace(targetNN.Namespace),
578-
client.MatchingLabels{
579-
discoveryv1.LabelServiceName: targetNN.Name,
580-
},
581-
); err != nil {
582-
r.Log.Error(err, "failed to list endpoint slices", "Service", targetNN)
583-
terr = err
584-
continue
585-
}
586-
tctx.EndpointSlices[targetNN] = endpointSliceList.Items
587-
} else {
588-
// Fallback to Endpoints API for Kubernetes 1.18 compatibility
589-
var endpoints corev1.Endpoints
590-
if err := r.Get(tctx, targetNN, &endpoints); err != nil {
591-
if client.IgnoreNotFound(err) != nil {
592-
r.Log.Error(err, "failed to get endpoints", "Service", targetNN)
593-
terr = err
594-
continue
595-
}
596-
// If endpoints not found, create empty EndpointSlice list
597-
tctx.EndpointSlices[targetNN] = []discoveryv1.EndpointSlice{}
598-
} else {
599-
// Convert Endpoints to EndpointSlice format for internal consistency
600-
convertedEndpointSlices := pkgutils.ConvertEndpointsToEndpointSlice(&endpoints)
601-
tctx.EndpointSlices[targetNN] = convertedEndpointSlices
602-
}
573+
// Collect endpoints with EndpointSlice support
574+
if err := resolveServiceEndpoints(tctx, r.Client, tctx, targetNN, r.supportsEndpointSlice, nil); err != nil {
575+
r.Log.Error(err, "failed to collect endpoints", "Service", targetNN)
576+
terr = err
577+
continue
603578
}
604579
}
605580
return terr

internal/controller/ingress_controller.go

Lines changed: 4 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -606,37 +606,10 @@ func (r *IngressReconciler) processBackendService(tctx *provider.TranslateContex
606606
return err
607607
}
608608

609-
// Conditionally get EndpointSlice or Endpoints based on cluster API support
610-
if r.supportsEndpointSlice {
611-
// get the endpoint slices
612-
endpointSliceList := &discoveryv1.EndpointSliceList{}
613-
if err := r.List(tctx, endpointSliceList,
614-
client.InNamespace(namespace),
615-
client.MatchingLabels{
616-
discoveryv1.LabelServiceName: backendService.Name,
617-
},
618-
); err != nil {
619-
r.Log.Error(err, "failed to list endpoint slices", "namespace", namespace, "name", backendService.Name)
620-
return err
621-
}
622-
623-
// save the endpoint slices to the translate context
624-
tctx.EndpointSlices[serviceNS] = endpointSliceList.Items
625-
} else {
626-
// Fallback to Endpoints API for Kubernetes 1.18 compatibility
627-
var endpoints corev1.Endpoints
628-
if err := r.Get(tctx, serviceNS, &endpoints); err != nil {
629-
if client.IgnoreNotFound(err) != nil {
630-
r.Log.Error(err, "failed to get endpoints", "namespace", namespace, "name", backendService.Name)
631-
return err
632-
}
633-
// If endpoints not found, create empty EndpointSlice list
634-
tctx.EndpointSlices[serviceNS] = []discoveryv1.EndpointSlice{}
635-
} else {
636-
// Convert Endpoints to EndpointSlice format for internal consistency
637-
convertedEndpointSlices := pkgutils.ConvertEndpointsToEndpointSlice(&endpoints)
638-
tctx.EndpointSlices[serviceNS] = convertedEndpointSlices
639-
}
609+
// Collect endpoints with EndpointSlice support
610+
if err := resolveServiceEndpoints(tctx, r.Client, tctx, serviceNS, r.supportsEndpointSlice, nil); err != nil {
611+
r.Log.Error(err, "failed to collect endpoints", "namespace", namespace, "name", backendService.Name)
612+
return err
640613
}
641614
tctx.Services[serviceNS] = &service
642615
return nil

internal/controller/utils.go

Lines changed: 98 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -964,10 +964,11 @@ func ProcessGatewayProxy(r client.Client, log logr.Logger, tctx *provider.Transl
964964
}
965965

966966
if cp.Service != nil {
967-
if err := addProviderEndpointsToTranslateContext(tctx, r, k8stypes.NamespacedName{
967+
serviceNN := k8stypes.NamespacedName{
968968
Namespace: gatewayProxy.GetNamespace(),
969969
Name: cp.Service.Name,
970-
}); err != nil {
970+
}
971+
if err := addProviderEndpointsToTranslateContext(tctx, r, serviceNN); err != nil {
971972
return err
972973
}
973974
}
@@ -1377,10 +1378,11 @@ func ProcessIngressClassParameters(tctx *provider.TranslateContext, c client.Cli
13771378

13781379
// process control plane provider service
13791380
if cp.Service != nil {
1380-
if err := addProviderEndpointsToTranslateContext(tctx, c, client.ObjectKey{
1381+
serviceNN := k8stypes.NamespacedName{
13811382
Namespace: gatewayProxy.GetNamespace(),
13821383
Name: cp.Service.Name,
1383-
}); err != nil {
1384+
}
1385+
if err := addProviderEndpointsToTranslateContext(tctx, c, serviceNN); err != nil {
13841386
return err
13851387
}
13861388
}
@@ -1486,10 +1488,6 @@ func distinctRequests(requests []reconcile.Request) []reconcile.Request {
14861488
}
14871489

14881490
func addProviderEndpointsToTranslateContext(tctx *provider.TranslateContext, c client.Client, serviceNN k8stypes.NamespacedName) error {
1489-
return addProviderEndpointsToTranslateContextWithEndpointSliceSupport(tctx, c, serviceNN, true)
1490-
}
1491-
1492-
func addProviderEndpointsToTranslateContextWithEndpointSliceSupport(tctx *provider.TranslateContext, c client.Client, serviceNN k8stypes.NamespacedName, supportsEndpointSlice bool) error {
14931491
log.Debugw("to process provider endpoints by provider.service", zap.Any("service", serviceNN))
14941492
var (
14951493
service corev1.Service
@@ -1500,39 +1498,7 @@ func addProviderEndpointsToTranslateContextWithEndpointSliceSupport(tctx *provid
15001498
}
15011499
tctx.Services[serviceNN] = &service
15021500

1503-
// Conditionally get EndpointSlice or Endpoints based on cluster API support
1504-
if supportsEndpointSlice {
1505-
// get es
1506-
var (
1507-
esList discoveryv1.EndpointSliceList
1508-
)
1509-
if err := c.List(tctx, &esList,
1510-
client.InNamespace(serviceNN.Namespace),
1511-
client.MatchingLabels{
1512-
discoveryv1.LabelServiceName: serviceNN.Name,
1513-
}); err != nil {
1514-
log.Errorw("failed to get endpoints for GatewayProxy provider", zap.Error(err), zap.Any("endpoints", serviceNN))
1515-
return err
1516-
}
1517-
tctx.EndpointSlices[serviceNN] = esList.Items
1518-
} else {
1519-
// Fallback to Endpoints API for Kubernetes 1.18 compatibility
1520-
var endpoints corev1.Endpoints
1521-
if err := c.Get(tctx, serviceNN, &endpoints); err != nil {
1522-
if client.IgnoreNotFound(err) != nil {
1523-
log.Errorw("failed to get endpoints for GatewayProxy provider", zap.Error(err), zap.Any("endpoints", serviceNN))
1524-
return err
1525-
}
1526-
// If endpoints not found, create empty EndpointSlice list
1527-
tctx.EndpointSlices[serviceNN] = []discoveryv1.EndpointSlice{}
1528-
} else {
1529-
// Convert Endpoints to EndpointSlice format for internal consistency
1530-
convertedEndpointSlices := pkgutils.ConvertEndpointsToEndpointSlice(&endpoints)
1531-
tctx.EndpointSlices[serviceNN] = convertedEndpointSlices
1532-
}
1533-
}
1534-
1535-
return nil
1501+
return resolveServiceEndpoints(tctx, c, tctx, serviceNN, true, nil)
15361502
}
15371503

15381504
func TypePredicate[T client.Object]() func(obj client.Object) bool {
@@ -1585,3 +1551,94 @@ func watchEndpointSliceOrEndpoints(bdr *ctrl.Builder, supportsEndpointSlice bool
15851551
return bdr.Watches(&corev1.Endpoints{}, handler.EnqueueRequestsFromMapFunc(endpointsMapFunc))
15861552
}
15871553
}
1554+
1555+
// resolveServiceEndpoints collects endpoints and adds them to the translate context
1556+
// It handles both EndpointSlice (K8s 1.19+) and Endpoints (K8s 1.18) APIs with automatic fallback
1557+
func resolveServiceEndpoints(
1558+
ctx context.Context,
1559+
c client.Client,
1560+
tctx *provider.TranslateContext,
1561+
serviceNN k8stypes.NamespacedName,
1562+
supportsEndpointSlice bool,
1563+
subsetLabels map[string]string,
1564+
) error {
1565+
if supportsEndpointSlice {
1566+
var endpoints discoveryv1.EndpointSliceList
1567+
if err := c.List(ctx, &endpoints,
1568+
client.InNamespace(serviceNN.Namespace),
1569+
client.MatchingLabels{
1570+
discoveryv1.LabelServiceName: serviceNN.Name,
1571+
},
1572+
); err != nil {
1573+
return fmt.Errorf("failed to list endpoint slices: %v", err)
1574+
}
1575+
1576+
if len(subsetLabels) == 0 {
1577+
tctx.EndpointSlices[serviceNN] = endpoints.Items
1578+
} else {
1579+
// Apply subset filtering
1580+
tctx.EndpointSlices[serviceNN] = filterEndpointSlicesBySubsetLabels(ctx, c, endpoints.Items, subsetLabels)
1581+
}
1582+
} else {
1583+
// Fallback to Endpoints API for Kubernetes 1.18 compatibility
1584+
var ep corev1.Endpoints
1585+
if err := c.Get(ctx, serviceNN, &ep); err != nil {
1586+
if client.IgnoreNotFound(err) != nil {
1587+
return fmt.Errorf("failed to get endpoints: %v", err)
1588+
}
1589+
// If endpoints not found, create empty EndpointSlice list
1590+
tctx.EndpointSlices[serviceNN] = []discoveryv1.EndpointSlice{}
1591+
} else {
1592+
// Convert Endpoints to EndpointSlice format for internal consistency
1593+
convertedEndpointSlices := pkgutils.ConvertEndpointsToEndpointSlice(&ep)
1594+
1595+
if len(subsetLabels) == 0 {
1596+
tctx.EndpointSlices[serviceNN] = convertedEndpointSlices
1597+
} else {
1598+
// Apply subset filtering to converted EndpointSlices
1599+
tctx.EndpointSlices[serviceNN] = filterEndpointSlicesBySubsetLabels(ctx, c, convertedEndpointSlices, subsetLabels)
1600+
}
1601+
}
1602+
}
1603+
1604+
return nil
1605+
}
1606+
1607+
// filterEndpointSlicesBySubsetLabels filters EndpointSlices by subset labels
1608+
func filterEndpointSlicesBySubsetLabels(ctx context.Context, c client.Client, endpointSlices []discoveryv1.EndpointSlice, labels map[string]string) []discoveryv1.EndpointSlice {
1609+
if len(labels) == 0 {
1610+
return endpointSlices
1611+
}
1612+
1613+
for i := range endpointSlices {
1614+
endpointSlices[i] = filterEndpointSliceByTargetPod(ctx, c, endpointSlices[i], labels)
1615+
}
1616+
1617+
return utils.Filter(endpointSlices, func(v discoveryv1.EndpointSlice) bool {
1618+
return len(v.Endpoints) > 0
1619+
})
1620+
}
1621+
1622+
// filterEndpointSliceByTargetPod filters item.Endpoints which is not a subset of labels
1623+
func filterEndpointSliceByTargetPod(ctx context.Context, c client.Client, item discoveryv1.EndpointSlice, labels map[string]string) discoveryv1.EndpointSlice {
1624+
item.Endpoints = utils.Filter(item.Endpoints, func(v discoveryv1.Endpoint) bool {
1625+
if v.TargetRef == nil || v.TargetRef.Kind != KindPod {
1626+
return true
1627+
}
1628+
1629+
var (
1630+
pod corev1.Pod
1631+
podNN = k8stypes.NamespacedName{
1632+
Namespace: v.TargetRef.Namespace,
1633+
Name: v.TargetRef.Name,
1634+
}
1635+
)
1636+
if err := c.Get(ctx, podNN, &pod); err != nil {
1637+
return false
1638+
}
1639+
1640+
return utils.IsSubsetOf(labels, pod.GetLabels())
1641+
})
1642+
1643+
return item
1644+
}

test/e2e/crds/v1alpha1/gatewayproxy.go

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -168,20 +168,16 @@ spec:
168168

169169
By("scale apisix to replicas 2")
170170
s.Deployer.DeployDataplane(scaffold.DeployDataplaneOptions{
171-
Replicas: ptr.To(2),
171+
Replicas: ptr.To(2),
172+
SkipCreateTunnels: true,
172173
})
173174

174175
By("check pod ready")
175-
err = wait.PollUntilContextTimeout(context.Background(), time.Second, 10*time.Second, true, func(ctx context.Context) (done bool, err error) {
176-
pods := s.GetPods(s.Namespace(), "app.kubernetes.io/name=apisix")
177-
if len(pods) != 2 {
176+
err = wait.PollUntilContextTimeout(context.Background(), time.Second, 60*time.Second, true, func(ctx context.Context) (done bool, err error) {
177+
endpoints := s.GetEndpoints(s.Namespace(), framework.ProviderType)
178+
if len(endpoints.Subsets) != 1 || len(endpoints.Subsets[0].Addresses) != 2 {
178179
return false, nil
179180
}
180-
for _, pod := range pods {
181-
if pod.Status.PodIP == "" {
182-
return false, nil
183-
}
184-
}
185181
return true, nil
186182
})
187183
Expect(err).NotTo(HaveOccurred(), "check pods ready")

0 commit comments

Comments
 (0)