Skip to content

Commit 23a3353

Browse files
Requeue service after endpoint deletion
- ensure endpoints that have been deleted and are desired are recreated despite a possibly out of date endpoint cache
1 parent c94242a commit 23a3353

File tree

2 files changed

+151
-0
lines changed

2 files changed

+151
-0
lines changed

pkg/controller/endpoint/endpoints_controller.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,9 @@ func NewEndpointController(podInformer coreinformers.PodInformer, serviceInforme
110110
e.podLister = podInformer.Lister()
111111
e.podsSynced = podInformer.Informer().HasSynced
112112

113+
endpointsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
114+
DeleteFunc: e.onEndpointsDelete,
115+
})
113116
e.endpointsLister = endpointsInformer.Lister()
114117
e.endpointsSynced = endpointsInformer.Informer().HasSynced
115118

@@ -287,6 +290,15 @@ func (e *EndpointController) onServiceDelete(obj interface{}) {
287290
e.queue.Add(key)
288291
}
289292

293+
func (e *EndpointController) onEndpointsDelete(obj interface{}) {
294+
key, err := controller.KeyFunc(obj)
295+
if err != nil {
296+
utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
297+
return
298+
}
299+
e.queue.Add(key)
300+
}
301+
290302
// worker runs a worker thread that just dequeues items, processes them, and
291303
// marks them done. You may run as many of these in parallel as you wish; the
292304
// workqueue guarantees that they will not end up processing the same service

pkg/controller/endpoint/endpoints_controller_test.go

Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,49 @@ func makeTestServer(t *testing.T, namespace string) (*httptest.Server, *utiltest
159159
return httptest.NewServer(mux), &fakeEndpointsHandler
160160
}
161161

162+
// makeBlockingEndpointDeleteTestServer will signal the blockNextAction channel on endpoint "POST" & "DELETE" requests. All
163+
// block endpoint "DELETE" requestsi will wait on a blockDelete signal to delete endpoint. If controller is nil, a error will
164+
// be sent in the response.
165+
func makeBlockingEndpointDeleteTestServer(t *testing.T, controller *endpointController, endpoint *v1.Endpoints, blockDelete, blockNextAction chan struct{}, namespace string) *httptest.Server {
166+
167+
handlerFunc := func(res http.ResponseWriter, req *http.Request) {
168+
if controller == nil {
169+
res.WriteHeader(http.StatusInternalServerError)
170+
res.Write([]byte("controller has not been set yet"))
171+
return
172+
}
173+
174+
if req.Method == "POST" {
175+
controller.endpointsStore.Add(endpoint)
176+
blockNextAction <- struct{}{}
177+
}
178+
179+
if req.Method == "DELETE" {
180+
go func() {
181+
// Delay the deletion of endoints to make endpoint cache out of sync
182+
<-blockDelete
183+
controller.endpointsStore.Delete(endpoint)
184+
controller.onEndpointsDelete(endpoint)
185+
}()
186+
blockNextAction <- struct{}{}
187+
}
188+
189+
res.WriteHeader(http.StatusOK)
190+
res.Write([]byte(runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{})))
191+
}
192+
193+
mux := http.NewServeMux()
194+
mux.HandleFunc("/api/v1/namespaces/"+namespace+"/endpoints", handlerFunc)
195+
mux.HandleFunc("/api/v1/namespaces/"+namespace+"/endpoints/", handlerFunc)
196+
mux.HandleFunc("/api/v1/namespaces/"+namespace+"/events", func(res http.ResponseWriter, req *http.Request) {})
197+
mux.HandleFunc("/", func(res http.ResponseWriter, req *http.Request) {
198+
t.Errorf("unexpected request: %v", req.RequestURI)
199+
http.Error(res, "", http.StatusNotFound)
200+
})
201+
return httptest.NewServer(mux)
202+
203+
}
204+
162205
type endpointController struct {
163206
*EndpointController
164207
podStore cache.Store
@@ -1954,9 +1997,105 @@ func TestEndpointPortFromServicePort(t *testing.T) {
19541997
}
19551998
}
19561999

2000+
// TestMultipleServiceChanges tests that endpoints that are not created because of an out of sync endpoints cache are eventually recreated
2001+
// A service will be created. After the endpoints exist, the service will be deleted and the endpoints will not be deleted from the cache immediately.
2002+
// After the service is recreated, the endpoints will be deleted replicating an out of sync cache. Expect that eventually the endpoints will be recreated.
2003+
func TestMultipleServiceChanges(t *testing.T) {
2004+
ns := metav1.NamespaceDefault
2005+
expectedSubsets := []v1.EndpointSubset{{
2006+
Addresses: []v1.EndpointAddress{
2007+
{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}},
2008+
},
2009+
}}
2010+
endpoint := &v1.Endpoints{
2011+
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns, ResourceVersion: "1"},
2012+
Subsets: expectedSubsets,
2013+
}
2014+
2015+
controller := &endpointController{}
2016+
blockDelete := make(chan struct{})
2017+
blockNextAction := make(chan struct{})
2018+
stopChan := make(chan struct{})
2019+
testServer := makeBlockingEndpointDeleteTestServer(t, controller, endpoint, blockDelete, blockNextAction, ns)
2020+
defer testServer.Close()
2021+
2022+
*controller = *newController(testServer.URL, 0*time.Second)
2023+
addPods(controller.podStore, ns, 1, 1, 0, ipv4only)
2024+
2025+
go func() { controller.Run(1, stopChan) }()
2026+
2027+
svc := &v1.Service{
2028+
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
2029+
Spec: v1.ServiceSpec{
2030+
Selector: map[string]string{"foo": "bar"},
2031+
ClusterIP: "None",
2032+
Ports: nil,
2033+
},
2034+
}
2035+
2036+
controller.serviceStore.Add(svc)
2037+
controller.onServiceUpdate(svc)
2038+
// blockNextAction should eventually unblock once server gets endpoint request.
2039+
waitForChanReceive(t, 1*time.Second, blockNextAction, "Service Add should have caused a request to be sent to the test server")
2040+
2041+
controller.serviceStore.Delete(svc)
2042+
controller.onServiceDelete(svc)
2043+
waitForChanReceive(t, 1*time.Second, blockNextAction, "Service Delete should have caused a request to be sent to the test server")
2044+
2045+
// If endpoints cache has not updated before service update is registered
2046+
// Services add will not trigger a Create endpoint request.
2047+
controller.serviceStore.Add(svc)
2048+
controller.onServiceUpdate(svc)
2049+
2050+
// Ensure the work queue has been processed by looping for up to a second to prevent flakes.
2051+
wait.PollImmediate(50*time.Millisecond, 1*time.Second, func() (bool, error) {
2052+
return controller.queue.Len() == 0, nil
2053+
})
2054+
2055+
// Cause test server to delete endpoints
2056+
close(blockDelete)
2057+
waitForChanReceive(t, 1*time.Second, blockNextAction, "Endpoint should have been recreated")
2058+
2059+
close(blockNextAction)
2060+
close(stopChan)
2061+
}
2062+
2063+
func TestEndpointsDeletionEvents(t *testing.T) {
2064+
ns := metav1.NamespaceDefault
2065+
testServer, _ := makeTestServer(t, ns)
2066+
defer testServer.Close()
2067+
controller := newController(testServer.URL, 0)
2068+
store := controller.endpointsStore
2069+
ep1 := &v1.Endpoints{
2070+
ObjectMeta: metav1.ObjectMeta{
2071+
Name: "foo",
2072+
Namespace: ns,
2073+
ResourceVersion: "rv1",
2074+
},
2075+
}
2076+
2077+
// Test Unexpected and Expected Deletes
2078+
store.Delete(ep1)
2079+
controller.onEndpointsDelete(ep1)
2080+
2081+
if controller.queue.Len() != 1 {
2082+
t.Errorf("Expected one service to be in the queue, found %d", controller.queue.Len())
2083+
}
2084+
}
2085+
19572086
func stringVal(str *string) string {
19582087
if str == nil {
19592088
return "nil"
19602089
}
19612090
return *str
19622091
}
2092+
2093+
// waitForChanReceive blocks up to the timeout waiting for the receivingChan to receive
2094+
func waitForChanReceive(t *testing.T, timeout time.Duration, receivingChan chan struct{}, errorMsg string) {
2095+
timer := time.NewTimer(timeout)
2096+
select {
2097+
case <-timer.C:
2098+
t.Errorf(errorMsg)
2099+
case <-receivingChan:
2100+
}
2101+
}

0 commit comments

Comments
 (0)