Skip to content

Commit 659009a

Browse files
committed
fix: remove stale endpoints
In the KubeAccess controller, we never cleaned up stale endpoints. This resulted in stale IPs being preserved indefinitely in the resulting object. Fixes #12802 Signed-off-by: Mateusz Urbanek <mateusz.urbanek@siderolabs.com>
1 parent dab0d47 commit 659009a

File tree

2 files changed

+315
-25
lines changed

2 files changed

+315
-25
lines changed

internal/app/machined/pkg/controllers/kubeaccess/endpoint.go

Lines changed: 34 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -298,31 +298,7 @@ func (ctrl *EndpointController) ensureTalosEndpointSlicesTyped(
298298
newEndpointSlice = oldEndpointSlice.DeepCopy()
299299
}
300300

301-
newEndpointSlice.Ports = []discoveryv1.EndpointPort{
302-
{
303-
Name: ptr.To("apid"),
304-
Port: ptr.To[int32](constants.ApidPort),
305-
Protocol: ptr.To(corev1.ProtocolTCP),
306-
},
307-
}
308-
309-
for _, addr := range endpointAddrs.Addresses {
310-
newEndpointSlice.Endpoints = append(
311-
newEndpointSlice.Endpoints,
312-
discoveryv1.Endpoint{
313-
Addresses: []string{addr.String()},
314-
Conditions: discoveryv1.EndpointConditions{
315-
Ready: ptr.To(true),
316-
Serving: ptr.To(true),
317-
Terminating: ptr.To(false),
318-
},
319-
},
320-
)
321-
}
322-
323-
newEndpointSlice.Endpoints = xslices.Deduplicate(newEndpointSlice.Endpoints, func(e discoveryv1.Endpoint) string {
324-
return e.Addresses[0]
325-
})
301+
PopulateEndpointSlice(newEndpointSlice, endpointAddrs)
326302

327303
if oldEndpointSlice != nil &&
328304
(reflect.DeepEqual(oldEndpointSlice.Endpoints, newEndpointSlice.Endpoints) &&
@@ -350,6 +326,39 @@ func (ctrl *EndpointController) ensureTalosEndpointSlicesTyped(
350326
}
351327
}
352328

329+
// PopulateEndpointSlice populates the given EndpointSlice with ports and endpoints from the given endpoint addresses.
330+
//
331+
// The EndpointSlice's existing Endpoints and Ports fields are overwritten.
332+
func PopulateEndpointSlice(endpointSlice *discoveryv1.EndpointSlice, endpointAddrs k8s.EndpointList) {
333+
endpointSlice.Ports = []discoveryv1.EndpointPort{
334+
{
335+
Name: ptr.To("apid"),
336+
Port: ptr.To[int32](constants.ApidPort),
337+
Protocol: ptr.To(corev1.ProtocolTCP),
338+
},
339+
}
340+
341+
endpointSlice.Endpoints = nil
342+
343+
for _, addr := range endpointAddrs.Addresses {
344+
endpointSlice.Endpoints = append(
345+
endpointSlice.Endpoints,
346+
discoveryv1.Endpoint{
347+
Addresses: []string{addr.String()},
348+
Conditions: discoveryv1.EndpointConditions{
349+
Ready: ptr.To(true),
350+
Serving: ptr.To(true),
351+
Terminating: ptr.To(false),
352+
},
353+
},
354+
)
355+
}
356+
357+
endpointSlice.Endpoints = xslices.Deduplicate(endpointSlice.Endpoints, func(e discoveryv1.Endpoint) string {
358+
return e.Addresses[0]
359+
})
360+
}
361+
353362
//nolint:gocyclo
354363
func (ctrl *EndpointController) cleanupTalosEndpoints(ctx context.Context, logger *zap.Logger, client *kubernetes.Client) error {
355364
for {
Lines changed: 281 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,281 @@
1+
// This Source Code Form is subject to the terms of the Mozilla Public
2+
// License, v. 2.0. If a copy of the MPL was not distributed with this
3+
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
4+
5+
package kubeaccess_test
6+
7+
import (
8+
"net/netip"
9+
"testing"
10+
11+
"github.com/stretchr/testify/assert"
12+
"github.com/stretchr/testify/require"
13+
corev1 "k8s.io/api/core/v1"
14+
discoveryv1 "k8s.io/api/discovery/v1"
15+
"k8s.io/utils/ptr"
16+
17+
kubeaccessctrl "github.com/siderolabs/talos/internal/app/machined/pkg/controllers/kubeaccess"
18+
"github.com/siderolabs/talos/pkg/machinery/constants"
19+
"github.com/siderolabs/talos/pkg/machinery/resources/k8s"
20+
)
21+
22+
func TestPopulateEndpointSlice(t *testing.T) {
23+
t.Parallel()
24+
25+
//nolint:dupl
26+
for _, tt := range []struct {
27+
name string
28+
existingSlice *discoveryv1.EndpointSlice
29+
endpointAddrs k8s.EndpointList
30+
expectedEndpoint []discoveryv1.Endpoint
31+
}{
32+
{
33+
name: "empty endpoint slice, single address",
34+
existingSlice: &discoveryv1.EndpointSlice{},
35+
endpointAddrs: k8s.EndpointList{
36+
Addresses: []netip.Addr{
37+
netip.MustParseAddr("192.168.1.1"),
38+
},
39+
},
40+
expectedEndpoint: []discoveryv1.Endpoint{
41+
{
42+
Addresses: []string{"192.168.1.1"},
43+
Conditions: discoveryv1.EndpointConditions{
44+
Ready: ptr.To(true),
45+
Serving: ptr.To(true),
46+
Terminating: ptr.To(false),
47+
},
48+
},
49+
},
50+
},
51+
{
52+
name: "empty endpoint slice, multiple addresses",
53+
existingSlice: &discoveryv1.EndpointSlice{},
54+
endpointAddrs: k8s.EndpointList{
55+
Addresses: []netip.Addr{
56+
netip.MustParseAddr("192.168.1.1"),
57+
netip.MustParseAddr("192.168.1.2"),
58+
netip.MustParseAddr("192.168.1.3"),
59+
},
60+
},
61+
expectedEndpoint: []discoveryv1.Endpoint{
62+
{
63+
Addresses: []string{"192.168.1.1"},
64+
Conditions: discoveryv1.EndpointConditions{
65+
Ready: ptr.To(true),
66+
Serving: ptr.To(true),
67+
Terminating: ptr.To(false),
68+
},
69+
},
70+
{
71+
Addresses: []string{"192.168.1.2"},
72+
Conditions: discoveryv1.EndpointConditions{
73+
Ready: ptr.To(true),
74+
Serving: ptr.To(true),
75+
Terminating: ptr.To(false),
76+
},
77+
},
78+
{
79+
Addresses: []string{"192.168.1.3"},
80+
Conditions: discoveryv1.EndpointConditions{
81+
Ready: ptr.To(true),
82+
Serving: ptr.To(true),
83+
Terminating: ptr.To(false),
84+
},
85+
},
86+
},
87+
},
88+
{
89+
name: "stale endpoints are removed",
90+
existingSlice: &discoveryv1.EndpointSlice{
91+
Endpoints: []discoveryv1.Endpoint{
92+
{
93+
Addresses: []string{"10.0.0.1"},
94+
Conditions: discoveryv1.EndpointConditions{
95+
Ready: ptr.To(true),
96+
Serving: ptr.To(true),
97+
Terminating: ptr.To(false),
98+
},
99+
},
100+
{
101+
Addresses: []string{"10.0.0.2"},
102+
Conditions: discoveryv1.EndpointConditions{
103+
Ready: ptr.To(true),
104+
Serving: ptr.To(true),
105+
Terminating: ptr.To(false),
106+
},
107+
},
108+
{
109+
Addresses: []string{"10.0.0.3"},
110+
Conditions: discoveryv1.EndpointConditions{
111+
Ready: ptr.To(true),
112+
Serving: ptr.To(true),
113+
Terminating: ptr.To(false),
114+
},
115+
},
116+
},
117+
},
118+
endpointAddrs: k8s.EndpointList{
119+
Addresses: []netip.Addr{
120+
netip.MustParseAddr("10.0.0.1"),
121+
},
122+
},
123+
expectedEndpoint: []discoveryv1.Endpoint{
124+
{
125+
Addresses: []string{"10.0.0.1"},
126+
Conditions: discoveryv1.EndpointConditions{
127+
Ready: ptr.To(true),
128+
Serving: ptr.To(true),
129+
Terminating: ptr.To(false),
130+
},
131+
},
132+
},
133+
},
134+
{
135+
name: "all stale endpoints replaced with new ones",
136+
existingSlice: &discoveryv1.EndpointSlice{
137+
Endpoints: []discoveryv1.Endpoint{
138+
{
139+
Addresses: []string{"10.0.0.1"},
140+
Conditions: discoveryv1.EndpointConditions{
141+
Ready: ptr.To(true),
142+
Serving: ptr.To(true),
143+
Terminating: ptr.To(false),
144+
},
145+
},
146+
{
147+
Addresses: []string{"10.0.0.2"},
148+
Conditions: discoveryv1.EndpointConditions{
149+
Ready: ptr.To(true),
150+
Serving: ptr.To(true),
151+
Terminating: ptr.To(false),
152+
},
153+
},
154+
},
155+
},
156+
endpointAddrs: k8s.EndpointList{
157+
Addresses: []netip.Addr{
158+
netip.MustParseAddr("192.168.1.1"),
159+
netip.MustParseAddr("192.168.1.2"),
160+
},
161+
},
162+
expectedEndpoint: []discoveryv1.Endpoint{
163+
{
164+
Addresses: []string{"192.168.1.1"},
165+
Conditions: discoveryv1.EndpointConditions{
166+
Ready: ptr.To(true),
167+
Serving: ptr.To(true),
168+
Terminating: ptr.To(false),
169+
},
170+
},
171+
{
172+
Addresses: []string{"192.168.1.2"},
173+
Conditions: discoveryv1.EndpointConditions{
174+
Ready: ptr.To(true),
175+
Serving: ptr.To(true),
176+
Terminating: ptr.To(false),
177+
},
178+
},
179+
},
180+
},
181+
{
182+
name: "duplicate addresses are deduplicated",
183+
existingSlice: &discoveryv1.EndpointSlice{
184+
Endpoints: []discoveryv1.Endpoint{
185+
{
186+
Addresses: []string{"10.0.0.1"},
187+
Conditions: discoveryv1.EndpointConditions{
188+
Ready: ptr.To(true),
189+
Serving: ptr.To(true),
190+
Terminating: ptr.To(false),
191+
},
192+
},
193+
},
194+
},
195+
endpointAddrs: k8s.EndpointList{
196+
Addresses: []netip.Addr{
197+
netip.MustParseAddr("192.168.1.1"),
198+
netip.MustParseAddr("192.168.1.1"),
199+
netip.MustParseAddr("192.168.1.2"),
200+
},
201+
},
202+
expectedEndpoint: []discoveryv1.Endpoint{
203+
{
204+
Addresses: []string{"192.168.1.1"},
205+
Conditions: discoveryv1.EndpointConditions{
206+
Ready: ptr.To(true),
207+
Serving: ptr.To(true),
208+
Terminating: ptr.To(false),
209+
},
210+
},
211+
{
212+
Addresses: []string{"192.168.1.2"},
213+
Conditions: discoveryv1.EndpointConditions{
214+
Ready: ptr.To(true),
215+
Serving: ptr.To(true),
216+
Terminating: ptr.To(false),
217+
},
218+
},
219+
},
220+
},
221+
{
222+
name: "empty address list clears endpoints",
223+
existingSlice: &discoveryv1.EndpointSlice{
224+
Endpoints: []discoveryv1.Endpoint{
225+
{
226+
Addresses: []string{"10.0.0.1"},
227+
Conditions: discoveryv1.EndpointConditions{
228+
Ready: ptr.To(true),
229+
Serving: ptr.To(true),
230+
Terminating: ptr.To(false),
231+
},
232+
},
233+
},
234+
},
235+
endpointAddrs: k8s.EndpointList{},
236+
expectedEndpoint: nil,
237+
},
238+
{
239+
name: "IPv6 addresses",
240+
existingSlice: &discoveryv1.EndpointSlice{},
241+
endpointAddrs: k8s.EndpointList{
242+
Addresses: []netip.Addr{
243+
netip.MustParseAddr("fd00::1"),
244+
netip.MustParseAddr("fd00::2"),
245+
},
246+
},
247+
expectedEndpoint: []discoveryv1.Endpoint{
248+
{
249+
Addresses: []string{"fd00::1"},
250+
Conditions: discoveryv1.EndpointConditions{
251+
Ready: ptr.To(true),
252+
Serving: ptr.To(true),
253+
Terminating: ptr.To(false),
254+
},
255+
},
256+
{
257+
Addresses: []string{"fd00::2"},
258+
Conditions: discoveryv1.EndpointConditions{
259+
Ready: ptr.To(true),
260+
Serving: ptr.To(true),
261+
Terminating: ptr.To(false),
262+
},
263+
},
264+
},
265+
},
266+
} {
267+
t.Run(tt.name, func(t *testing.T) {
268+
t.Parallel()
269+
270+
kubeaccessctrl.PopulateEndpointSlice(tt.existingSlice, tt.endpointAddrs)
271+
272+
assert.Equal(t, tt.expectedEndpoint, tt.existingSlice.Endpoints)
273+
274+
// verify ports are always set correctly
275+
require.Len(t, tt.existingSlice.Ports, 1)
276+
assert.Equal(t, "apid", *tt.existingSlice.Ports[0].Name)
277+
assert.Equal(t, int32(constants.ApidPort), *tt.existingSlice.Ports[0].Port)
278+
assert.Equal(t, corev1.ProtocolTCP, *tt.existingSlice.Ports[0].Protocol)
279+
})
280+
}
281+
}

0 commit comments

Comments
 (0)