Skip to content

Commit 88f8e66

Browse files
committed
Implement PreferSameNode traffic distribution in kube-proxy
1 parent c850835 commit 88f8e66

File tree

8 files changed

+173
-50
lines changed

8 files changed

+173
-50
lines changed

pkg/proxy/endpoint.go

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,11 @@ type Endpoint interface {
4646
IsTerminating() bool
4747

4848
// ZoneHints returns the zone hints for the endpoint. This is based on
49-
// endpoint.hints.forZones[0].name in the EndpointSlice API.
49+
// endpoint.hints.forZones[*].name in the EndpointSlice API.
5050
ZoneHints() sets.Set[string]
51+
// NodeHints returns the node hints for the endpoint. This is based on
52+
// endpoint.hints.forNodes[*].name in the EndpointSlice API.
53+
NodeHints() sets.Set[string]
5154
}
5255

5356
// BaseEndpointInfo contains base information that defines an endpoint.
@@ -78,6 +81,9 @@ type BaseEndpointInfo struct {
7881
// zoneHints represent the zone hints for the endpoint. This is based on
7982
// endpoint.hints.forZones[*].name in the EndpointSlice API.
8083
zoneHints sets.Set[string]
84+
// nodeHints represent the node hints for the endpoint. This is based on
85+
// endpoint.hints.forNodes[*].name in the EndpointSlice API.
86+
nodeHints sets.Set[string]
8187
}
8288

8389
var _ Endpoint = &BaseEndpointInfo{}
@@ -119,12 +125,17 @@ func (info *BaseEndpointInfo) IsTerminating() bool {
119125
return info.terminating
120126
}
121127

122-
// ZoneHints returns the zone hint for the endpoint.
128+
// ZoneHints returns the zone hints for the endpoint.
123129
func (info *BaseEndpointInfo) ZoneHints() sets.Set[string] {
124130
return info.zoneHints
125131
}
126132

127-
func newBaseEndpointInfo(ip string, port int, isLocal, ready, serving, terminating bool, zoneHints sets.Set[string]) *BaseEndpointInfo {
133+
// NodeHints returns the node hints for the endpoint.
134+
func (info *BaseEndpointInfo) NodeHints() sets.Set[string] {
135+
return info.nodeHints
136+
}
137+
138+
func newBaseEndpointInfo(ip string, port int, isLocal, ready, serving, terminating bool, zoneHints, nodeHints sets.Set[string]) *BaseEndpointInfo {
128139
return &BaseEndpointInfo{
129140
ip: ip,
130141
port: port,
@@ -134,5 +145,6 @@ func newBaseEndpointInfo(ip string, port int, isLocal, ready, serving, terminati
134145
serving: serving,
135146
terminating: terminating,
136147
zoneHints: zoneHints,
148+
nodeHints: nodeHints,
137149
}
138150
}

pkg/proxy/endpointslicecache.go

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,9 @@ import (
2525
discovery "k8s.io/api/discovery/v1"
2626
"k8s.io/apimachinery/pkg/types"
2727
"k8s.io/apimachinery/pkg/util/sets"
28+
utilfeature "k8s.io/apiserver/pkg/util/feature"
2829
"k8s.io/klog/v2"
30+
"k8s.io/kubernetes/pkg/features"
2931
utilnet "k8s.io/utils/net"
3032
)
3133

@@ -210,17 +212,25 @@ func (cache *EndpointSliceCache) addEndpoints(svcPortName *ServicePortName, port
210212
serving := endpoint.Conditions.Serving == nil || *endpoint.Conditions.Serving
211213
terminating := endpoint.Conditions.Terminating != nil && *endpoint.Conditions.Terminating
212214

213-
var zoneHints sets.Set[string]
214-
if endpoint.Hints != nil && len(endpoint.Hints.ForZones) > 0 {
215-
zoneHints = sets.New[string]()
216-
for _, zone := range endpoint.Hints.ForZones {
217-
zoneHints.Insert(zone.Name)
215+
var zoneHints, nodeHints sets.Set[string]
216+
if endpoint.Hints != nil {
217+
if len(endpoint.Hints.ForZones) > 0 {
218+
zoneHints = sets.New[string]()
219+
for _, zone := range endpoint.Hints.ForZones {
220+
zoneHints.Insert(zone.Name)
221+
}
222+
}
223+
if len(endpoint.Hints.ForNodes) > 0 && utilfeature.DefaultFeatureGate.Enabled(features.PreferSameTrafficDistribution) {
224+
nodeHints = sets.New[string]()
225+
for _, node := range endpoint.Hints.ForNodes {
226+
nodeHints.Insert(node.Name)
227+
}
218228
}
219229
}
220230

221231
endpointIP := utilnet.ParseIPSloppy(endpoint.Addresses[0]).String()
222232
endpointInfo := newBaseEndpointInfo(endpointIP, portNum, isLocal,
223-
ready, serving, terminating, zoneHints)
233+
ready, serving, terminating, zoneHints, nodeHints)
224234

225235
// This logic ensures we're deduplicating potential overlapping endpoints
226236
// isLocal should not vary between matching endpoints, but if it does, we

pkg/proxy/iptables/proxier.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -987,7 +987,7 @@ func (proxier *Proxier) syncProxyRules() {
987987
// from this node, given the service's traffic policies. hasEndpoints is true
988988
// if the service has any usable endpoints on any node, not just this one.
989989
allEndpoints := proxier.endpointsMap[svcName]
990-
clusterEndpoints, localEndpoints, allLocallyReachableEndpoints, hasEndpoints := proxy.CategorizeEndpoints(allEndpoints, svcInfo, proxier.nodeLabels)
990+
clusterEndpoints, localEndpoints, allLocallyReachableEndpoints, hasEndpoints := proxy.CategorizeEndpoints(allEndpoints, svcInfo, proxier.nodeName, proxier.nodeLabels)
991991

992992
// clusterPolicyChain contains the endpoints used with "Cluster" traffic policy
993993
clusterPolicyChain := svcInfo.clusterPolicyChainName

pkg/proxy/ipvs/proxier.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1843,7 +1843,7 @@ func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNode
18431843
if !ok {
18441844
proxier.logger.Info("Unable to filter endpoints due to missing service info", "servicePortName", svcPortName)
18451845
} else {
1846-
clusterEndpoints, localEndpoints, _, hasAnyEndpoints := proxy.CategorizeEndpoints(endpoints, svcInfo, proxier.nodeLabels)
1846+
clusterEndpoints, localEndpoints, _, hasAnyEndpoints := proxy.CategorizeEndpoints(endpoints, svcInfo, proxier.nodeName, proxier.nodeLabels)
18471847
if onlyNodeLocalEndpoints {
18481848
if len(localEndpoints) > 0 {
18491849
endpoints = localEndpoints

pkg/proxy/nftables/proxier.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1310,7 +1310,7 @@ func (proxier *Proxier) syncProxyRules() {
13101310
// from this node, given the service's traffic policies. hasEndpoints is true
13111311
// if the service has any usable endpoints on any node, not just this one.
13121312
allEndpoints := proxier.endpointsMap[svcName]
1313-
clusterEndpoints, localEndpoints, allLocallyReachableEndpoints, hasEndpoints := proxy.CategorizeEndpoints(allEndpoints, svcInfo, proxier.nodeLabels)
1313+
clusterEndpoints, localEndpoints, allLocallyReachableEndpoints, hasEndpoints := proxy.CategorizeEndpoints(allEndpoints, svcInfo, proxier.nodeName, proxier.nodeLabels)
13141314

13151315
// skipServiceUpdate is used for all service-related chains and their elements.
13161316
// If no changes were done to the service or its endpoints, these objects may be skipped.

pkg/proxy/topology.go

Lines changed: 66 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@ package proxy
1818

1919
import (
2020
v1 "k8s.io/api/core/v1"
21+
utilfeature "k8s.io/apiserver/pkg/util/feature"
2122
"k8s.io/klog/v2"
23+
"k8s.io/kubernetes/pkg/features"
2224
)
2325

2426
// CategorizeEndpoints returns:
@@ -39,16 +41,18 @@ import (
3941
// "Usable endpoints" means Ready endpoints by default, but will fall back to
4042
// Serving-Terminating endpoints (independently for Cluster and Local) if no Ready
4143
// endpoints are available.
42-
func CategorizeEndpoints(endpoints []Endpoint, svcInfo ServicePort, nodeLabels map[string]string) (clusterEndpoints, localEndpoints, allReachableEndpoints []Endpoint, hasAnyEndpoints bool) {
43-
var useTopology, useServingTerminatingEndpoints bool
44+
func CategorizeEndpoints(endpoints []Endpoint, svcInfo ServicePort, nodeName string, nodeLabels map[string]string) (clusterEndpoints, localEndpoints, allReachableEndpoints []Endpoint, hasAnyEndpoints bool) {
45+
var topologyMode string
46+
var useServingTerminatingEndpoints bool
4447

4548
if svcInfo.UsesClusterEndpoints() {
46-
useTopology = canUseTopology(endpoints, nodeLabels)
49+
zone := nodeLabels[v1.LabelTopologyZone]
50+
topologyMode = topologyModeFromHints(svcInfo, endpoints, nodeName, zone)
4751
clusterEndpoints = filterEndpoints(endpoints, func(ep Endpoint) bool {
4852
if !ep.IsReady() {
4953
return false
5054
}
51-
if useTopology && !availableForTopology(ep, nodeLabels) {
55+
if !availableForTopology(ep, topologyMode, nodeName, zone) {
5256
return false
5357
}
5458
return true
@@ -114,9 +118,9 @@ func CategorizeEndpoints(endpoints []Endpoint, svcInfo ServicePort, nodeLabels m
114118
return
115119
}
116120

117-
if !useTopology && !useServingTerminatingEndpoints {
121+
if topologyMode == "" && !useServingTerminatingEndpoints {
118122
// !useServingTerminatingEndpoints means that localEndpoints contains only
119-
// Ready endpoints. !useTopology means that clusterEndpoints contains *every*
123+
// Ready endpoints. topologyMode=="" means that clusterEndpoints contains *every*
120124
// Ready endpoint. So clusterEndpoints must be a superset of localEndpoints.
121125
allReachableEndpoints = clusterEndpoints
122126
return
@@ -140,48 +144,77 @@ func CategorizeEndpoints(endpoints []Endpoint, svcInfo ServicePort, nodeLabels m
140144
return
141145
}
142146

143-
// canUseTopology returns true if all of the following are true:
144-
// - The node's labels include "topology.kubernetes.io/zone".
145-
// - All of the endpoints for this Service have a topology hint.
146-
// - At least one endpoint for this Service is hinted for this node's zone.
147-
func canUseTopology(endpoints []Endpoint, nodeLabels map[string]string) bool {
148-
zone, foundZone := nodeLabels[v1.LabelTopologyZone]
147+
// topologyModeFromHints returns a topology mode ("", "PreferSameZone", or
148+
// "PreferSameNode") based on the Endpoint hints:
149+
// - If the PreferSameTrafficDistribution feature gate is enabled, and every ready
150+
// endpoint has a node hint, and at least one ready endpoint is hinted for this node, then
151+
// it returns "PreferSameNode".
152+
// - Otherwise, if every ready endpoint has a zone hint, and at least one ready endpoint is
153+
// hinted for this node's zone, then it returns "PreferSameZone".
154+
// - Otherwise it returns "" (meaning, no topology / default traffic distribution).
155+
func topologyModeFromHints(svcInfo ServicePort, endpoints []Endpoint, nodeName, zone string) string {
156+
hasEndpointForNode := false
157+
allEndpointsHaveNodeHints := true
149158
hasEndpointForZone := false
159+
allEndpointsHaveZoneHints := true
150160
for _, endpoint := range endpoints {
151161
if !endpoint.IsReady() {
152162
continue
153163
}
154164

155-
// If any of the endpoints do not have zone hints, we bail out
156-
if endpoint.ZoneHints().Len() == 0 {
157-
klog.V(7).InfoS("Skipping topology aware endpoint filtering since one or more endpoints is missing a zone hint", "endpoint", endpoint)
158-
return false
159-
}
160-
161-
// If we've made it this far, we have endpoints with hints set. Now we check if there is a
162-
// zone label, if there isn't one we log a warning and bail out
163-
if !foundZone || zone == "" {
164-
klog.V(2).InfoS("Skipping topology aware endpoint filtering since node is missing label", "label", v1.LabelTopologyZone)
165-
return false
165+
if endpoint.NodeHints().Len() == 0 {
166+
allEndpointsHaveNodeHints = false
167+
} else if endpoint.NodeHints().Has(nodeName) {
168+
hasEndpointForNode = true
166169
}
167170

168-
if endpoint.ZoneHints().Has(zone) {
171+
if endpoint.ZoneHints().Len() == 0 {
172+
allEndpointsHaveZoneHints = false
173+
} else if endpoint.ZoneHints().Has(zone) {
169174
hasEndpointForZone = true
170175
}
171176
}
172177

173-
if !hasEndpointForZone {
174-
klog.V(7).InfoS("Skipping topology aware endpoint filtering since no hints were provided for zone", "zone", zone)
175-
return false
178+
if utilfeature.DefaultFeatureGate.Enabled(features.PreferSameTrafficDistribution) {
179+
if allEndpointsHaveNodeHints {
180+
if hasEndpointForNode {
181+
return v1.ServiceTrafficDistributionPreferSameNode
182+
}
183+
klog.V(2).InfoS("Ignoring same-node topology hints for service since no hints were provided for node", "service", svcInfo, "node", nodeName)
184+
} else {
185+
klog.V(7).InfoS("Ignoring same-node topology hints for service since one or more endpoints is missing a node hint", "service", svcInfo)
186+
}
176187
}
177-
return true
188+
if allEndpointsHaveZoneHints {
189+
if hasEndpointForZone {
190+
return v1.ServiceTrafficDistributionPreferSameZone
191+
}
192+
if zone == "" {
193+
klog.V(2).InfoS("Ignoring same-zone topology hints for service since node is missing label", "service", svcInfo, "label", v1.LabelTopologyZone)
194+
} else {
195+
klog.V(2).InfoS("Ignoring same-zone topology hints for service since no hints were provided for zone", "service", svcInfo, "zone", zone)
196+
}
197+
} else {
198+
klog.V(7).InfoS("Ignoring same-zone topology hints for service since one or more endpoints is missing a zone hint", "service", svcInfo.String())
199+
}
200+
201+
return ""
178202
}
179203

180-
// availableForTopology checks if this endpoint is available for use on this node, given
181-
// topology constraints. (It assumes that canUseTopology() returned true.)
182-
func availableForTopology(endpoint Endpoint, nodeLabels map[string]string) bool {
183-
zone := nodeLabels[v1.LabelTopologyZone]
184-
return endpoint.ZoneHints().Has(zone)
204+
// availableForTopology checks if this endpoint is available for use on this node when
205+
// using the given topologyMode. (Note that there's no fallback here; the fallback happens
206+
// when deciding which mode to use, not when applying that decision.)
207+
func availableForTopology(endpoint Endpoint, topologyMode, nodeName, zone string) bool {
208+
switch topologyMode {
209+
case "":
210+
return true
211+
case v1.ServiceTrafficDistributionPreferSameNode:
212+
return endpoint.NodeHints().Has(nodeName)
213+
case v1.ServiceTrafficDistributionPreferSameZone:
214+
return endpoint.ZoneHints().Has(zone)
215+
default:
216+
return false
217+
}
185218
}
186219

187220
// filterEndpoints filters endpoints according to predicate

pkg/proxy/topology_test.go

Lines changed: 68 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@ import (
2323
v1 "k8s.io/api/core/v1"
2424
kerrors "k8s.io/apimachinery/pkg/util/errors"
2525
"k8s.io/apimachinery/pkg/util/sets"
26+
utilfeature "k8s.io/apiserver/pkg/util/feature"
27+
featuregatetesting "k8s.io/component-base/featuregate/testing"
28+
"k8s.io/kubernetes/pkg/features"
2629
)
2730

2831
func checkExpectedEndpoints(expected sets.Set[string], actual []Endpoint) error {
@@ -44,10 +47,12 @@ func checkExpectedEndpoints(expected sets.Set[string], actual []Endpoint) error
4447

4548
func TestCategorizeEndpoints(t *testing.T) {
4649
testCases := []struct {
47-
name string
48-
nodeLabels map[string]string
49-
serviceInfo ServicePort
50-
endpoints []Endpoint
50+
name string
51+
preferSameEnabled bool
52+
nodeName string
53+
nodeLabels map[string]string
54+
serviceInfo ServicePort
55+
endpoints []Endpoint
5156

5257
// We distinguish `nil` ("service doesn't use this kind of endpoints") from
5358
// `sets.Set[string]()` ("service uses this kind of endpoints but has no endpoints").
@@ -172,6 +177,62 @@ func TestCategorizeEndpoints(t *testing.T) {
172177
},
173178
clusterEndpoints: sets.New[string]("10.1.2.3:80", "10.1.2.4:80", "10.1.2.6:80"),
174179
localEndpoints: nil,
180+
}, {
181+
name: "PreferSameNode falls back to same-zone when feature gate disabled",
182+
preferSameEnabled: false,
183+
nodeName: "node-1",
184+
nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"},
185+
serviceInfo: &BaseServicePortInfo{},
186+
endpoints: []Endpoint{
187+
&BaseEndpointInfo{endpoint: "10.1.2.3:80", zoneHints: sets.New[string]("zone-a"), nodeHints: sets.New[string]("node-1"), ready: true},
188+
&BaseEndpointInfo{endpoint: "10.1.2.4:80", zoneHints: sets.New[string]("zone-b"), nodeHints: sets.New[string]("node-2"), ready: true},
189+
&BaseEndpointInfo{endpoint: "10.1.2.5:80", zoneHints: sets.New[string]("zone-c"), nodeHints: sets.New[string]("node-3"), ready: true},
190+
&BaseEndpointInfo{endpoint: "10.1.2.6:80", zoneHints: sets.New[string]("zone-a"), nodeHints: sets.New[string]("node-4"), ready: true},
191+
},
192+
clusterEndpoints: sets.New[string]("10.1.2.3:80", "10.1.2.6:80"),
193+
localEndpoints: nil,
194+
}, {
195+
name: "PreferSameNode available",
196+
preferSameEnabled: true,
197+
nodeName: "node-1",
198+
nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"},
199+
serviceInfo: &BaseServicePortInfo{},
200+
endpoints: []Endpoint{
201+
&BaseEndpointInfo{endpoint: "10.1.2.3:80", zoneHints: sets.New[string]("zone-a"), nodeHints: sets.New[string]("node-1"), ready: true},
202+
&BaseEndpointInfo{endpoint: "10.1.2.4:80", zoneHints: sets.New[string]("zone-b"), nodeHints: sets.New[string]("node-2"), ready: true},
203+
&BaseEndpointInfo{endpoint: "10.1.2.5:80", zoneHints: sets.New[string]("zone-c"), nodeHints: sets.New[string]("node-3"), ready: true},
204+
&BaseEndpointInfo{endpoint: "10.1.2.6:80", zoneHints: sets.New[string]("zone-a"), nodeHints: sets.New[string]("node-4"), ready: true},
205+
},
206+
clusterEndpoints: sets.New[string]("10.1.2.3:80"),
207+
localEndpoints: nil,
208+
}, {
209+
name: "PreferSameNode ignored if some endpoints unhinted",
210+
preferSameEnabled: true,
211+
nodeName: "node-1",
212+
nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"},
213+
serviceInfo: &BaseServicePortInfo{},
214+
endpoints: []Endpoint{
215+
&BaseEndpointInfo{endpoint: "10.1.2.3:80", zoneHints: sets.New[string]("zone-a"), nodeHints: sets.New[string]("node-1"), ready: true},
216+
&BaseEndpointInfo{endpoint: "10.1.2.4:80", ready: true},
217+
&BaseEndpointInfo{endpoint: "10.1.2.5:80", zoneHints: sets.New[string]("zone-c"), nodeHints: sets.New[string]("node-3"), ready: true},
218+
&BaseEndpointInfo{endpoint: "10.1.2.6:80", zoneHints: sets.New[string]("zone-a"), nodeHints: sets.New[string]("node-4"), ready: true},
219+
},
220+
clusterEndpoints: sets.New[string]("10.1.2.3:80", "10.1.2.4:80", "10.1.2.5:80", "10.1.2.6:80"),
221+
localEndpoints: nil,
222+
}, {
223+
name: "PreferSameNode falls back to PreferSameZone if no endpoint for node",
224+
preferSameEnabled: true,
225+
nodeName: "node-0",
226+
nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"},
227+
serviceInfo: &BaseServicePortInfo{},
228+
endpoints: []Endpoint{
229+
&BaseEndpointInfo{endpoint: "10.1.2.3:80", zoneHints: sets.New[string]("zone-a"), nodeHints: sets.New[string]("node-1"), ready: true},
230+
&BaseEndpointInfo{endpoint: "10.1.2.4:80", zoneHints: sets.New[string]("zone-b"), nodeHints: sets.New[string]("node-2"), ready: true},
231+
&BaseEndpointInfo{endpoint: "10.1.2.5:80", zoneHints: sets.New[string]("zone-c"), nodeHints: sets.New[string]("node-3"), ready: true},
232+
&BaseEndpointInfo{endpoint: "10.1.2.6:80", zoneHints: sets.New[string]("zone-a"), nodeHints: sets.New[string]("node-4"), ready: true},
233+
},
234+
clusterEndpoints: sets.New[string]("10.1.2.3:80", "10.1.2.6:80"),
235+
localEndpoints: nil,
175236
}, {
176237
name: "conflicting topology and localness require merging allEndpoints",
177238
nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"},
@@ -333,7 +394,9 @@ func TestCategorizeEndpoints(t *testing.T) {
333394

334395
for _, tc := range testCases {
335396
t.Run(tc.name, func(t *testing.T) {
336-
clusterEndpoints, localEndpoints, allEndpoints, hasAnyEndpoints := CategorizeEndpoints(tc.endpoints, tc.serviceInfo, tc.nodeLabels)
397+
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PreferSameTrafficDistribution, tc.preferSameEnabled)
398+
399+
clusterEndpoints, localEndpoints, allEndpoints, hasAnyEndpoints := CategorizeEndpoints(tc.endpoints, tc.serviceInfo, tc.nodeName, tc.nodeLabels)
337400

338401
if tc.clusterEndpoints == nil && clusterEndpoints != nil {
339402
t.Errorf("expected no cluster endpoints but got %v", clusterEndpoints)

pkg/proxy/winkernel/proxier.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -358,6 +358,11 @@ func (info *endpointInfo) ZoneHints() sets.Set[string] {
358358
return sets.Set[string]{}
359359
}
360360

361+
// NodeHints returns the node hints for the endpoint.
362+
func (info *endpointInfo) NodeHints() sets.Set[string] {
363+
return sets.Set[string]{}
364+
}
365+
361366
// IP returns just the IP part of the endpoint, it's a part of proxy.Endpoint interface.
362367
func (info *endpointInfo) IP() string {
363368
return info.ip

0 commit comments

Comments
 (0)