@@ -214,146 +214,79 @@ func TestGet(t *testing.T) {
214214 })
215215
216216 t .Run ("Cancels stream when endpoint update queue overflows" , func (t * testing.T ) {
217- server := makeServer (t )
218-
219- stream := newBlockingGetStream ()
220- defer stream .Cancel ()
221-
222- errCh := make (chan error , 1 )
223- go func () {
224- errCh <- server .Get (& pb.GetDestination {
225- Scheme : "k8s" ,
226- Path : fmt .Sprintf ("%s:%d" , fullyQualifiedName , port ),
227- }, stream )
228- }()
229-
230- stream .WaitForSend (t , 5 * time .Second )
217+ srv := makeServer (t )
231218
232- for i := range updateQueueCapacity + 10 {
233- addEndpointToSlice (t , server ,
234- fmt .Sprintf ("name1-overflow-%d" , i ),
235- fmt .Sprintf ("172.17.0.%d" , i ))
236- time .Sleep (50 * time .Millisecond )
237- }
238-
239- stream .Release ()
240-
241- select {
242- case <- errCh :
243- case <- time .After (5 * time .Second ):
244- t .Fatal ("timed out waiting for Get to return after overflow" )
245- }
246-
247- if got := stream .SendCount (); got == 0 {
248- t .Fatal ("expected at least one update before overflow" )
249- }
219+ runEndpointOverflowTest (t , endpointOverflowScenario {
220+ server : srv ,
221+ servicePath : fmt .Sprintf ("%s:%d" , fullyQualifiedName , port ),
222+ metricLabels : prometheus.Labels {
223+ "service" : fmt .Sprintf ("%s:%d" , fullyQualifiedName , port ),
224+ },
225+ waitMessage : fmt .Sprintf ("endpoint translator overflow for %s:%d" , fullyQualifiedName , port ),
226+ trigger : func (t * testing.T , srv * server , i int ) {
227+ addEndpointToSlice (t , srv , fmt .Sprintf ("name1-overflow-%d" , i ), fmt .Sprintf ("172.17.0.%d" , 201 + i ))
228+ time .Sleep (10 * time .Millisecond )
229+ },
230+ })
250231 })
251232
252233 t .Run ("Cancels stream when federated endpoint update queue overflows" , func (t * testing.T ) {
253- t .Skip ("Known to fail presently; re-enable when fixed" )
254-
255- server , remoteStore := getServerWithRemoteStore (t )
234+ t .Skip ("TODO: fix stream cancelation" )
256235
236+ srv , remoteStore := getServerWithRemoteStore (t )
257237 remoteAPI , ok := remoteStore .Get ("target" )
258238 if ! ok {
259239 t .Fatal ("remote cluster API not found" )
260240 }
261241
262- ctx := context .Background ()
263- svc := & corev1.Service {
264- ObjectMeta : metav1.ObjectMeta {
265- Name : "foo-federated" ,
266- Namespace : "ns" ,
267- Annotations : map [string ]string {
268- pkgk8s .RemoteDiscoveryAnnotation : "foo@target" ,
269- },
242+ runEndpointOverflowTest (t , endpointOverflowScenario {
243+ server : srv ,
244+ servicePath : "foo-federated.ns.svc.mycluster.local:80" ,
245+ metricLabels : prometheus.Labels {
246+ "service" : "ns/foo.ns.svc.cluster.local:80" ,
270247 },
271- Spec : corev1.ServiceSpec {
272- Ports : []corev1.ServicePort {
273- {Port : 80 },
274- },
248+ waitMessage : "federated endpoint translator overflow for ns/foo.ns.svc.cluster.local:80" ,
249+ prepare : func (t * testing.T , srv * server ) {
250+ ctx := context .Background ()
251+ svc := & corev1.Service {
252+ ObjectMeta : metav1.ObjectMeta {
253+ Name : "foo-federated" ,
254+ Namespace : "ns" ,
255+ Annotations : map [string ]string {
256+ pkgk8s .RemoteDiscoveryAnnotation : "foo@target" ,
257+ },
258+ },
259+ Spec : corev1.ServiceSpec {
260+ Ports : []corev1.ServicePort {
261+ {Port : 80 },
262+ },
263+ },
264+ }
265+
266+ if _ , err := srv .k8sAPI .Client .CoreV1 ().Services ("ns" ).Create (ctx , svc , metav1.CreateOptions {}); err != nil {
267+ t .Fatalf ("failed to create federated service: %v" , err )
268+ }
269+
270+ id := watcher.ServiceID {Namespace : "ns" , Name : "foo-federated" }
271+ deadline := time .Now ().Add (5 * time .Second )
272+ for {
273+ srv .federatedServices .RLock ()
274+ _ , found := srv .federatedServices .services [id ]
275+ srv .federatedServices .RUnlock ()
276+ if found {
277+ break
278+ }
279+ if time .Now ().After (deadline ) {
280+ t .Fatalf ("federated service %s/%s not registered" , "ns" , "foo-federated" )
281+ }
282+ time .Sleep (10 * time .Millisecond )
283+ }
284+ },
285+ trigger : func (t * testing.T , _ * server , i int ) {
286+ addEndpointToRemoteSlice (t , remoteAPI , "ns" , "foo" , fmt .Sprintf ("172.17.155.%d" , i + 1 ))
287+ time .Sleep (50 * time .Millisecond )
275288 },
276- }
277-
278- if _ , err := server .k8sAPI .Client .CoreV1 ().Services ("ns" ).Create (ctx , svc , metav1.CreateOptions {}); err != nil {
279- t .Fatalf ("failed to create federated service: %v" , err )
280- }
281-
282- id := watcher.ServiceID {Namespace : "ns" , Name : "foo-federated" }
283- deadline := time .Now ().Add (5 * time .Second )
284- for {
285- server .federatedServices .RLock ()
286- _ , found := server .federatedServices .services [id ]
287- server .federatedServices .RUnlock ()
288- if found {
289- break
290- }
291- if time .Now ().After (deadline ) {
292- t .Fatalf ("federated service %s/%s not registered" , "ns" , "foo-federated" )
293- }
294- time .Sleep (10 * time .Millisecond )
295- }
296-
297- stream := newBlockingGetStream ()
298- defer stream .Cancel ()
299-
300- errCh := make (chan error , 1 )
301- go func () {
302- errCh <- server .Get (& pb.GetDestination {
303- Scheme : "k8s" ,
304- Path : "foo-federated.ns.svc.mycluster.local:80" ,
305- }, stream )
306- }()
307-
308- stream .WaitForSend (t , 5 * time .Second )
309-
310- serviceLabel := "ns/foo.ns.svc.cluster.local:80"
311- metric , err := updatesQueueOverflowCounter .GetMetricWith (prometheus.Labels {
312- "service" : serviceLabel ,
313289 })
314- if err != nil {
315- t .Fatalf ("failed to get overflow counter: %v" , err )
316- }
317- initialOverflow := counterValue (t , metric )
318-
319- for i := 0 ; i < updateQueueCapacity + 10 ; i ++ {
320- addEndpointToRemoteSlice (t , remoteAPI , "ns" , "foo" , fmt .Sprintf ("172.17.155.%d" , i + 1 ))
321- time .Sleep (50 * time .Millisecond )
322- if counterValue (t , metric ) > initialOverflow {
323- break
324- }
325- }
326-
327- overflowDeadline := time .Now ().Add (10 * time .Second )
328- overflowed := false
329- for time .Now ().Before (overflowDeadline ) {
330- if counterValue (t , metric ) > initialOverflow {
331- overflowed = true
332- break
333- }
334- time .Sleep (10 * time .Millisecond )
335- }
336- if ! overflowed {
337- t .Fatalf ("waiting for federated endpoint translator overflow for %s" , serviceLabel )
338- }
339-
340- stream .Release ()
341-
342- select {
343- case err := <- errCh :
344- if err == nil {
345- t .Fatal ("expected Get to be canceled after overflow" )
346- }
347- if ! errors .Is (err , context .Canceled ) && status .Code (err ) != codes .Canceled {
348- t .Fatalf ("expected cancellation error, got %v" , err )
349- }
350- case <- time .After (5 * time .Second ):
351- t .Fatal ("timed out waiting for Get to return after overflow" )
352- }
353-
354- if got := stream .SendCount (); got == 0 {
355- t .Fatal ("expected at least one update before overflow" )
356- }
357290 })
358291
359292 t .Run ("Return endpoint with unknown protocol hint and identity when service name contains skipped inbound port" , func (t * testing.T ) {
@@ -1582,6 +1515,98 @@ func toAddress(path string, port uint32) (*net.TcpAddress, error) {
15821515 }, nil
15831516}
15841517
1518+ type endpointOverflowScenario struct {
1519+ server * server
1520+ servicePath string
1521+ metricLabels prometheus.Labels
1522+ waitMessage string
1523+ prepare func (* testing.T , * server )
1524+ trigger func (* testing.T , * server , int )
1525+ }
1526+
1527+ func runEndpointOverflowTest (t * testing.T , sc endpointOverflowScenario ) {
1528+ t .Helper ()
1529+
1530+ if sc .server == nil {
1531+ t .Fatal ("endpoint overflow scenario requires a server" )
1532+ }
1533+ if sc .trigger == nil {
1534+ t .Fatal ("endpoint overflow scenario requires a trigger function" )
1535+ }
1536+ if sc .waitMessage == "" {
1537+ sc .waitMessage = fmt .Sprintf ("endpoint translator overflow for %s" , sc .servicePath )
1538+ }
1539+
1540+ if sc .prepare != nil {
1541+ sc .prepare (t , sc .server )
1542+ }
1543+
1544+ stream := newBlockingGetStream ()
1545+ defer stream .Release ()
1546+ defer stream .Cancel ()
1547+
1548+ errCh := make (chan error , 1 )
1549+ go func () {
1550+ errCh <- sc .server .Get (& pb.GetDestination {
1551+ Scheme : "k8s" ,
1552+ Path : sc .servicePath ,
1553+ }, stream )
1554+ }()
1555+
1556+ stream .WaitForSend (t , 5 * time .Second )
1557+
1558+ metric , err := updatesQueueOverflowCounter .GetMetricWith (sc .metricLabels )
1559+ if err != nil {
1560+ t .Fatalf ("failed to get overflow counter: %v" , err )
1561+ }
1562+ initialOverflow := counterValue (t , metric )
1563+
1564+ for i := 0 ; i < updateQueueCapacity + 10 ; i ++ {
1565+ sc .trigger (t , sc .server , i )
1566+ if counterValue (t , metric ) > initialOverflow {
1567+ break
1568+ }
1569+ }
1570+
1571+ overflowDeadline := time .Now ().Add (10 * time .Second )
1572+ overflowed := false
1573+ for time .Now ().Before (overflowDeadline ) {
1574+ if counterValue (t , metric ) > initialOverflow {
1575+ overflowed = true
1576+ break
1577+ }
1578+ time .Sleep (10 * time .Millisecond )
1579+ }
1580+ if ! overflowed {
1581+ t .Fatalf ("waiting for %s" , sc .waitMessage )
1582+ }
1583+
1584+ // Wait for Get to finish without unblocking Send so we catch streams that
1585+ // cannot terminate while a Send is blocked.
1586+ select {
1587+ case err := <- errCh :
1588+ if err != nil {
1589+ t .Fatalf ("expected Get to be end after overflow, got %v" , err )
1590+ }
1591+ case <- time .After (5 * time .Second ):
1592+ t .Fatal ("timed out waiting for Get to return after overflow" )
1593+ }
1594+
1595+ stream .Release ()
1596+
1597+ if got := stream .SendCount (); got == 0 {
1598+ t .Fatal ("expected at least one update before overflow" )
1599+ }
1600+
1601+ updates := stream .Updates ()
1602+ if len (updates ) == 0 || updates [0 ].GetAdd () == nil {
1603+ t .Fatalf ("expected initial update to be an Add, got %T" , updates [0 ].GetUpdate ())
1604+ }
1605+ if len (updates ) > 1 && updates [1 ].GetAdd () == nil {
1606+ t .Fatalf ("expected buffered update to be an Add, got %T" , updates [1 ].GetUpdate ())
1607+ }
1608+ }
1609+
15851610type blockingGetStream struct {
15861611 util.MockServerStream
15871612
0 commit comments