Skip to content

Commit 3bd9758

Browse files
committed
Fix endpoints status out-of-sync when the pod state changes rapidly
When Pod state changes rapidly, endpoints controller may use outdated informer cache to sync Service. If the outdated endpoints appear to be expected by the controller, it skips updating it. The commit fixes it by checking if endpoints informer cache is outdated when processing a service. If the endpoints is stale, it returns an error and retries later. Signed-off-by: Quan Tian <[email protected]>
1 parent 10ae1db commit 3bd9758

File tree

5 files changed

+377
-13
lines changed

5 files changed

+377
-13
lines changed

pkg/controller/endpoint/endpoints_controller.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@ func NewEndpointController(ctx context.Context, podInformer coreinformers.PodInf
110110
e.endpointsLister = endpointsInformer.Lister()
111111
e.endpointsSynced = endpointsInformer.Informer().HasSynced
112112

113+
e.staleEndpointsTracker = newStaleEndpointsTracker()
113114
e.triggerTimeTracker = endpointsliceutil.NewTriggerTimeTracker()
114115
e.eventBroadcaster = broadcaster
115116
e.eventRecorder = recorder
@@ -145,6 +146,8 @@ type Controller struct {
145146
// endpointsSynced returns true if the endpoints shared informer has been synced at least once.
146147
// Added as a member to the struct to allow injection for testing.
147148
endpointsSynced cache.InformerSynced
149+
// staleEndpointsTracker can help determine if a cached Endpoints is out of date.
150+
staleEndpointsTracker *staleEndpointsTracker
148151

149152
// Services that need to be updated. A channel is inappropriate here,
150153
// because it allows services with lots of pods to be serviced much
@@ -384,6 +387,7 @@ func (e *Controller) syncService(ctx context.Context, key string) error {
384387
return err
385388
}
386389
e.triggerTimeTracker.DeleteService(namespace, name)
390+
e.staleEndpointsTracker.Delete(namespace, name)
387391
return nil
388392
}
389393

@@ -473,6 +477,8 @@ func (e *Controller) syncService(ctx context.Context, key string) error {
473477
Labels: service.Labels,
474478
},
475479
}
480+
} else if e.staleEndpointsTracker.IsStale(currentEndpoints) {
481+
return fmt.Errorf("endpoints informer cache is out of date, resource version %s already processed for endpoints %s", currentEndpoints.ResourceVersion, key)
476482
}
477483

478484
createEndpoints := len(currentEndpoints.ResourceVersion) == 0
@@ -555,6 +561,12 @@ func (e *Controller) syncService(ctx context.Context, key string) error {
555561

556562
return err
557563
}
564+
// If the current endpoints is updated we track the old resource version, so
565+
// if we obtain this resource version again from the lister we know is outdated
566+
// and we need to retry later to wait for the informer cache to be up-to-date.
567+
if !createEndpoints {
568+
e.staleEndpointsTracker.Stale(currentEndpoints)
569+
}
558570
return nil
559571
}
560572

pkg/controller/endpoint/endpoints_controller_test.go

Lines changed: 104 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -160,10 +160,10 @@ func makeTestServer(t *testing.T, namespace string) (*httptest.Server, *utiltest
160160
return httptest.NewServer(mux), &fakeEndpointsHandler
161161
}
162162

163-
// makeBlockingEndpointDeleteTestServer will signal the blockNextAction channel on endpoint "POST" & "DELETE" requests. All
164-
// block endpoint "DELETE" requestsi will wait on a blockDelete signal to delete endpoint. If controller is nil, a error will
165-
// be sent in the response.
166-
func makeBlockingEndpointDeleteTestServer(t *testing.T, controller *endpointController, endpoint *v1.Endpoints, blockDelete, blockNextAction chan struct{}, namespace string) *httptest.Server {
163+
// makeBlockingEndpointTestServer will signal the blockNextAction channel on endpoint "POST", "PUT", and "DELETE"
164+
// requests. "POST" and "PUT" requests will wait on a blockUpdate signal if provided, while "DELETE" requests will wait
165+
// on a blockDelete signal if provided. If controller is nil, an error will be sent in the response.
166+
func makeBlockingEndpointTestServer(t *testing.T, controller *endpointController, endpoint *v1.Endpoints, blockUpdate, blockDelete, blockNextAction chan struct{}, namespace string) *httptest.Server {
167167

168168
handlerFunc := func(res http.ResponseWriter, req *http.Request) {
169169
if controller == nil {
@@ -172,23 +172,37 @@ func makeBlockingEndpointDeleteTestServer(t *testing.T, controller *endpointCont
172172
return
173173
}
174174

175-
if req.Method == "POST" {
176-
controller.endpointsStore.Add(endpoint)
175+
if req.Method == "POST" || req.Method == "PUT" {
176+
if blockUpdate != nil {
177+
go func() {
178+
// Delay the update of endpoints to make endpoints cache out of sync
179+
<-blockUpdate
180+
_ = controller.endpointsStore.Add(endpoint)
181+
}()
182+
} else {
183+
_ = controller.endpointsStore.Add(endpoint)
184+
}
177185
blockNextAction <- struct{}{}
178186
}
179187

180188
if req.Method == "DELETE" {
181-
go func() {
182-
// Delay the deletion of endoints to make endpoint cache out of sync
183-
<-blockDelete
184-
controller.endpointsStore.Delete(endpoint)
189+
if blockDelete != nil {
190+
go func() {
191+
// Delay the deletion of endpoints to make endpoints cache out of sync
192+
<-blockDelete
193+
_ = controller.endpointsStore.Delete(endpoint)
194+
controller.onEndpointsDelete(endpoint)
195+
}()
196+
} else {
197+
_ = controller.endpointsStore.Delete(endpoint)
185198
controller.onEndpointsDelete(endpoint)
186-
}()
199+
}
187200
blockNextAction <- struct{}{}
188201
}
189202

203+
res.Header().Set("Content-Type", "application/json")
190204
res.WriteHeader(http.StatusOK)
191-
res.Write([]byte(runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{})))
205+
_, _ = res.Write([]byte(runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), endpoint)))
192206
}
193207

194208
mux := http.NewServeMux()
@@ -2378,7 +2392,7 @@ func TestMultipleServiceChanges(t *testing.T) {
23782392
blockDelete := make(chan struct{})
23792393
blockNextAction := make(chan struct{})
23802394
stopChan := make(chan struct{})
2381-
testServer := makeBlockingEndpointDeleteTestServer(t, controller, endpoint, blockDelete, blockNextAction, ns)
2395+
testServer := makeBlockingEndpointTestServer(t, controller, endpoint, nil, blockDelete, blockNextAction, ns)
23822396
defer testServer.Close()
23832397

23842398
tCtx := ktesting.Init(t)
@@ -2423,6 +2437,83 @@ func TestMultipleServiceChanges(t *testing.T) {
24232437
close(stopChan)
24242438
}
24252439

2440+
// TestMultiplePodChanges tests that endpoints that are not updated because of an out of sync endpoints cache are
2441+
// eventually resynced after multiple Pod changes.
2442+
func TestMultiplePodChanges(t *testing.T) {
2443+
ns := metav1.NamespaceDefault
2444+
2445+
readyEndpoints := &v1.Endpoints{
2446+
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns, ResourceVersion: "1"},
2447+
Subsets: []v1.EndpointSubset{{
2448+
Addresses: []v1.EndpointAddress{
2449+
{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}},
2450+
},
2451+
Ports: []v1.EndpointPort{{Port: 8080, Protocol: v1.ProtocolTCP}},
2452+
}},
2453+
}
2454+
notReadyEndpoints := &v1.Endpoints{
2455+
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns, ResourceVersion: "2"},
2456+
Subsets: []v1.EndpointSubset{{
2457+
NotReadyAddresses: []v1.EndpointAddress{
2458+
{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}},
2459+
},
2460+
Ports: []v1.EndpointPort{{Port: 8080, Protocol: v1.ProtocolTCP}},
2461+
}},
2462+
}
2463+
2464+
controller := &endpointController{}
2465+
blockUpdate := make(chan struct{})
2466+
blockNextAction := make(chan struct{})
2467+
stopChan := make(chan struct{})
2468+
testServer := makeBlockingEndpointTestServer(t, controller, notReadyEndpoints, blockUpdate, nil, blockNextAction, ns)
2469+
defer testServer.Close()
2470+
2471+
tCtx := ktesting.Init(t)
2472+
*controller = *newController(tCtx, testServer.URL, 0*time.Second)
2473+
pod := testPod(ns, 0, 1, true, ipv4only)
2474+
_ = controller.podStore.Add(pod)
2475+
_ = controller.endpointsStore.Add(readyEndpoints)
2476+
_ = controller.serviceStore.Add(&v1.Service{
2477+
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
2478+
Spec: v1.ServiceSpec{
2479+
Selector: map[string]string{"foo": "bar"},
2480+
ClusterIP: "10.0.0.1",
2481+
Ports: []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt32(8080)}},
2482+
},
2483+
})
2484+
2485+
go func() { controller.Run(tCtx, 1) }()
2486+
2487+
// Rapidly update the Pod: Ready -> NotReady -> Ready.
2488+
pod2 := pod.DeepCopy()
2489+
pod2.ResourceVersion = "2"
2490+
pod2.Status.Conditions[0].Status = v1.ConditionFalse
2491+
_ = controller.podStore.Update(pod2)
2492+
controller.updatePod(pod, pod2)
2493+
// blockNextAction should eventually unblock once server gets endpoints request.
2494+
waitForChanReceive(t, 1*time.Second, blockNextAction, "Pod Update should have caused a request to be sent to the test server")
2495+
// The endpoints update hasn't been applied to the cache yet.
2496+
pod3 := pod.DeepCopy()
2497+
pod3.ResourceVersion = "3"
2498+
pod3.Status.Conditions[0].Status = v1.ConditionTrue
2499+
_ = controller.podStore.Update(pod3)
2500+
controller.updatePod(pod2, pod3)
2501+
// It shouldn't get endpoints request as the endpoints in the cache is out-of-date.
2502+
timer := time.NewTimer(100 * time.Millisecond)
2503+
select {
2504+
case <-timer.C:
2505+
case <-blockNextAction:
2506+
t.Errorf("Pod Update shouldn't have caused a request to be sent to the test server")
2507+
}
2508+
2509+
// Applying the endpoints update to the cache should cause test server to update endpoints.
2510+
close(blockUpdate)
2511+
waitForChanReceive(t, 1*time.Second, blockNextAction, "Endpoints should have been updated")
2512+
2513+
close(blockNextAction)
2514+
close(stopChan)
2515+
}
2516+
24262517
func TestSyncServiceAddresses(t *testing.T) {
24272518
makeService := func(tolerateUnready bool) *v1.Service {
24282519
return &v1.Service{
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
Copyright 2024 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package endpoint
18+
19+
import (
20+
"sync"
21+
22+
v1 "k8s.io/api/core/v1"
23+
"k8s.io/apimachinery/pkg/types"
24+
)
25+
26+
// staleEndpointsTracker tracks Endpoints and their stale resource versions to
27+
// help determine if an Endpoints is stale.
28+
type staleEndpointsTracker struct {
29+
// lock protects staleResourceVersionByEndpoints.
30+
lock sync.RWMutex
31+
// staleResourceVersionByEndpoints tracks the stale resource version of Endpoints.
32+
staleResourceVersionByEndpoints map[types.NamespacedName]string
33+
}
34+
35+
func newStaleEndpointsTracker() *staleEndpointsTracker {
36+
return &staleEndpointsTracker{
37+
staleResourceVersionByEndpoints: map[types.NamespacedName]string{},
38+
}
39+
}
40+
41+
func (t *staleEndpointsTracker) Stale(endpoints *v1.Endpoints) {
42+
t.lock.Lock()
43+
defer t.lock.Unlock()
44+
nn := types.NamespacedName{Name: endpoints.Name, Namespace: endpoints.Namespace}
45+
t.staleResourceVersionByEndpoints[nn] = endpoints.ResourceVersion
46+
}
47+
48+
func (t *staleEndpointsTracker) IsStale(endpoints *v1.Endpoints) bool {
49+
t.lock.RLock()
50+
defer t.lock.RUnlock()
51+
nn := types.NamespacedName{Name: endpoints.Name, Namespace: endpoints.Namespace}
52+
staleResourceVersion, exists := t.staleResourceVersionByEndpoints[nn]
53+
if exists && staleResourceVersion == endpoints.ResourceVersion {
54+
return true
55+
}
56+
return false
57+
}
58+
59+
func (t *staleEndpointsTracker) Delete(namespace, name string) {
60+
t.lock.Lock()
61+
defer t.lock.Unlock()
62+
nn := types.NamespacedName{Namespace: namespace, Name: name}
63+
delete(t.staleResourceVersionByEndpoints, nn)
64+
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
Copyright 2024 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package endpoint
18+
19+
import (
20+
"testing"
21+
22+
"github.com/stretchr/testify/assert"
23+
24+
v1 "k8s.io/api/core/v1"
25+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
26+
)
27+
28+
func TestStaleEndpointsTracker(t *testing.T) {
29+
ns := metav1.NamespaceDefault
30+
tracker := newStaleEndpointsTracker()
31+
32+
endpoints := &v1.Endpoints{
33+
ObjectMeta: metav1.ObjectMeta{
34+
Name: "foo",
35+
Namespace: ns,
36+
ResourceVersion: "1",
37+
},
38+
Subsets: []v1.EndpointSubset{{
39+
Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
40+
Ports: []v1.EndpointPort{{Port: 1000}},
41+
}},
42+
}
43+
44+
assert.False(t, tracker.IsStale(endpoints), "IsStale should return false before the endpoint is staled")
45+
46+
tracker.Stale(endpoints)
47+
assert.True(t, tracker.IsStale(endpoints), "IsStale should return true after the endpoint is staled")
48+
49+
endpoints.ResourceVersion = "2"
50+
assert.False(t, tracker.IsStale(endpoints), "IsStale should return false after the endpoint is updated")
51+
52+
tracker.Delete(endpoints.Namespace, endpoints.Name)
53+
assert.Empty(t, tracker.staleResourceVersionByEndpoints)
54+
}

0 commit comments

Comments
 (0)