Skip to content

Commit 11c689b

Browse files
authored
Merge pull request kubernetes#125675 from tnqn/fix-rapid-endpoints-update
Fix endpoints status out-of-sync when the pod state changes rapidly
2 parents c9a5b6e + 3bd9758 commit 11c689b

File tree

5 files changed

+377
-13
lines changed

5 files changed

+377
-13
lines changed

pkg/controller/endpoint/endpoints_controller.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@ func NewEndpointController(ctx context.Context, podInformer coreinformers.PodInf
110110
e.endpointsLister = endpointsInformer.Lister()
111111
e.endpointsSynced = endpointsInformer.Informer().HasSynced
112112

113+
e.staleEndpointsTracker = newStaleEndpointsTracker()
113114
e.triggerTimeTracker = endpointsliceutil.NewTriggerTimeTracker()
114115
e.eventBroadcaster = broadcaster
115116
e.eventRecorder = recorder
@@ -145,6 +146,8 @@ type Controller struct {
145146
// endpointsSynced returns true if the endpoints shared informer has been synced at least once.
146147
// Added as a member to the struct to allow injection for testing.
147148
endpointsSynced cache.InformerSynced
149+
// staleEndpointsTracker can help determine if a cached Endpoints is out of date.
150+
staleEndpointsTracker *staleEndpointsTracker
148151

149152
// Services that need to be updated. A channel is inappropriate here,
150153
// because it allows services with lots of pods to be serviced much
@@ -384,6 +387,7 @@ func (e *Controller) syncService(ctx context.Context, key string) error {
384387
return err
385388
}
386389
e.triggerTimeTracker.DeleteService(namespace, name)
390+
e.staleEndpointsTracker.Delete(namespace, name)
387391
return nil
388392
}
389393

@@ -473,6 +477,8 @@ func (e *Controller) syncService(ctx context.Context, key string) error {
473477
Labels: service.Labels,
474478
},
475479
}
480+
} else if e.staleEndpointsTracker.IsStale(currentEndpoints) {
481+
return fmt.Errorf("endpoints informer cache is out of date, resource version %s already processed for endpoints %s", currentEndpoints.ResourceVersion, key)
476482
}
477483

478484
createEndpoints := len(currentEndpoints.ResourceVersion) == 0
@@ -555,6 +561,12 @@ func (e *Controller) syncService(ctx context.Context, key string) error {
555561

556562
return err
557563
}
564+
// If the current endpoints is updated we track the old resource version, so
565+
// if we obtain this resource version again from the lister we know is outdated
566+
// and we need to retry later to wait for the informer cache to be up-to-date.
567+
if !createEndpoints {
568+
e.staleEndpointsTracker.Stale(currentEndpoints)
569+
}
558570
return nil
559571
}
560572

pkg/controller/endpoint/endpoints_controller_test.go

Lines changed: 104 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -160,10 +160,10 @@ func makeTestServer(t *testing.T, namespace string) (*httptest.Server, *utiltest
160160
return httptest.NewServer(mux), &fakeEndpointsHandler
161161
}
162162

163-
// makeBlockingEndpointDeleteTestServer will signal the blockNextAction channel on endpoint "POST" & "DELETE" requests. All
164-
// block endpoint "DELETE" requestsi will wait on a blockDelete signal to delete endpoint. If controller is nil, a error will
165-
// be sent in the response.
166-
func makeBlockingEndpointDeleteTestServer(t *testing.T, controller *endpointController, endpoint *v1.Endpoints, blockDelete, blockNextAction chan struct{}, namespace string) *httptest.Server {
163+
// makeBlockingEndpointTestServer will signal the blockNextAction channel on endpoint "POST", "PUT", and "DELETE"
164+
// requests. "POST" and "PUT" requests will wait on a blockUpdate signal if provided, while "DELETE" requests will wait
165+
// on a blockDelete signal if provided. If controller is nil, an error will be sent in the response.
166+
func makeBlockingEndpointTestServer(t *testing.T, controller *endpointController, endpoint *v1.Endpoints, blockUpdate, blockDelete, blockNextAction chan struct{}, namespace string) *httptest.Server {
167167

168168
handlerFunc := func(res http.ResponseWriter, req *http.Request) {
169169
if controller == nil {
@@ -172,23 +172,37 @@ func makeBlockingEndpointDeleteTestServer(t *testing.T, controller *endpointCont
172172
return
173173
}
174174

175-
if req.Method == "POST" {
176-
controller.endpointsStore.Add(endpoint)
175+
if req.Method == "POST" || req.Method == "PUT" {
176+
if blockUpdate != nil {
177+
go func() {
178+
// Delay the update of endpoints to make endpoints cache out of sync
179+
<-blockUpdate
180+
_ = controller.endpointsStore.Add(endpoint)
181+
}()
182+
} else {
183+
_ = controller.endpointsStore.Add(endpoint)
184+
}
177185
blockNextAction <- struct{}{}
178186
}
179187

180188
if req.Method == "DELETE" {
181-
go func() {
182-
// Delay the deletion of endoints to make endpoint cache out of sync
183-
<-blockDelete
184-
controller.endpointsStore.Delete(endpoint)
189+
if blockDelete != nil {
190+
go func() {
191+
// Delay the deletion of endpoints to make endpoints cache out of sync
192+
<-blockDelete
193+
_ = controller.endpointsStore.Delete(endpoint)
194+
controller.onEndpointsDelete(endpoint)
195+
}()
196+
} else {
197+
_ = controller.endpointsStore.Delete(endpoint)
185198
controller.onEndpointsDelete(endpoint)
186-
}()
199+
}
187200
blockNextAction <- struct{}{}
188201
}
189202

203+
res.Header().Set("Content-Type", "application/json")
190204
res.WriteHeader(http.StatusOK)
191-
res.Write([]byte(runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{})))
205+
_, _ = res.Write([]byte(runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), endpoint)))
192206
}
193207

194208
mux := http.NewServeMux()
@@ -2378,7 +2392,7 @@ func TestMultipleServiceChanges(t *testing.T) {
23782392
blockDelete := make(chan struct{})
23792393
blockNextAction := make(chan struct{})
23802394
stopChan := make(chan struct{})
2381-
testServer := makeBlockingEndpointDeleteTestServer(t, controller, endpoint, blockDelete, blockNextAction, ns)
2395+
testServer := makeBlockingEndpointTestServer(t, controller, endpoint, nil, blockDelete, blockNextAction, ns)
23822396
defer testServer.Close()
23832397

23842398
tCtx := ktesting.Init(t)
@@ -2423,6 +2437,83 @@ func TestMultipleServiceChanges(t *testing.T) {
24232437
close(stopChan)
24242438
}
24252439

2440+
// TestMultiplePodChanges tests that endpoints that are not updated because of an out of sync endpoints cache are
2441+
// eventually resynced after multiple Pod changes.
2442+
func TestMultiplePodChanges(t *testing.T) {
2443+
ns := metav1.NamespaceDefault
2444+
2445+
readyEndpoints := &v1.Endpoints{
2446+
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns, ResourceVersion: "1"},
2447+
Subsets: []v1.EndpointSubset{{
2448+
Addresses: []v1.EndpointAddress{
2449+
{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}},
2450+
},
2451+
Ports: []v1.EndpointPort{{Port: 8080, Protocol: v1.ProtocolTCP}},
2452+
}},
2453+
}
2454+
notReadyEndpoints := &v1.Endpoints{
2455+
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns, ResourceVersion: "2"},
2456+
Subsets: []v1.EndpointSubset{{
2457+
NotReadyAddresses: []v1.EndpointAddress{
2458+
{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}},
2459+
},
2460+
Ports: []v1.EndpointPort{{Port: 8080, Protocol: v1.ProtocolTCP}},
2461+
}},
2462+
}
2463+
2464+
controller := &endpointController{}
2465+
blockUpdate := make(chan struct{})
2466+
blockNextAction := make(chan struct{})
2467+
stopChan := make(chan struct{})
2468+
testServer := makeBlockingEndpointTestServer(t, controller, notReadyEndpoints, blockUpdate, nil, blockNextAction, ns)
2469+
defer testServer.Close()
2470+
2471+
tCtx := ktesting.Init(t)
2472+
*controller = *newController(tCtx, testServer.URL, 0*time.Second)
2473+
pod := testPod(ns, 0, 1, true, ipv4only)
2474+
_ = controller.podStore.Add(pod)
2475+
_ = controller.endpointsStore.Add(readyEndpoints)
2476+
_ = controller.serviceStore.Add(&v1.Service{
2477+
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
2478+
Spec: v1.ServiceSpec{
2479+
Selector: map[string]string{"foo": "bar"},
2480+
ClusterIP: "10.0.0.1",
2481+
Ports: []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt32(8080)}},
2482+
},
2483+
})
2484+
2485+
go func() { controller.Run(tCtx, 1) }()
2486+
2487+
// Rapidly update the Pod: Ready -> NotReady -> Ready.
2488+
pod2 := pod.DeepCopy()
2489+
pod2.ResourceVersion = "2"
2490+
pod2.Status.Conditions[0].Status = v1.ConditionFalse
2491+
_ = controller.podStore.Update(pod2)
2492+
controller.updatePod(pod, pod2)
2493+
// blockNextAction should eventually unblock once server gets endpoints request.
2494+
waitForChanReceive(t, 1*time.Second, blockNextAction, "Pod Update should have caused a request to be sent to the test server")
2495+
// The endpoints update hasn't been applied to the cache yet.
2496+
pod3 := pod.DeepCopy()
2497+
pod3.ResourceVersion = "3"
2498+
pod3.Status.Conditions[0].Status = v1.ConditionTrue
2499+
_ = controller.podStore.Update(pod3)
2500+
controller.updatePod(pod2, pod3)
2501+
// It shouldn't get endpoints request as the endpoints in the cache is out-of-date.
2502+
timer := time.NewTimer(100 * time.Millisecond)
2503+
select {
2504+
case <-timer.C:
2505+
case <-blockNextAction:
2506+
t.Errorf("Pod Update shouldn't have caused a request to be sent to the test server")
2507+
}
2508+
2509+
// Applying the endpoints update to the cache should cause test server to update endpoints.
2510+
close(blockUpdate)
2511+
waitForChanReceive(t, 1*time.Second, blockNextAction, "Endpoints should have been updated")
2512+
2513+
close(blockNextAction)
2514+
close(stopChan)
2515+
}
2516+
24262517
func TestSyncServiceAddresses(t *testing.T) {
24272518
makeService := func(tolerateUnready bool) *v1.Service {
24282519
return &v1.Service{
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
Copyright 2024 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package endpoint
18+
19+
import (
20+
"sync"
21+
22+
v1 "k8s.io/api/core/v1"
23+
"k8s.io/apimachinery/pkg/types"
24+
)
25+
26+
// staleEndpointsTracker tracks Endpoints and their stale resource versions to
27+
// help determine if an Endpoints is stale.
28+
type staleEndpointsTracker struct {
29+
// lock protects staleResourceVersionByEndpoints.
30+
lock sync.RWMutex
31+
// staleResourceVersionByEndpoints tracks the stale resource version of Endpoints.
32+
staleResourceVersionByEndpoints map[types.NamespacedName]string
33+
}
34+
35+
func newStaleEndpointsTracker() *staleEndpointsTracker {
36+
return &staleEndpointsTracker{
37+
staleResourceVersionByEndpoints: map[types.NamespacedName]string{},
38+
}
39+
}
40+
41+
func (t *staleEndpointsTracker) Stale(endpoints *v1.Endpoints) {
42+
t.lock.Lock()
43+
defer t.lock.Unlock()
44+
nn := types.NamespacedName{Name: endpoints.Name, Namespace: endpoints.Namespace}
45+
t.staleResourceVersionByEndpoints[nn] = endpoints.ResourceVersion
46+
}
47+
48+
func (t *staleEndpointsTracker) IsStale(endpoints *v1.Endpoints) bool {
49+
t.lock.RLock()
50+
defer t.lock.RUnlock()
51+
nn := types.NamespacedName{Name: endpoints.Name, Namespace: endpoints.Namespace}
52+
staleResourceVersion, exists := t.staleResourceVersionByEndpoints[nn]
53+
if exists && staleResourceVersion == endpoints.ResourceVersion {
54+
return true
55+
}
56+
return false
57+
}
58+
59+
func (t *staleEndpointsTracker) Delete(namespace, name string) {
60+
t.lock.Lock()
61+
defer t.lock.Unlock()
62+
nn := types.NamespacedName{Namespace: namespace, Name: name}
63+
delete(t.staleResourceVersionByEndpoints, nn)
64+
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
Copyright 2024 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package endpoint
18+
19+
import (
20+
"testing"
21+
22+
"github.com/stretchr/testify/assert"
23+
24+
v1 "k8s.io/api/core/v1"
25+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
26+
)
27+
28+
func TestStaleEndpointsTracker(t *testing.T) {
29+
ns := metav1.NamespaceDefault
30+
tracker := newStaleEndpointsTracker()
31+
32+
endpoints := &v1.Endpoints{
33+
ObjectMeta: metav1.ObjectMeta{
34+
Name: "foo",
35+
Namespace: ns,
36+
ResourceVersion: "1",
37+
},
38+
Subsets: []v1.EndpointSubset{{
39+
Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
40+
Ports: []v1.EndpointPort{{Port: 1000}},
41+
}},
42+
}
43+
44+
assert.False(t, tracker.IsStale(endpoints), "IsStale should return false before the endpoint is staled")
45+
46+
tracker.Stale(endpoints)
47+
assert.True(t, tracker.IsStale(endpoints), "IsStale should return true after the endpoint is staled")
48+
49+
endpoints.ResourceVersion = "2"
50+
assert.False(t, tracker.IsStale(endpoints), "IsStale should return false after the endpoint is updated")
51+
52+
tracker.Delete(endpoints.Namespace, endpoints.Name)
53+
assert.Empty(t, tracker.staleResourceVersionByEndpoints)
54+
}

0 commit comments

Comments
 (0)