@@ -43,6 +43,7 @@ import (
43
43
"k8s.io/apimachinery/pkg/watch"
44
44
example "k8s.io/apiserver/pkg/apis/example"
45
45
"k8s.io/apiserver/pkg/endpoints/handlers"
46
+ "k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
46
47
apitesting "k8s.io/apiserver/pkg/endpoints/testing"
47
48
"k8s.io/apiserver/pkg/registry/rest"
48
49
"k8s.io/client-go/dynamic"
@@ -565,6 +566,21 @@ func (t *fakeTimeoutFactory) TimeoutCh() (<-chan time.Time, func() bool) {
565
566
}
566
567
}
567
568
569
+ // serveWatch will serve a watch response according to the watcher and watchServer.
570
+ // Before watchServer.ServeHTTP, an error may occur like k8s.io/apiserver/pkg/endpoints/handlers/watch.go#serveWatch does.
571
+ func serveWatch (watcher watch.Interface , watchServer * handlers.WatchServer , preServeErr error ) http.HandlerFunc {
572
+ return func (w http.ResponseWriter , req * http.Request ) {
573
+ defer watcher .Stop ()
574
+
575
+ if preServeErr != nil {
576
+ responsewriters .ErrorNegotiated (preServeErr , watchServer .Scope .Serializer , watchServer .Scope .Kind .GroupVersion (), w , req )
577
+ return
578
+ }
579
+
580
+ watchServer .ServeHTTP (w , req )
581
+ }
582
+ }
583
+
568
584
func TestWatchHTTPErrors (t * testing.T ) {
569
585
watcher := watch .NewFake ()
570
586
timeoutCh := make (chan time.Time )
@@ -590,9 +606,7 @@ func TestWatchHTTPErrors(t *testing.T) {
590
606
TimeoutFactory : & fakeTimeoutFactory {timeoutCh , done },
591
607
}
592
608
593
- s := httptest .NewServer (http .HandlerFunc (func (w http.ResponseWriter , req * http.Request ) {
594
- watchServer .ServeHTTP (w , req )
595
- }))
609
+ s := httptest .NewServer (serveWatch (watcher , watchServer , nil ))
596
610
defer s .Close ()
597
611
598
612
// Setup a client
@@ -629,6 +643,68 @@ func TestWatchHTTPErrors(t *testing.T) {
629
643
}
630
644
}
631
645
646
+ func TestWatchHTTPErrorsBeforeServe (t * testing.T ) {
647
+ watcher := watch .NewFake ()
648
+ timeoutCh := make (chan time.Time )
649
+ done := make (chan struct {})
650
+
651
+ info , ok := runtime .SerializerInfoForMediaType (codecs .SupportedMediaTypes (), runtime .ContentTypeJSON )
652
+ if ! ok || info .StreamSerializer == nil {
653
+ t .Fatal (info )
654
+ }
655
+ serializer := info .StreamSerializer
656
+
657
+ // Setup a new watchserver
658
+ watchServer := & handlers.WatchServer {
659
+ Scope : & handlers.RequestScope {
660
+ Serializer : runtime .NewSimpleNegotiatedSerializer (info ),
661
+ Kind : testGroupVersion .WithKind ("test" ),
662
+ },
663
+ Watching : watcher ,
664
+
665
+ MediaType : "testcase/json" ,
666
+ Framer : serializer .Framer ,
667
+ Encoder : newCodec ,
668
+ EmbeddedEncoder : newCodec ,
669
+
670
+ Fixup : func (obj runtime.Object ) runtime.Object { return obj },
671
+ TimeoutFactory : & fakeTimeoutFactory {timeoutCh , done },
672
+ }
673
+
674
+ errStatus := errors .NewInternalError (fmt .Errorf ("we got an error" ))
675
+
676
+ s := httptest .NewServer (serveWatch (watcher , watchServer , errStatus ))
677
+ defer s .Close ()
678
+
679
+ // Setup a client
680
+ dest , _ := url .Parse (s .URL )
681
+ dest .Path = "/" + prefix + "/" + newGroupVersion .Group + "/" + newGroupVersion .Version + "/simple"
682
+ dest .RawQuery = "watch=true"
683
+
684
+ req , _ := http .NewRequest ("GET" , dest .String (), nil )
685
+ client := http.Client {}
686
+ resp , err := client .Do (req )
687
+ if err != nil {
688
+ t .Fatalf ("Unexpected error: %v" , err )
689
+ }
690
+
691
+ // We had already got an error before watch serve started
692
+ decoder := json .NewDecoder (resp .Body )
693
+ var status * metav1.Status
694
+ err = decoder .Decode (& status )
695
+ if err != nil {
696
+ t .Fatalf ("Unexpected error: %v" , err )
697
+ }
698
+ if status .Kind != "Status" || status .APIVersion != "v1" || status .Code != 500 || status .Status != "Failure" || ! strings .Contains (status .Message , "we got an error" ) {
699
+ t .Fatalf ("error: %#v" , status )
700
+ }
701
+
702
+ // check for leaks
703
+ if ! watcher .IsStopped () {
704
+ t .Errorf ("Leaked watcher goruntine after request done" )
705
+ }
706
+ }
707
+
632
708
func TestWatchHTTPDynamicClientErrors (t * testing.T ) {
633
709
watcher := watch .NewFake ()
634
710
timeoutCh := make (chan time.Time )
@@ -654,9 +730,7 @@ func TestWatchHTTPDynamicClientErrors(t *testing.T) {
654
730
TimeoutFactory : & fakeTimeoutFactory {timeoutCh , done },
655
731
}
656
732
657
- s := httptest .NewServer (http .HandlerFunc (func (w http.ResponseWriter , req * http.Request ) {
658
- watchServer .ServeHTTP (w , req )
659
- }))
733
+ s := httptest .NewServer (serveWatch (watcher , watchServer , nil ))
660
734
defer s .Close ()
661
735
defer s .CloseClientConnections ()
662
736
@@ -699,9 +773,7 @@ func TestWatchHTTPTimeout(t *testing.T) {
699
773
TimeoutFactory : & fakeTimeoutFactory {timeoutCh , done },
700
774
}
701
775
702
- s := httptest .NewServer (http .HandlerFunc (func (w http.ResponseWriter , req * http.Request ) {
703
- watchServer .ServeHTTP (w , req )
704
- }))
776
+ s := httptest .NewServer (serveWatch (watcher , watchServer , nil ))
705
777
defer s .Close ()
706
778
707
779
// Setup a client
@@ -729,7 +801,7 @@ func TestWatchHTTPTimeout(t *testing.T) {
729
801
close (timeoutCh )
730
802
select {
731
803
case <- done :
732
- if ! watcher .Stopped {
804
+ if ! watcher .IsStopped () {
733
805
t .Errorf ("Leaked watch on timeout" )
734
806
}
735
807
case <- time .After (wait .ForeverTestTimeout ):
0 commit comments