Skip to content

Commit 5253a3f

Browse files
committed
Equip getEndpointsFromSlices to select Serving Endpoints
This wires up `getEndpointsFromSlices` to have different selection modes for querying Endpoint objects. This is necessary so that `persistent-drainable` can eventually select Endpoints that are `Serving` instead of only `Ready`. This added a unit test to demonstrate the selection behavior, and the wiring up in the controller remains to be done. The Endpoint type was also updated to have an `IsDraining` field when the Endpoint is not Ready.
1 parent 32b125a commit 5253a3f

File tree

5 files changed

+158
-13
lines changed

5 files changed

+158
-13
lines changed

internal/ingress/controller/controller.go

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -536,7 +536,7 @@ func (n *NGINXController) getStreamServices(configmapName string, proto apiv1.Pr
536536
sp := svc.Spec.Ports[i]
537537
if sp.Name == svcPort {
538538
if sp.Protocol == proto {
539-
endps = getEndpointsFromSlices(svc, &sp, proto, zone, n.store.GetServiceEndpointsSlices)
539+
endps = getEndpointsFromSlices(svc, &sp, proto, zone, ReadyEndpoints, n.store.GetServiceEndpointsSlices)
540540
break
541541
}
542542
}
@@ -548,7 +548,7 @@ func (n *NGINXController) getStreamServices(configmapName string, proto apiv1.Pr
548548
//nolint:gosec // Ignore G109 error
549549
if sp.Port == int32(targetPort) {
550550
if sp.Protocol == proto {
551-
endps = getEndpointsFromSlices(svc, &sp, proto, zone, n.store.GetServiceEndpointsSlices)
551+
endps = getEndpointsFromSlices(svc, &sp, proto, zone, ReadyEndpoints, n.store.GetServiceEndpointsSlices)
552552
break
553553
}
554554
}
@@ -605,7 +605,7 @@ func (n *NGINXController) getDefaultUpstream() *ingress.Backend {
605605
} else {
606606
zone = emptyZone
607607
}
608-
endps := getEndpointsFromSlices(svc, &svc.Spec.Ports[0], apiv1.ProtocolTCP, zone, n.store.GetServiceEndpointsSlices)
608+
endps := getEndpointsFromSlices(svc, &svc.Spec.Ports[0], apiv1.ProtocolTCP, zone, ReadyEndpoints, n.store.GetServiceEndpointsSlices)
609609
if len(endps) == 0 {
610610
klog.Warningf("Service %q does not have any active Endpoint", svcKey)
611611
endps = []ingress.Endpoint{n.DefaultEndpoint()}
@@ -940,7 +940,7 @@ func (n *NGINXController) getBackendServers(ingresses []*ingress.Ingress) ([]*in
940940
} else {
941941
zone = emptyZone
942942
}
943-
endps := getEndpointsFromSlices(location.DefaultBackend, &sp, apiv1.ProtocolTCP, zone, n.store.GetServiceEndpointsSlices)
943+
endps := getEndpointsFromSlices(location.DefaultBackend, &sp, apiv1.ProtocolTCP, zone, ReadyEndpoints, n.store.GetServiceEndpointsSlices)
944944
// custom backend is valid only if contains at least one endpoint
945945
if len(endps) > 0 {
946946
name := fmt.Sprintf("custom-default-backend-%v-%v", location.DefaultBackend.GetNamespace(), location.DefaultBackend.GetName())
@@ -1050,7 +1050,7 @@ func (n *NGINXController) createUpstreams(data []*ingress.Ingress, du *ingress.B
10501050

10511051
if len(upstreams[defBackend].Endpoints) == 0 {
10521052
_, port := upstreamServiceNameAndPort(ing.Spec.DefaultBackend.Service)
1053-
endps, err := n.serviceEndpoints(svcKey, port.String())
1053+
endps, err := n.serviceEndpoints(svcKey, port.String(), ReadyEndpoints)
10541054
upstreams[defBackend].Endpoints = append(upstreams[defBackend].Endpoints, endps...)
10551055
if err != nil {
10561056
klog.Warningf("Error creating upstream %q: %v", defBackend, err)
@@ -1115,7 +1115,8 @@ func (n *NGINXController) createUpstreams(data []*ingress.Ingress, du *ingress.B
11151115

11161116
if len(upstreams[name].Endpoints) == 0 {
11171117
_, port := upstreamServiceNameAndPort(path.Backend.Service)
1118-
endp, err := n.serviceEndpoints(svcKey, port.String())
1118+
endp, err := n.serviceEndpoints(svcKey, port.String(), ReadyEndpoints)
1119+
11191120
if err != nil {
11201121
klog.Warningf("Error obtaining Endpoints for Service %q: %v", svcKey, err)
11211122
n.metricCollector.IncOrphanIngress(ing.Namespace, ing.Name, orphanMetricLabelNoService)
@@ -1184,7 +1185,7 @@ func (n *NGINXController) getServiceClusterEndpoint(svcKey string, backend *netw
11841185
}
11851186

11861187
// serviceEndpoints returns the upstream servers (Endpoints) associated with a Service.
1187-
func (n *NGINXController) serviceEndpoints(svcKey, backendPort string) ([]ingress.Endpoint, error) {
1188+
func (n *NGINXController) serviceEndpoints(svcKey, backendPort string, epSelectionMode EndpointSelectionMode) ([]ingress.Endpoint, error) {
11881189
var upstreams []ingress.Endpoint
11891190

11901191
svc, err := n.store.GetService(svcKey)
@@ -1205,7 +1206,7 @@ func (n *NGINXController) serviceEndpoints(svcKey, backendPort string) ([]ingres
12051206
return upstreams, nil
12061207
}
12071208
servicePort := externalNamePorts(backendPort, svc)
1208-
endps := getEndpointsFromSlices(svc, servicePort, apiv1.ProtocolTCP, zone, n.store.GetServiceEndpointsSlices)
1209+
endps := getEndpointsFromSlices(svc, servicePort, apiv1.ProtocolTCP, zone, epSelectionMode, n.store.GetServiceEndpointsSlices)
12091210
if len(endps) == 0 {
12101211
klog.Warningf("Service %q does not have any active Endpoint.", svcKey)
12111212
return upstreams, nil
@@ -1221,7 +1222,7 @@ func (n *NGINXController) serviceEndpoints(svcKey, backendPort string) ([]ingres
12211222
if strconv.Itoa(int(servicePort.Port)) == backendPort ||
12221223
servicePort.TargetPort.String() == backendPort ||
12231224
servicePort.Name == backendPort {
1224-
endps := getEndpointsFromSlices(svc, &servicePort, apiv1.ProtocolTCP, zone, n.store.GetServiceEndpointsSlices)
1225+
endps := getEndpointsFromSlices(svc, &servicePort, apiv1.ProtocolTCP, zone, epSelectionMode, n.store.GetServiceEndpointsSlices)
12251226
if len(endps) == 0 {
12261227
klog.Warningf("Service %q does not have any active Endpoint.", svcKey)
12271228
}

internal/ingress/controller/endpointslices.go

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,15 @@ import (
3434
"k8s.io/ingress-nginx/pkg/apis/ingress"
3535
)
3636

37+
type EndpointSelectionMode int
38+
const (
39+
ReadyEndpoints EndpointSelectionMode = iota
40+
ServingEndpoints
41+
)
42+
3743
// getEndpointsFromSlices returns a list of Endpoint structs for a given service/target port combination.
3844
func getEndpointsFromSlices(s *corev1.Service, port *corev1.ServicePort, proto corev1.Protocol, zoneForHints string,
39-
getServiceEndpointsSlices func(string) ([]*discoveryv1.EndpointSlice, error),
45+
epSelectionMode EndpointSelectionMode, getServiceEndpointsSlices func(string) ([]*discoveryv1.EndpointSlice, error),
4046
) []ingress.Endpoint {
4147
upsServers := []ingress.Endpoint{}
4248

@@ -153,9 +159,19 @@ func getEndpointsFromSlices(s *corev1.Service, port *corev1.ServicePort, proto c
153159
}
154160

155161
for _, ep := range eps.Endpoints {
156-
if (ep.Conditions.Ready != nil) && !(*ep.Conditions.Ready) {
157-
continue
162+
epIsReady := (ep.Conditions.Ready == nil) || *ep.Conditions.Ready
163+
if epSelectionMode == ReadyEndpoints {
164+
if !epIsReady {
165+
continue
166+
}
167+
} else {
168+
// assume epSelectionMode == ServingEndpoints.
169+
epIsServing := (ep.Conditions.Serving == nil) || *ep.Conditions.Serving
170+
if !epIsServing {
171+
continue
172+
}
158173
}
174+
159175
epHasZone := false
160176
if useTopologyHints {
161177
for _, epzone := range ep.Hints.ForZones {
@@ -176,10 +192,12 @@ func getEndpointsFromSlices(s *corev1.Service, port *corev1.ServicePort, proto c
176192
if _, exists := processedUpstreamServers[hostPort]; exists {
177193
continue
178194
}
195+
179196
ups := ingress.Endpoint{
180197
Address: epAddress,
181198
Port: fmt.Sprintf("%v", epPort),
182199
Target: ep.TargetRef,
200+
IsDraining: !epIsReady,
183201
}
184202
upsServers = append(upsServers, ups)
185203
processedUpstreamServers[hostPort] = struct{}{}

0 commit comments

Comments
 (0)