Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions config/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,7 @@ rules:
- apiGroups:
- ""
resources:
- events
verbs:
- create
- patch
- apiGroups:
- ""
resources:
- endpoints
- namespaces
- pods
- secrets
Expand All @@ -22,6 +16,13 @@ rules:
- get
- list
- watch
- apiGroups:
- ""
resources:
- events
verbs:
- create
- patch
- apiGroups:
- apisix.apache.org
resources:
Expand Down
118 changes: 91 additions & 27 deletions internal/controller/apisixroute_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,19 +59,29 @@ type ApisixRouteReconciler struct {
Provider provider.Provider
Updater status.Updater
Readier readiness.ReadinessManager

// supportsEndpointSlice indicates whether the cluster supports EndpointSlice API
supportsEndpointSlice bool
}

// SetupWithManager sets up the controller with the Manager.
func (r *ApisixRouteReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
// Check and store EndpointSlice API support
r.supportsEndpointSlice = pkgutils.HasAPIResource(mgr, &discoveryv1.EndpointSlice{})

eventFilters := []predicate.Predicate{
predicate.GenerationChangedPredicate{},
predicate.AnnotationChangedPredicate{},
predicate.NewPredicateFuncs(TypePredicate[*corev1.Secret]()),
}

if !r.supportsEndpointSlice {
eventFilters = append(eventFilters, predicate.NewPredicateFuncs(TypePredicate[*corev1.Endpoints]()))
}

bdr := ctrl.NewControllerManagedBy(mgr).
For(&apiv2.ApisixRoute{}).
WithEventFilter(
predicate.Or(
predicate.GenerationChangedPredicate{},
predicate.AnnotationChangedPredicate{},
predicate.NewPredicateFuncs(TypePredicate[*corev1.Secret]()),
),
).
WithEventFilter(predicate.Or(eventFilters...)).
Watches(
&networkingv1.IngressClass{},
handler.EnqueueRequestsFromMapFunc(r.listApisixRouteForIngressClass),
Expand All @@ -81,10 +91,15 @@ func (r *ApisixRouteReconciler) SetupWithManager(mgr ctrl.Manager) error {
).
Watches(&v1alpha1.GatewayProxy{},
handler.EnqueueRequestsFromMapFunc(r.listApisixRouteForGatewayProxy),
).
Watches(&discoveryv1.EndpointSlice{},
handler.EnqueueRequestsFromMapFunc(r.listApisixRoutesForService),
).
)

// Conditionally watch EndpointSlice or Endpoints based on cluster API support
bdr = watchEndpointSliceOrEndpoints(bdr, r.supportsEndpointSlice,
r.listApisixRoutesForService,
r.listApisixRoutesForEndpoints,
r.Log)

return bdr.
Watches(&corev1.Secret{},
handler.EnqueueRequestsFromMapFunc(r.listApisixRoutesForSecret),
).
Expand Down Expand Up @@ -341,23 +356,46 @@ func (r *ApisixRouteReconciler) validateBackends(ctx context.Context, tc *provid
}
tc.Services[serviceNN] = &service

var endpoints discoveryv1.EndpointSliceList
if err := r.List(ctx, &endpoints,
client.InNamespace(service.Namespace),
client.MatchingLabels{
discoveryv1.LabelServiceName: service.Name,
},
); err != nil {
return types.ReasonError{
Reason: string(apiv2.ConditionReasonInvalidSpec),
Message: fmt.Sprintf("failed to list endpoint slices: %v", err),
// Conditionally collect EndpointSlice or Endpoints based on cluster API support
if r.supportsEndpointSlice {
var endpoints discoveryv1.EndpointSliceList
if err := r.List(ctx, &endpoints,
client.InNamespace(service.Namespace),
client.MatchingLabels{
discoveryv1.LabelServiceName: service.Name,
},
); err != nil {
return types.ReasonError{
Reason: string(apiv2.ConditionReasonInvalidSpec),
Message: fmt.Sprintf("failed to list endpoint slices: %v", err),
}
}
}

// backend.subset specifies a subset of upstream nodes.
// It specifies that the target pod's label should be a superset of the subset labels of the ApisixUpstream of the serviceName
subsetLabels := r.getSubsetLabels(tc, serviceNN, backend)
tc.EndpointSlices[serviceNN] = r.filterEndpointSlicesBySubsetLabels(ctx, endpoints.Items, subsetLabels)
// backend.subset specifies a subset of upstream nodes.
// It specifies that the target pod's label should be a superset of the subset labels of the ApisixUpstream of the serviceName
subsetLabels := r.getSubsetLabels(tc, serviceNN, backend)
tc.EndpointSlices[serviceNN] = r.filterEndpointSlicesBySubsetLabels(ctx, endpoints.Items, subsetLabels)
} else {
// Fallback to Endpoints API for Kubernetes 1.18 compatibility
var ep corev1.Endpoints
if err := r.Get(ctx, serviceNN, &ep); err != nil {
if client.IgnoreNotFound(err) != nil {
return types.ReasonError{
Reason: string(apiv2.ConditionReasonInvalidSpec),
Message: fmt.Sprintf("failed to get endpoints: %v", err),
}
}
// If endpoints not found, create empty EndpointSlice list
tc.EndpointSlices[serviceNN] = []discoveryv1.EndpointSlice{}
} else {
// Convert Endpoints to EndpointSlice format for internal consistency
convertedEndpointSlices := pkgutils.ConvertEndpointsToEndpointSlice(&ep)

// Apply subset filtering to converted EndpointSlices
subsetLabels := r.getSubsetLabels(tc, serviceNN, backend)
tc.EndpointSlices[serviceNN] = r.filterEndpointSlicesBySubsetLabels(ctx, convertedEndpointSlices, subsetLabels)
}
}
}

return nil
Expand Down Expand Up @@ -443,6 +481,32 @@ func (r *ApisixRouteReconciler) listApisixRoutesForService(ctx context.Context,
return pkgutils.DedupComparable(requests)
}

// listApisixRoutesForEndpoints handles Endpoints objects and converts them to ApisixRoute reconcile requests.
// This function provides backward compatibility for Kubernetes 1.18 clusters that don't support EndpointSlice.
func (r *ApisixRouteReconciler) listApisixRoutesForEndpoints(ctx context.Context, obj client.Object) []reconcile.Request {
endpoint, ok := obj.(*corev1.Endpoints)
if !ok {
return nil
}

var (
namespace = endpoint.GetNamespace()
serviceName = endpoint.GetName() // For Endpoints, the name is the service name
arList apiv2.ApisixRouteList
)
if err := r.List(ctx, &arList, client.MatchingFields{
indexer.ServiceIndexRef: indexer.GenIndexKey(namespace, serviceName),
}); err != nil {
r.Log.Error(err, "failed to list apisixroutes by service", "service", serviceName)
return nil
}
requests := make([]reconcile.Request, 0, len(arList.Items))
for _, ar := range arList.Items {
requests = append(requests, reconcile.Request{NamespacedName: utils.NamespacedName(&ar)})
}
return pkgutils.DedupComparable(requests)
}

func (r *ApisixRouteReconciler) listApisixRoutesForSecret(ctx context.Context, obj client.Object) []reconcile.Request {
secret, ok := obj.(*corev1.Secret)
if !ok {
Expand Down
56 changes: 43 additions & 13 deletions internal/controller/gatewayproxy_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/apache/apisix-ingress-controller/internal/controller/indexer"
"github.com/apache/apisix-ingress-controller/internal/provider"
"github.com/apache/apisix-ingress-controller/internal/utils"
pkgutils "github.com/apache/apisix-ingress-controller/pkg/utils"
)

// GatewayProxyController reconciles a GatewayProxy object.
Expand All @@ -47,23 +48,38 @@ type GatewayProxyController struct {
Scheme *runtime.Scheme
Log logr.Logger
Provider provider.Provider

// supportsEndpointSlice indicates whether the cluster supports EndpointSlice API
supportsEndpointSlice bool
}

func (r *GatewayProxyController) SetupWithManager(mrg ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mrg).
// Check and store EndpointSlice API support
r.supportsEndpointSlice = pkgutils.HasAPIResource(mrg, &discoveryv1.EndpointSlice{})

eventFilters := []predicate.Predicate{
predicate.GenerationChangedPredicate{},
predicate.NewPredicateFuncs(TypePredicate[*corev1.Secret]()),
}

if !r.supportsEndpointSlice {
eventFilters = append(eventFilters, predicate.NewPredicateFuncs(TypePredicate[*corev1.Endpoints]()))
}

bdr := ctrl.NewControllerManagedBy(mrg).
For(&v1alpha1.GatewayProxy{}).
WithEventFilter(
predicate.Or(
predicate.GenerationChangedPredicate{},
predicate.NewPredicateFuncs(TypePredicate[*corev1.Secret]()),
),
).
WithEventFilter(predicate.Or(eventFilters...)).
Watches(&corev1.Service{},
handler.EnqueueRequestsFromMapFunc(r.listGatewayProxiesForProviderService),
).
Watches(&discoveryv1.EndpointSlice{},
handler.EnqueueRequestsFromMapFunc(r.listGatewayProxiesForProviderEndpointSlice),
).
)

// Conditionally watch EndpointSlice or Endpoints based on cluster API support
bdr = watchEndpointSliceOrEndpoints(bdr, r.supportsEndpointSlice,
r.listGatewayProxiesForProviderEndpointSlice,
r.listGatewayProxiesForProviderEndpoints,
r.Log)

return bdr.
Watches(&corev1.Secret{},
handler.EnqueueRequestsFromMapFunc(r.listGatewayProxiesForSecret),
).
Expand Down Expand Up @@ -93,10 +109,10 @@ func (r *GatewayProxyController) Reconcile(ctx context.Context, req ctrl.Request
if providerService == nil {
tctx.EndpointSlices[req.NamespacedName] = nil
} else {
if err := addProviderEndpointsToTranslateContext(tctx, r.Client, types.NamespacedName{
if err := addProviderEndpointsToTranslateContextWithEndpointSliceSupport(tctx, r.Client, types.NamespacedName{
Namespace: gp.Namespace,
Name: providerService.Name,
}); err != nil {
}, r.supportsEndpointSlice); err != nil {
return reconcile.Result{}, err
}
}
Expand Down Expand Up @@ -174,6 +190,20 @@ func (r *GatewayProxyController) listGatewayProxiesForProviderEndpointSlice(ctx
})
}

// listGatewayProxiesForProviderEndpoints handles Endpoints objects and converts them to GatewayProxy reconcile requests.
// This function provides backward compatibility for Kubernetes 1.18 clusters that don't support EndpointSlice.
func (r *GatewayProxyController) listGatewayProxiesForProviderEndpoints(ctx context.Context, obj client.Object) (requests []reconcile.Request) {
endpoint, ok := obj.(*corev1.Endpoints)
if !ok {
r.Log.Error(errors.New("unexpected object type"), "failed to convert object to Endpoints")
return nil
}

return ListRequests(ctx, r.Client, r.Log, &v1alpha1.GatewayProxyList{}, client.MatchingFields{
indexer.ServiceIndexRef: indexer.GenIndexKey(endpoint.GetNamespace(), endpoint.GetName()),
})
}

func (r *GatewayProxyController) listGatewayProxiesForSecret(ctx context.Context, object client.Object) []reconcile.Request {
secret, ok := object.(*corev1.Secret)
if !ok {
Expand Down
Loading
Loading