Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions config/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,7 @@ rules:
- apiGroups:
- ""
resources:
- events
verbs:
- create
- patch
- apiGroups:
- ""
resources:
- endpoints
- namespaces
- pods
- secrets
Expand All @@ -22,6 +16,13 @@ rules:
- get
- list
- watch
- apiGroups:
- ""
resources:
- events
verbs:
- create
- patch
- apiGroups:
- apisix.apache.org
resources:
Expand Down
105 changes: 86 additions & 19 deletions internal/controller/apisixroute_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,16 +59,23 @@ type ApisixRouteReconciler struct {
Provider provider.Provider
Updater status.Updater
Readier readiness.ReadinessManager

// supportsEndpointSlice indicates whether the cluster supports EndpointSlice API
supportsEndpointSlice bool
}

// SetupWithManager sets up the controller with the Manager.
func (r *ApisixRouteReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
// Check and store EndpointSlice API support
r.supportsEndpointSlice = pkgutils.HasAPIResource(mgr, &discoveryv1.EndpointSlice{})

bdr := ctrl.NewControllerManagedBy(mgr).
For(&apiv2.ApisixRoute{}).
WithEventFilter(
predicate.Or(
predicate.GenerationChangedPredicate{},
predicate.AnnotationChangedPredicate{},
predicate.NewPredicateFuncs(TypePredicate[*corev1.Endpoints]()),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When supportsEndpointSlice is true, it is redundant.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated.

predicate.NewPredicateFuncs(TypePredicate[*corev1.Secret]()),
),
).
Expand All @@ -81,10 +88,21 @@ func (r *ApisixRouteReconciler) SetupWithManager(mgr ctrl.Manager) error {
).
Watches(&v1alpha1.GatewayProxy{},
handler.EnqueueRequestsFromMapFunc(r.listApisixRouteForGatewayProxy),
).
Watches(&discoveryv1.EndpointSlice{},
)

// Conditionally watch EndpointSlice or Endpoints based on cluster API support
if r.supportsEndpointSlice {
bdr = bdr.Watches(&discoveryv1.EndpointSlice{},
handler.EnqueueRequestsFromMapFunc(r.listApisixRoutesForService),
).
)
} else {
r.Log.Info("EndpointSlice API not available, falling back to Endpoints API for service discovery")
bdr = bdr.Watches(&corev1.Endpoints{},
handler.EnqueueRequestsFromMapFunc(r.listApisixRoutesForEndpoints),
)
}

return bdr.
Watches(&corev1.Secret{},
handler.EnqueueRequestsFromMapFunc(r.listApisixRoutesForSecret),
).
Expand Down Expand Up @@ -341,23 +359,46 @@ func (r *ApisixRouteReconciler) validateBackends(ctx context.Context, tc *provid
}
tc.Services[serviceNN] = &service

var endpoints discoveryv1.EndpointSliceList
if err := r.List(ctx, &endpoints,
client.InNamespace(service.Namespace),
client.MatchingLabels{
discoveryv1.LabelServiceName: service.Name,
},
); err != nil {
return types.ReasonError{
Reason: string(apiv2.ConditionReasonInvalidSpec),
Message: fmt.Sprintf("failed to list endpoint slices: %v", err),
// Conditionally collect EndpointSlice or Endpoints based on cluster API support
if r.supportsEndpointSlice {
var endpoints discoveryv1.EndpointSliceList
if err := r.List(ctx, &endpoints,
client.InNamespace(service.Namespace),
client.MatchingLabels{
discoveryv1.LabelServiceName: service.Name,
},
); err != nil {
return types.ReasonError{
Reason: string(apiv2.ConditionReasonInvalidSpec),
Message: fmt.Sprintf("failed to list endpoint slices: %v", err),
}
}
}

// backend.subset specifies a subset of upstream nodes.
// It specifies that the target pod's label should be a superset of the subset labels of the ApisixUpstream of the serviceName
subsetLabels := r.getSubsetLabels(tc, serviceNN, backend)
tc.EndpointSlices[serviceNN] = r.filterEndpointSlicesBySubsetLabels(ctx, endpoints.Items, subsetLabels)
// backend.subset specifies a subset of upstream nodes.
// It specifies that the target pod's label should be a superset of the subset labels of the ApisixUpstream of the serviceName
subsetLabels := r.getSubsetLabels(tc, serviceNN, backend)
tc.EndpointSlices[serviceNN] = r.filterEndpointSlicesBySubsetLabels(ctx, endpoints.Items, subsetLabels)
} else {
// Fallback to Endpoints API for Kubernetes 1.18 compatibility
var ep corev1.Endpoints
if err := r.Get(ctx, serviceNN, &ep); err != nil {
if client.IgnoreNotFound(err) != nil {
return types.ReasonError{
Reason: string(apiv2.ConditionReasonInvalidSpec),
Message: fmt.Sprintf("failed to get endpoints: %v", err),
}
}
// If endpoints not found, create empty EndpointSlice list
tc.EndpointSlices[serviceNN] = []discoveryv1.EndpointSlice{}
} else {
// Convert Endpoints to EndpointSlice format for internal consistency
convertedEndpointSlices := pkgutils.ConvertEndpointsToEndpointSlice(&ep)

// Apply subset filtering to converted EndpointSlices
subsetLabels := r.getSubsetLabels(tc, serviceNN, backend)
tc.EndpointSlices[serviceNN] = r.filterEndpointSlicesBySubsetLabels(ctx, convertedEndpointSlices, subsetLabels)
}
}
}

return nil
Expand Down Expand Up @@ -443,6 +484,32 @@ func (r *ApisixRouteReconciler) listApisixRoutesForService(ctx context.Context,
return pkgutils.DedupComparable(requests)
}

// listApisixRoutesForEndpoints handles Endpoints objects and converts them to ApisixRoute reconcile requests.
// This function provides backward compatibility for Kubernetes 1.18 clusters that don't support EndpointSlice.
func (r *ApisixRouteReconciler) listApisixRoutesForEndpoints(ctx context.Context, obj client.Object) []reconcile.Request {
endpoint, ok := obj.(*corev1.Endpoints)
if !ok {
return nil
}

var (
namespace = endpoint.GetNamespace()
serviceName = endpoint.GetName() // For Endpoints, the name is the service name
arList apiv2.ApisixRouteList
)
if err := r.List(ctx, &arList, client.MatchingFields{
indexer.ServiceIndexRef: indexer.GenIndexKey(namespace, serviceName),
}); err != nil {
r.Log.Error(err, "failed to list apisixroutes by service", "service", serviceName)
return nil
}
requests := make([]reconcile.Request, 0, len(arList.Items))
for _, ar := range arList.Items {
requests = append(requests, reconcile.Request{NamespacedName: utils.NamespacedName(&ar)})
}
return pkgutils.DedupComparable(requests)
}

func (r *ApisixRouteReconciler) listApisixRoutesForSecret(ctx context.Context, obj client.Object) []reconcile.Request {
secret, ok := obj.(*corev1.Secret)
if !ok {
Expand Down
45 changes: 39 additions & 6 deletions internal/controller/gatewayproxy_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/apache/apisix-ingress-controller/internal/controller/indexer"
"github.com/apache/apisix-ingress-controller/internal/provider"
"github.com/apache/apisix-ingress-controller/internal/utils"
pkgutils "github.com/apache/apisix-ingress-controller/pkg/utils"
)

// GatewayProxyController reconciles a GatewayProxy object.
Expand All @@ -47,23 +48,41 @@ type GatewayProxyController struct {
Scheme *runtime.Scheme
Log logr.Logger
Provider provider.Provider

// supportsEndpointSlice indicates whether the cluster supports EndpointSlice API
supportsEndpointSlice bool
}

func (r *GatewayProxyController) SetupWithManager(mrg ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mrg).
// Check and store EndpointSlice API support
r.supportsEndpointSlice = pkgutils.HasAPIResource(mrg, &discoveryv1.EndpointSlice{})

bdr := ctrl.NewControllerManagedBy(mrg).
For(&v1alpha1.GatewayProxy{}).
WithEventFilter(
predicate.Or(
predicate.GenerationChangedPredicate{},
predicate.NewPredicateFuncs(TypePredicate[*corev1.Endpoints]()),
predicate.NewPredicateFuncs(TypePredicate[*corev1.Secret]()),
),
).
Watches(&corev1.Service{},
handler.EnqueueRequestsFromMapFunc(r.listGatewayProxiesForProviderService),
).
Watches(&discoveryv1.EndpointSlice{},
)

// Conditionally watch EndpointSlice or Endpoints based on cluster API support
if r.supportsEndpointSlice {
bdr = bdr.Watches(&discoveryv1.EndpointSlice{},
handler.EnqueueRequestsFromMapFunc(r.listGatewayProxiesForProviderEndpointSlice),
).
)
} else {
r.Log.Info("EndpointSlice API not available, falling back to Endpoints API for provider service discovery")
bdr = bdr.Watches(&corev1.Endpoints{},
handler.EnqueueRequestsFromMapFunc(r.listGatewayProxiesForProviderEndpoints),
)
}

return bdr.
Watches(&corev1.Secret{},
handler.EnqueueRequestsFromMapFunc(r.listGatewayProxiesForSecret),
).
Expand Down Expand Up @@ -93,10 +112,10 @@ func (r *GatewayProxyController) Reconcile(ctx context.Context, req ctrl.Request
if providerService == nil {
tctx.EndpointSlices[req.NamespacedName] = nil
} else {
if err := addProviderEndpointsToTranslateContext(tctx, r.Client, types.NamespacedName{
if err := addProviderEndpointsToTranslateContextWithEndpointSliceSupport(tctx, r.Client, types.NamespacedName{
Namespace: gp.Namespace,
Name: providerService.Name,
}); err != nil {
}, r.supportsEndpointSlice); err != nil {
return reconcile.Result{}, err
}
}
Expand Down Expand Up @@ -174,6 +193,20 @@ func (r *GatewayProxyController) listGatewayProxiesForProviderEndpointSlice(ctx
})
}

// listGatewayProxiesForProviderEndpoints handles Endpoints objects and converts them to GatewayProxy reconcile requests.
// This function provides backward compatibility for Kubernetes 1.18 clusters that don't support EndpointSlice.
func (r *GatewayProxyController) listGatewayProxiesForProviderEndpoints(ctx context.Context, obj client.Object) (requests []reconcile.Request) {
endpoint, ok := obj.(*corev1.Endpoints)
if !ok {
r.Log.Error(errors.New("unexpected object type"), "failed to convert object to Endpoints")
return nil
}

return ListRequests(ctx, r.Client, r.Log, &v1alpha1.GatewayProxyList{}, client.MatchingFields{
indexer.ServiceIndexRef: indexer.GenIndexKey(endpoint.GetNamespace(), endpoint.GetName()),
})
}

func (r *GatewayProxyController) listGatewayProxiesForSecret(ctx context.Context, object client.Object) []reconcile.Request {
secret, ok := object.(*corev1.Secret)
if !ok {
Expand Down
100 changes: 85 additions & 15 deletions internal/controller/httproute_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ import (
"github.com/apache/apisix-ingress-controller/internal/provider"
"github.com/apache/apisix-ingress-controller/internal/types"
"github.com/apache/apisix-ingress-controller/internal/utils"
pkgutils "github.com/apache/apisix-ingress-controller/pkg/utils"
)

// HTTPRouteReconciler reconciles a GatewayClass object.
Expand All @@ -67,18 +68,39 @@ type HTTPRouteReconciler struct { //nolint:revive

Updater status.Updater
Readier readiness.ReadinessManager

// supportsEndpointSlice indicates whether the cluster supports EndpointSlice API
supportsEndpointSlice bool
}

// SetupWithManager sets up the controller with the Manager.
func (r *HTTPRouteReconciler) SetupWithManager(mgr ctrl.Manager) error {
r.genericEvent = make(chan event.GenericEvent, 100)

// Check and store EndpointSlice API support
r.supportsEndpointSlice = pkgutils.HasAPIResource(mgr, &discoveryv1.EndpointSlice{})

bdr := ctrl.NewControllerManagedBy(mgr).
For(&gatewayv1.HTTPRoute{}).
WithEventFilter(predicate.GenerationChangedPredicate{}).
Watches(&discoveryv1.EndpointSlice{},
WithEventFilter(
predicate.Or(
predicate.GenerationChangedPredicate{},
predicate.NewPredicateFuncs(TypePredicate[*corev1.Endpoints]()),
))

// Conditionally watch EndpointSlice or Endpoints based on cluster API support
if r.supportsEndpointSlice {
bdr = bdr.Watches(&discoveryv1.EndpointSlice{},
handler.EnqueueRequestsFromMapFunc(r.listHTTPRoutesByServiceBef),
).
)
} else {
r.Log.Info("EndpointSlice API not available, falling back to Endpoints API for service discovery")
bdr = bdr.Watches(&corev1.Endpoints{},
handler.EnqueueRequestsFromMapFunc(r.listHTTPRoutesByServiceForEndpoints),
)
}

bdr = bdr.
Watches(&v1alpha1.PluginConfig{},
handler.EnqueueRequestsFromMapFunc(r.listHTTPRoutesByExtensionRef),
).
Expand Down Expand Up @@ -288,6 +310,36 @@ func (r *HTTPRouteReconciler) listHTTPRoutesByServiceBef(ctx context.Context, ob
return requests
}

// listHTTPRoutesByServiceForEndpoints handles Endpoints objects and converts them to HTTPRoute reconcile requests.
// This function provides backward compatibility for Kubernetes 1.18 clusters that don't support EndpointSlice.
func (r *HTTPRouteReconciler) listHTTPRoutesByServiceForEndpoints(ctx context.Context, obj client.Object) []reconcile.Request {
endpoint, ok := obj.(*corev1.Endpoints)
if !ok {
r.Log.Error(fmt.Errorf("unexpected object type"), "failed to convert object to Endpoints")
return nil
}
namespace := endpoint.GetNamespace()
serviceName := endpoint.GetName() // For Endpoints, the name is the service name

hrList := &gatewayv1.HTTPRouteList{}
if err := r.List(ctx, hrList, client.MatchingFields{
indexer.ServiceIndexRef: indexer.GenIndexKey(namespace, serviceName),
}); err != nil {
r.Log.Error(err, "failed to list httproutes by service", "service", serviceName)
return nil
}
requests := make([]reconcile.Request, 0, len(hrList.Items))
for _, hr := range hrList.Items {
requests = append(requests, reconcile.Request{
NamespacedName: client.ObjectKey{
Namespace: hr.Namespace,
Name: hr.Name,
},
})
}
return requests
}

func (r *HTTPRouteReconciler) listHTTPRoutesByExtensionRef(ctx context.Context, obj client.Object) []reconcile.Request {
pluginconfig, ok := obj.(*v1alpha1.PluginConfig)
if !ok {
Expand Down Expand Up @@ -520,19 +572,37 @@ func (r *HTTPRouteReconciler) processHTTPRouteBackendRefs(tctx *provider.Transla
}
tctx.Services[targetNN] = &service

endpointSliceList := new(discoveryv1.EndpointSliceList)
if err := r.List(tctx, endpointSliceList,
client.InNamespace(targetNN.Namespace),
client.MatchingLabels{
discoveryv1.LabelServiceName: targetNN.Name,
},
); err != nil {
r.Log.Error(err, "failed to list endpoint slices", "Service", targetNN)
terr = err
continue
// Conditionally collect EndpointSlice or Endpoints based on cluster API support
if r.supportsEndpointSlice {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this logic be abstracted into a function for reuse.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated.

endpointSliceList := new(discoveryv1.EndpointSliceList)
if err := r.List(tctx, endpointSliceList,
client.InNamespace(targetNN.Namespace),
client.MatchingLabels{
discoveryv1.LabelServiceName: targetNN.Name,
},
); err != nil {
r.Log.Error(err, "failed to list endpoint slices", "Service", targetNN)
terr = err
continue
}
tctx.EndpointSlices[targetNN] = endpointSliceList.Items
} else {
// Fallback to Endpoints API for Kubernetes 1.18 compatibility
var endpoints corev1.Endpoints
if err := r.Get(tctx, targetNN, &endpoints); err != nil {
if client.IgnoreNotFound(err) != nil {
r.Log.Error(err, "failed to get endpoints", "Service", targetNN)
terr = err
continue
}
// If endpoints not found, create empty EndpointSlice list
tctx.EndpointSlices[targetNN] = []discoveryv1.EndpointSlice{}
} else {
// Convert Endpoints to EndpointSlice format for internal consistency
convertedEndpointSlices := pkgutils.ConvertEndpointsToEndpointSlice(&endpoints)
tctx.EndpointSlices[targetNN] = convertedEndpointSlices
}
}

tctx.EndpointSlices[targetNN] = endpointSliceList.Items
}
return terr
}
Expand Down
Loading
Loading