Skip to content

Commit 79d1c07

Browse files
committed
Make change trackers just ignore the "wrong" IP family
Dual-stack clusters exist; ServiceChangeTracker does not need to log messages (even at V(4)) when it sees dual-stack Services, and EndpointsChangeTracker does not need to emit Events(!) when it sees EndpointSlices of the wrong AddressType. (Though in most cases the EndpointsChangeTracker Events would not get emitted anyway, since the MetaProxier would ensure that only the v4 tracker saw v4 slices, and only the v6 tracker saw v6 slices.) Also remove a nil check labeled "This should never happen" which, in fact, we know *didn't* happen, since the function has already dereferenced the value before it checking it against nil.
1 parent 2c348bf commit 79d1c07

File tree

5 files changed

+36
-92
lines changed

5 files changed

+36
-92
lines changed

pkg/proxy/endpointschangetracker.go

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,6 @@ import (
2929
"k8s.io/kubernetes/pkg/proxy/metrics"
3030
)
3131

32-
var supportedEndpointSliceAddressTypes = sets.New[discovery.AddressType](
33-
discovery.AddressTypeIPv4,
34-
discovery.AddressTypeIPv6,
35-
)
36-
3732
// EndpointsChangeTracker carries state about uncommitted changes to an arbitrary number of
3833
// Endpoints, keyed by their namespace and name.
3934
type EndpointsChangeTracker struct {
@@ -45,6 +40,9 @@ type EndpointsChangeTracker struct {
4540
// any Proxier-specific cleanup.
4641
processEndpointsMapChange processEndpointsMapChangeFunc
4742

43+
// addressType is the type of EndpointSlice this proxy tracks
44+
addressType discovery.AddressType
45+
4846
// endpointSliceCache holds a simplified version of endpoint slices.
4947
endpointSliceCache *EndpointSliceCache
5048

@@ -62,12 +60,18 @@ type makeEndpointFunc func(info *BaseEndpointInfo, svcPortName *ServicePortName)
6260
type processEndpointsMapChangeFunc func(oldEndpointsMap, newEndpointsMap EndpointsMap)
6361

6462
// NewEndpointsChangeTracker initializes an EndpointsChangeTracker
65-
func NewEndpointsChangeTracker(hostname string, makeEndpointInfo makeEndpointFunc, ipFamily v1.IPFamily, recorder events.EventRecorder, processEndpointsMapChange processEndpointsMapChangeFunc) *EndpointsChangeTracker {
63+
func NewEndpointsChangeTracker(hostname string, makeEndpointInfo makeEndpointFunc, ipFamily v1.IPFamily, _ events.EventRecorder, processEndpointsMapChange processEndpointsMapChangeFunc) *EndpointsChangeTracker {
64+
addressType := discovery.AddressTypeIPv4
65+
if ipFamily == v1.IPv6Protocol {
66+
addressType = discovery.AddressTypeIPv6
67+
}
68+
6669
return &EndpointsChangeTracker{
70+
addressType: addressType,
6771
lastChangeTriggerTimes: make(map[types.NamespacedName][]time.Time),
6872
trackerStartTime: time.Now(),
6973
processEndpointsMapChange: processEndpointsMapChange,
70-
endpointSliceCache: NewEndpointSliceCache(hostname, ipFamily, recorder, makeEndpointInfo),
74+
endpointSliceCache: NewEndpointSliceCache(hostname, makeEndpointInfo),
7175
}
7276
}
7377

@@ -76,14 +80,8 @@ func NewEndpointsChangeTracker(hostname string, makeEndpointInfo makeEndpointFun
7680
// change that needs to be synced; note that this is different from the return value of
7781
// ServiceChangeTracker.Update().
7882
func (ect *EndpointsChangeTracker) EndpointSliceUpdate(endpointSlice *discovery.EndpointSlice, removeSlice bool) bool {
79-
if !supportedEndpointSliceAddressTypes.Has(endpointSlice.AddressType) {
80-
klog.V(4).InfoS("EndpointSlice address type not supported by kube-proxy", "addressType", endpointSlice.AddressType)
81-
return false
82-
}
83-
84-
// This should never happen
85-
if endpointSlice == nil {
86-
klog.ErrorS(nil, "Nil endpointSlice passed to EndpointSliceUpdate")
83+
if endpointSlice.AddressType != ect.addressType {
84+
klog.V(4).InfoS("Ignoring unsupported EndpointSlice", "endpointSlice", klog.KObj(endpointSlice), "type", endpointSlice.AddressType, "expected", ect.addressType)
8785
return false
8886
}
8987

pkg/proxy/endpointslicecache.go

Lines changed: 1 addition & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,12 @@ import (
2222
"sort"
2323
"sync"
2424

25-
v1 "k8s.io/api/core/v1"
2625
discovery "k8s.io/api/discovery/v1"
2726
"k8s.io/apimachinery/pkg/types"
2827
"k8s.io/apimachinery/pkg/util/sets"
2928
utilfeature "k8s.io/apiserver/pkg/util/feature"
30-
"k8s.io/client-go/tools/events"
3129
"k8s.io/klog/v2"
3230
"k8s.io/kubernetes/pkg/features"
33-
proxyutil "k8s.io/kubernetes/pkg/proxy/util"
3431
utilnet "k8s.io/utils/net"
3532
)
3633

@@ -49,8 +46,6 @@ type EndpointSliceCache struct {
4946

5047
makeEndpointInfo makeEndpointFunc
5148
hostname string
52-
ipFamily v1.IPFamily
53-
recorder events.EventRecorder
5449
}
5550

5651
// endpointSliceTracker keeps track of EndpointSlices as they have been applied
@@ -72,16 +67,14 @@ type endpointSliceData struct {
7267
}
7368

7469
// NewEndpointSliceCache initializes an EndpointSliceCache.
75-
func NewEndpointSliceCache(hostname string, ipFamily v1.IPFamily, recorder events.EventRecorder, makeEndpointInfo makeEndpointFunc) *EndpointSliceCache {
70+
func NewEndpointSliceCache(hostname string, makeEndpointInfo makeEndpointFunc) *EndpointSliceCache {
7671
if makeEndpointInfo == nil {
7772
makeEndpointInfo = standardEndpointInfo
7873
}
7974
return &EndpointSliceCache{
8075
trackerByServiceMap: map[types.NamespacedName]*endpointSliceTracker{},
8176
hostname: hostname,
82-
ipFamily: ipFamily,
8377
makeEndpointInfo: makeEndpointInfo,
84-
recorder: recorder,
8578
}
8679
}
8780

@@ -213,15 +206,6 @@ func (cache *EndpointSliceCache) addEndpoints(svcPortName *ServicePortName, port
213206
continue
214207
}
215208

216-
// Filter out the incorrect IP version case. Any endpoint port that
217-
// contains incorrect IP version will be ignored.
218-
if (cache.ipFamily == v1.IPv6Protocol) != utilnet.IsIPv6String(endpoint.Addresses[0]) {
219-
// Emit event on the corresponding service which had a different IP
220-
// version than the endpoint.
221-
proxyutil.LogAndEmitIncorrectIPVersionEvent(cache.recorder, "endpointslice", endpoint.Addresses[0], svcPortName.NamespacedName.Namespace, svcPortName.NamespacedName.Name, "")
222-
continue
223-
}
224-
225209
isLocal := endpoint.NodeName != nil && cache.isLocal(*endpoint.NodeName)
226210

227211
ready := endpoint.Conditions.Ready == nil || *endpoint.Conditions.Ready

pkg/proxy/endpointslicecache_test.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,7 @@ func TestEndpointsMapFromESC(t *testing.T) {
205205

206206
for name, tc := range testCases {
207207
t.Run(name, func(t *testing.T) {
208-
esCache := NewEndpointSliceCache(tc.hostname, v1.IPv4Protocol, nil, nil)
208+
esCache := NewEndpointSliceCache(tc.hostname, nil)
209209

210210
cmc := newCacheMutationCheck(tc.endpointSlices)
211211
for _, endpointSlice := range tc.endpointSlices {
@@ -315,7 +315,7 @@ func TestEndpointInfoByServicePort(t *testing.T) {
315315

316316
for name, tc := range testCases {
317317
t.Run(name, func(t *testing.T) {
318-
esCache := NewEndpointSliceCache(tc.hostname, v1.IPv4Protocol, nil, nil)
318+
esCache := NewEndpointSliceCache(tc.hostname, nil)
319319

320320
for _, endpointSlice := range tc.endpointSlices {
321321
esCache.updatePending(endpointSlice, false)
@@ -350,7 +350,7 @@ func TestEsDataChanged(t *testing.T) {
350350
expectChanged bool
351351
}{
352352
"identical slices, ports only": {
353-
cache: NewEndpointSliceCache("", v1.IPv4Protocol, nil, nil),
353+
cache: NewEndpointSliceCache("", nil),
354354
initialSlice: &discovery.EndpointSlice{
355355
ObjectMeta: objMeta,
356356
Ports: []discovery.EndpointPort{port80},
@@ -362,7 +362,7 @@ func TestEsDataChanged(t *testing.T) {
362362
expectChanged: false,
363363
},
364364
"identical slices, ports out of order": {
365-
cache: NewEndpointSliceCache("", v1.IPv4Protocol, nil, nil),
365+
cache: NewEndpointSliceCache("", nil),
366366
initialSlice: &discovery.EndpointSlice{
367367
ObjectMeta: objMeta,
368368
Ports: []discovery.EndpointPort{port443, port80},
@@ -374,7 +374,7 @@ func TestEsDataChanged(t *testing.T) {
374374
expectChanged: true,
375375
},
376376
"port removed": {
377-
cache: NewEndpointSliceCache("", v1.IPv4Protocol, nil, nil),
377+
cache: NewEndpointSliceCache("", nil),
378378
initialSlice: &discovery.EndpointSlice{
379379
ObjectMeta: objMeta,
380380
Ports: []discovery.EndpointPort{port443, port80},
@@ -386,7 +386,7 @@ func TestEsDataChanged(t *testing.T) {
386386
expectChanged: true,
387387
},
388388
"port added": {
389-
cache: NewEndpointSliceCache("", v1.IPv4Protocol, nil, nil),
389+
cache: NewEndpointSliceCache("", nil),
390390
initialSlice: &discovery.EndpointSlice{
391391
ObjectMeta: objMeta,
392392
Ports: []discovery.EndpointPort{port443},
@@ -398,7 +398,7 @@ func TestEsDataChanged(t *testing.T) {
398398
expectChanged: true,
399399
},
400400
"identical with endpoints": {
401-
cache: NewEndpointSliceCache("", v1.IPv4Protocol, nil, nil),
401+
cache: NewEndpointSliceCache("", nil),
402402
initialSlice: &discovery.EndpointSlice{
403403
ObjectMeta: objMeta,
404404
Ports: []discovery.EndpointPort{port443},
@@ -412,7 +412,7 @@ func TestEsDataChanged(t *testing.T) {
412412
expectChanged: false,
413413
},
414414
"identical with endpoints out of order": {
415-
cache: NewEndpointSliceCache("", v1.IPv4Protocol, nil, nil),
415+
cache: NewEndpointSliceCache("", nil),
416416
initialSlice: &discovery.EndpointSlice{
417417
ObjectMeta: objMeta,
418418
Ports: []discovery.EndpointPort{port443},
@@ -426,7 +426,7 @@ func TestEsDataChanged(t *testing.T) {
426426
expectChanged: true,
427427
},
428428
"identical with endpoint added": {
429-
cache: NewEndpointSliceCache("", v1.IPv4Protocol, nil, nil),
429+
cache: NewEndpointSliceCache("", nil),
430430
initialSlice: &discovery.EndpointSlice{
431431
ObjectMeta: objMeta,
432432
Ports: []discovery.EndpointPort{port443},

pkg/proxy/serviceport.go

Lines changed: 13 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package proxy
1919
import (
2020
"fmt"
2121
"net"
22+
"strings"
2223

2324
v1 "k8s.io/api/core/v1"
2425
"k8s.io/klog/v2"
@@ -207,56 +208,34 @@ func newBaseServiceInfo(service *v1.Service, ipFamily v1.IPFamily, port *v1.Serv
207208
info.hintsAnnotation = service.Annotations[v1.AnnotationTopologyMode]
208209
}
209210

210-
// filter external ips, source ranges and ingress ips
211-
// prior to dual stack services, this was considered an error, but with dual stack
212-
// services, this is actually expected. Hence we downgraded from reporting by events
213-
// to just log lines with high verbosity
211+
// Filter ExternalIPs to correct IP family
214212
ipFamilyMap := proxyutil.MapIPsByIPFamily(service.Spec.ExternalIPs)
215213
info.externalIPs = ipFamilyMap[ipFamily]
216214

217-
// Log the IPs not matching the ipFamily
218-
if ips, ok := ipFamilyMap[proxyutil.OtherIPFamily(ipFamily)]; ok && len(ips) > 0 {
219-
klog.V(4).InfoS("Service change tracker ignored the following external IPs for given service as they don't match IP Family",
220-
"ipFamily", ipFamily, "externalIPs", ips, "service", klog.KObj(service))
215+
// Filter source ranges to correct IP family. Also deal with the fact that
216+
// LoadBalancerSourceRanges validation mistakenly allows whitespace padding
217+
loadBalancerSourceRanges := make([]string, len(service.Spec.LoadBalancerSourceRanges))
218+
for i, sourceRange := range service.Spec.LoadBalancerSourceRanges {
219+
loadBalancerSourceRanges[i] = strings.TrimSpace(sourceRange)
221220
}
222221

223-
cidrFamilyMap := proxyutil.MapCIDRsByIPFamily(service.Spec.LoadBalancerSourceRanges)
222+
cidrFamilyMap := proxyutil.MapCIDRsByIPFamily(loadBalancerSourceRanges)
224223
info.loadBalancerSourceRanges = cidrFamilyMap[ipFamily]
225-
// Log the CIDRs not matching the ipFamily
226-
if cidrs, ok := cidrFamilyMap[proxyutil.OtherIPFamily(ipFamily)]; ok && len(cidrs) > 0 {
227-
klog.V(4).InfoS("Service change tracker ignored the following load balancer source ranges for given Service as they don't match IP Family",
228-
"ipFamily", ipFamily, "loadBalancerSourceRanges", cidrs, "service", klog.KObj(service))
229-
}
230224

231-
// Obtain Load Balancer Ingress
232-
var invalidIPs []net.IP
225+
// Filter Load Balancer Ingress IPs to correct IP family. While proxying load
226+
// balancers might choose to proxy connections from an LB IP of one family to a
227+
// service IP of another family, that's irrelevant to kube-proxy, which only
228+
// creates rules for VIP-style load balancers.
233229
for _, ing := range service.Status.LoadBalancer.Ingress {
234-
if ing.IP == "" {
235-
continue
236-
}
237-
238-
// proxy mode load balancers do not need to track the IPs in the service cache
239-
// and they can also implement IP family translation, so no need to check if
240-
// the status ingress.IP and the ClusterIP belong to the same family.
241-
if !proxyutil.IsVIPMode(ing) {
242-
klog.V(4).InfoS("Service change tracker ignored the following load balancer ingress IP for given Service as it using Proxy mode",
243-
"ipFamily", ipFamily, "loadBalancerIngressIP", ing.IP, "service", klog.KObj(service))
230+
if ing.IP == "" || !proxyutil.IsVIPMode(ing) {
244231
continue
245232
}
246233

247-
// kube-proxy does not implement IP family translation, skip addresses with
248-
// different IP family
249234
ip := netutils.ParseIPSloppy(ing.IP) // (already verified as an IP-address)
250235
if ingFamily := proxyutil.GetIPFamilyFromIP(ip); ingFamily == ipFamily {
251236
info.loadBalancerVIPs = append(info.loadBalancerVIPs, ip)
252-
} else {
253-
invalidIPs = append(invalidIPs, ip)
254237
}
255238
}
256-
if len(invalidIPs) > 0 {
257-
klog.V(4).InfoS("Service change tracker ignored the following load balancer ingress IPs for given Service as they don't match the IP Family",
258-
"ipFamily", ipFamily, "loadBalancerIngressIPs", invalidIPs, "service", klog.KObj(service))
259-
}
260239

261240
if apiservice.NeedsHealthCheck(service) {
262241
p := service.Spec.HealthCheckNodePort

pkg/proxy/util/utils.go

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,8 @@ import (
2222
"strings"
2323

2424
v1 "k8s.io/api/core/v1"
25-
"k8s.io/apimachinery/pkg/types"
2625
"k8s.io/apimachinery/pkg/util/sets"
2726
utilfeature "k8s.io/apiserver/pkg/util/feature"
28-
"k8s.io/client-go/tools/events"
2927
utilsysctl "k8s.io/component-helpers/node/util/sysctl"
3028
"k8s.io/klog/v2"
3129
"k8s.io/kubernetes/pkg/apis/core/v1/helper"
@@ -86,21 +84,6 @@ func AddressSet(isValid func(ip net.IP) bool, addrs []net.Addr) sets.Set[string]
8684
return ips
8785
}
8886

89-
// LogAndEmitIncorrectIPVersionEvent logs and emits incorrect IP version event.
90-
func LogAndEmitIncorrectIPVersionEvent(recorder events.EventRecorder, fieldName, fieldValue, svcNamespace, svcName string, svcUID types.UID) {
91-
errMsg := fmt.Sprintf("%s in %s has incorrect IP version", fieldValue, fieldName)
92-
klog.ErrorS(nil, "Incorrect IP version", "service", klog.KRef(svcNamespace, svcName), "field", fieldName, "value", fieldValue)
93-
if recorder != nil {
94-
recorder.Eventf(
95-
&v1.ObjectReference{
96-
Kind: "Service",
97-
Name: svcName,
98-
Namespace: svcNamespace,
99-
UID: svcUID,
100-
}, nil, v1.EventTypeWarning, "KubeProxyIncorrectIPVersion", "GatherEndpoints", errMsg)
101-
}
102-
}
103-
10487
// MapIPsByIPFamily maps a slice of IPs to their respective IP families (v4 or v6)
10588
func MapIPsByIPFamily(ipStrings []string) map[v1.IPFamily][]net.IP {
10689
ipFamilyMap := map[v1.IPFamily][]net.IP{}

0 commit comments

Comments
 (0)