@@ -159,6 +159,49 @@ func makeTestServer(t *testing.T, namespace string) (*httptest.Server, *utiltest
159
159
return httptest .NewServer (mux ), & fakeEndpointsHandler
160
160
}
161
161
162
+ // makeBlockingEndpointDeleteTestServer will signal the blockNextAction channel on endpoint "POST" & "DELETE" requests. All
163
+ // block endpoint "DELETE" requestsi will wait on a blockDelete signal to delete endpoint. If controller is nil, a error will
164
+ // be sent in the response.
165
+ func makeBlockingEndpointDeleteTestServer (t * testing.T , controller * endpointController , endpoint * v1.Endpoints , blockDelete , blockNextAction chan struct {}, namespace string ) * httptest.Server {
166
+
167
+ handlerFunc := func (res http.ResponseWriter , req * http.Request ) {
168
+ if controller == nil {
169
+ res .WriteHeader (http .StatusInternalServerError )
170
+ res .Write ([]byte ("controller has not been set yet" ))
171
+ return
172
+ }
173
+
174
+ if req .Method == "POST" {
175
+ controller .endpointsStore .Add (endpoint )
176
+ blockNextAction <- struct {}{}
177
+ }
178
+
179
+ if req .Method == "DELETE" {
180
+ go func () {
181
+ // Delay the deletion of endoints to make endpoint cache out of sync
182
+ <- blockDelete
183
+ controller .endpointsStore .Delete (endpoint )
184
+ controller .onEndpointsDelete (endpoint )
185
+ }()
186
+ blockNextAction <- struct {}{}
187
+ }
188
+
189
+ res .WriteHeader (http .StatusOK )
190
+ res .Write ([]byte (runtime .EncodeOrDie (clientscheme .Codecs .LegacyCodec (v1 .SchemeGroupVersion ), & v1.Endpoints {})))
191
+ }
192
+
193
+ mux := http .NewServeMux ()
194
+ mux .HandleFunc ("/api/v1/namespaces/" + namespace + "/endpoints" , handlerFunc )
195
+ mux .HandleFunc ("/api/v1/namespaces/" + namespace + "/endpoints/" , handlerFunc )
196
+ mux .HandleFunc ("/api/v1/namespaces/" + namespace + "/events" , func (res http.ResponseWriter , req * http.Request ) {})
197
+ mux .HandleFunc ("/" , func (res http.ResponseWriter , req * http.Request ) {
198
+ t .Errorf ("unexpected request: %v" , req .RequestURI )
199
+ http .Error (res , "" , http .StatusNotFound )
200
+ })
201
+ return httptest .NewServer (mux )
202
+
203
+ }
204
+
162
205
type endpointController struct {
163
206
* EndpointController
164
207
podStore cache.Store
@@ -1954,9 +1997,105 @@ func TestEndpointPortFromServicePort(t *testing.T) {
1954
1997
}
1955
1998
}
1956
1999
2000
+ // TestMultipleServiceChanges tests that endpoints that are not created because of an out of sync endpoints cache are eventually recreated
2001
+ // A service will be created. After the endpoints exist, the service will be deleted and the endpoints will not be deleted from the cache immediately.
2002
+ // After the service is recreated, the endpoints will be deleted replicating an out of sync cache. Expect that eventually the endpoints will be recreated.
2003
+ func TestMultipleServiceChanges (t * testing.T ) {
2004
+ ns := metav1 .NamespaceDefault
2005
+ expectedSubsets := []v1.EndpointSubset {{
2006
+ Addresses : []v1.EndpointAddress {
2007
+ {IP : "1.2.3.4" , NodeName : & emptyNodeName , TargetRef : & v1.ObjectReference {Kind : "Pod" , Name : "pod0" , Namespace : ns }},
2008
+ },
2009
+ }}
2010
+ endpoint := & v1.Endpoints {
2011
+ ObjectMeta : metav1.ObjectMeta {Name : "foo" , Namespace : ns , ResourceVersion : "1" },
2012
+ Subsets : expectedSubsets ,
2013
+ }
2014
+
2015
+ controller := & endpointController {}
2016
+ blockDelete := make (chan struct {})
2017
+ blockNextAction := make (chan struct {})
2018
+ stopChan := make (chan struct {})
2019
+ testServer := makeBlockingEndpointDeleteTestServer (t , controller , endpoint , blockDelete , blockNextAction , ns )
2020
+ defer testServer .Close ()
2021
+
2022
+ * controller = * newController (testServer .URL , 0 * time .Second )
2023
+ addPods (controller .podStore , ns , 1 , 1 , 0 , ipv4only )
2024
+
2025
+ go func () { controller .Run (1 , stopChan ) }()
2026
+
2027
+ svc := & v1.Service {
2028
+ ObjectMeta : metav1.ObjectMeta {Name : "foo" , Namespace : ns },
2029
+ Spec : v1.ServiceSpec {
2030
+ Selector : map [string ]string {"foo" : "bar" },
2031
+ ClusterIP : "None" ,
2032
+ Ports : nil ,
2033
+ },
2034
+ }
2035
+
2036
+ controller .serviceStore .Add (svc )
2037
+ controller .onServiceUpdate (svc )
2038
+ // blockNextAction should eventually unblock once server gets endpoint request.
2039
+ waitForChanReceive (t , 1 * time .Second , blockNextAction , "Service Add should have caused a request to be sent to the test server" )
2040
+
2041
+ controller .serviceStore .Delete (svc )
2042
+ controller .onServiceDelete (svc )
2043
+ waitForChanReceive (t , 1 * time .Second , blockNextAction , "Service Delete should have caused a request to be sent to the test server" )
2044
+
2045
+ // If endpoints cache has not updated before service update is registered
2046
+ // Services add will not trigger a Create endpoint request.
2047
+ controller .serviceStore .Add (svc )
2048
+ controller .onServiceUpdate (svc )
2049
+
2050
+ // Ensure the work queue has been processed by looping for up to a second to prevent flakes.
2051
+ wait .PollImmediate (50 * time .Millisecond , 1 * time .Second , func () (bool , error ) {
2052
+ return controller .queue .Len () == 0 , nil
2053
+ })
2054
+
2055
+ // Cause test server to delete endpoints
2056
+ close (blockDelete )
2057
+ waitForChanReceive (t , 1 * time .Second , blockNextAction , "Endpoint should have been recreated" )
2058
+
2059
+ close (blockNextAction )
2060
+ close (stopChan )
2061
+ }
2062
+
2063
+ func TestEndpointsDeletionEvents (t * testing.T ) {
2064
+ ns := metav1 .NamespaceDefault
2065
+ testServer , _ := makeTestServer (t , ns )
2066
+ defer testServer .Close ()
2067
+ controller := newController (testServer .URL , 0 )
2068
+ store := controller .endpointsStore
2069
+ ep1 := & v1.Endpoints {
2070
+ ObjectMeta : metav1.ObjectMeta {
2071
+ Name : "foo" ,
2072
+ Namespace : ns ,
2073
+ ResourceVersion : "rv1" ,
2074
+ },
2075
+ }
2076
+
2077
+ // Test Unexpected and Expected Deletes
2078
+ store .Delete (ep1 )
2079
+ controller .onEndpointsDelete (ep1 )
2080
+
2081
+ if controller .queue .Len () != 1 {
2082
+ t .Errorf ("Expected one service to be in the queue, found %d" , controller .queue .Len ())
2083
+ }
2084
+ }
2085
+
1957
2086
func stringVal (str * string ) string {
1958
2087
if str == nil {
1959
2088
return "nil"
1960
2089
}
1961
2090
return * str
1962
2091
}
2092
+
2093
+ // waitForChanReceive blocks up to the timeout waiting for the receivingChan to receive
2094
+ func waitForChanReceive (t * testing.T , timeout time.Duration , receivingChan chan struct {}, errorMsg string ) {
2095
+ timer := time .NewTimer (timeout )
2096
+ select {
2097
+ case <- timer .C :
2098
+ t .Errorf (errorMsg )
2099
+ case <- receivingChan :
2100
+ }
2101
+ }
0 commit comments