@@ -24,6 +24,7 @@ import (
24
24
"reflect"
25
25
"strings"
26
26
"sync"
27
+ "sync/atomic"
27
28
"testing"
28
29
"time"
29
30
@@ -49,6 +50,7 @@ import (
49
50
"k8s.io/apimachinery/pkg/util/json"
50
51
"k8s.io/apimachinery/pkg/util/sets"
51
52
"k8s.io/apimachinery/pkg/util/strategicpatch"
53
+ "k8s.io/apimachinery/pkg/util/wait"
52
54
"k8s.io/client-go/discovery"
53
55
"k8s.io/client-go/informers"
54
56
"k8s.io/client-go/kubernetes"
@@ -60,9 +62,11 @@ import (
60
62
clientgotesting "k8s.io/client-go/testing"
61
63
"k8s.io/client-go/tools/record"
62
64
"k8s.io/client-go/util/workqueue"
65
+ metricsutil "k8s.io/component-base/metrics/testutil"
63
66
"k8s.io/controller-manager/pkg/informerfactory"
64
67
"k8s.io/kubernetes/pkg/api/legacyscheme"
65
68
c "k8s.io/kubernetes/pkg/controller"
69
+ "k8s.io/kubernetes/pkg/controller/garbagecollector/metrics"
66
70
"k8s.io/kubernetes/test/utils/ktesting"
67
71
)
68
72
@@ -846,7 +850,6 @@ func TestGarbageCollectorSync(t *testing.T) {
846
850
PreferredResources : serverResources ,
847
851
Error : nil ,
848
852
Lock : sync.Mutex {},
849
- InterfaceUsedCount : 0 ,
850
853
}
851
854
852
855
testHandler := & fakeActionHandler {
@@ -865,7 +868,24 @@ func TestGarbageCollectorSync(t *testing.T) {
865
868
},
866
869
},
867
870
}
868
- srv , clientConfig := testServerAndClientConfig (testHandler .ServeHTTP )
871
+
872
+ testHandler2 := & fakeActionHandler {
873
+ response : map [string ]FakeResponse {
874
+ "GET" + "/api/v1/secrets" : {
875
+ 200 ,
876
+ []byte ("{}" ),
877
+ },
878
+ },
879
+ }
880
+ var secretSyncOK atomic.Bool
881
+ var alternativeTestHandler = func (response http.ResponseWriter , request * http.Request ) {
882
+ if request .URL .Path == "/api/v1/secrets" && secretSyncOK .Load () {
883
+ testHandler2 .ServeHTTP (response , request )
884
+ return
885
+ }
886
+ testHandler .ServeHTTP (response , request )
887
+ }
888
+ srv , clientConfig := testServerAndClientConfig (alternativeTestHandler )
869
889
defer srv .Close ()
870
890
clientConfig .ContentConfig .NegotiatedSerializer = nil
871
891
client , err := kubernetes .NewForConfig (clientConfig )
@@ -885,7 +905,7 @@ func TestGarbageCollectorSync(t *testing.T) {
885
905
886
906
sharedInformers := informers .NewSharedInformerFactory (client , 0 )
887
907
888
- tCtx := ktesting .Init (t )
908
+ logger , tCtx := ktesting .NewTestContext (t )
889
909
defer tCtx .Cancel ("test has completed" )
890
910
alwaysStarted := make (chan struct {})
891
911
close (alwaysStarted )
@@ -913,30 +933,49 @@ func TestGarbageCollectorSync(t *testing.T) {
913
933
914
934
// Wait until the sync discovers the initial resources
915
935
time .Sleep (1 * time .Second )
936
+
937
+ err = expectSyncNotBlocked (fakeDiscoveryClient )
938
+ if err != nil {
939
+ t .Fatalf ("Expected garbagecollector.Sync to still be running but it is blocked: %v" , err )
940
+ }
916
941
assertMonitors (t , gc , "pods" , "deployments" )
917
942
918
943
// Simulate the discovery client returning an error
919
944
fakeDiscoveryClient .setPreferredResources (nil , fmt .Errorf ("error calling discoveryClient.ServerPreferredResources()" ))
945
+
946
+ // Wait until sync discovers the change
920
947
time .Sleep (1 * time .Second )
948
+ // No monitor changes
921
949
assertMonitors (t , gc , "pods" , "deployments" )
922
950
923
951
// Remove the error from being returned and see if the garbage collector sync is still working
924
952
fakeDiscoveryClient .setPreferredResources (serverResources , nil )
925
- time .Sleep (1 * time .Second )
953
+
954
+ err = expectSyncNotBlocked (fakeDiscoveryClient )
955
+ if err != nil {
956
+ t .Fatalf ("Expected garbagecollector.Sync to still be running but it is blocked: %v" , err )
957
+ }
926
958
assertMonitors (t , gc , "pods" , "deployments" )
927
959
928
960
// Simulate the discovery client returning a resource the restmapper can resolve, but will not sync caches
929
961
fakeDiscoveryClient .setPreferredResources (unsyncableServerResources , nil )
962
+
963
+ // Wait until sync discovers the change
930
964
time .Sleep (1 * time .Second )
931
965
assertMonitors (t , gc , "pods" , "secrets" )
932
966
933
967
// Put the resources back to normal and ensure garbage collector sync recovers
934
968
fakeDiscoveryClient .setPreferredResources (serverResources , nil )
935
- time .Sleep (1 * time .Second )
969
+
970
+ err = expectSyncNotBlocked (fakeDiscoveryClient )
971
+ if err != nil {
972
+ t .Fatalf ("Expected garbagecollector.Sync to still be running but it is blocked: %v" , err )
973
+ }
936
974
assertMonitors (t , gc , "pods" , "deployments" )
937
975
938
976
// Partial discovery failure
939
977
fakeDiscoveryClient .setPreferredResources (unsyncableServerResources , appsV1Error )
978
+ // Wait until sync discovers the change
940
979
time .Sleep (1 * time .Second )
941
980
// Deployments monitor kept
942
981
assertMonitors (t , gc , "pods" , "deployments" , "secrets" )
@@ -945,35 +984,33 @@ func TestGarbageCollectorSync(t *testing.T) {
945
984
fakeDiscoveryClient .setPreferredResources (serverResources , nil )
946
985
// Wait until sync discovers the change
947
986
time .Sleep (1 * time .Second )
987
+ err = expectSyncNotBlocked (fakeDiscoveryClient )
988
+ if err != nil {
989
+ t .Fatalf ("Expected garbagecollector.Sync to still be running but it is blocked: %v" , err )
990
+ }
948
991
// Unsyncable monitor removed
949
992
assertMonitors (t , gc , "pods" , "deployments" )
950
993
951
- // Add fake controller simulate the initial not-synced informer which will be synced at the end.
952
- fc := fakeController {}
953
- gc .dependencyGraphBuilder .monitors [schema.GroupVersionResource {
954
- Version : "v1" ,
955
- Resource : "secrets" ,
956
- }] = & monitor {controller : & fc }
957
- if gc .IsSynced (logger ) {
958
- t .Fatal ("cache from garbage collector should not be synced" )
959
- }
960
-
994
+ // Simulate initial not-synced informer which will be synced at the end.
995
+ metrics .GarbageCollectorResourcesSyncError .Reset ()
961
996
fakeDiscoveryClient .setPreferredResources (unsyncableServerResources , nil )
962
997
time .Sleep (1 * time .Second )
963
998
assertMonitors (t , gc , "pods" , "secrets" )
999
+ if gc .IsSynced (logger ) {
1000
+ t .Fatal ("cache from garbage collector should not be synced" )
1001
+ }
1002
+ val , _ := metricsutil .GetCounterMetricValue (metrics .GarbageCollectorResourcesSyncError )
1003
+ if val < 1 {
1004
+ t .Fatalf ("expect sync error metric > 0" )
1005
+ }
964
1006
965
1007
// The informer is synced now.
966
- fc .SetSynced (true )
967
- time .Sleep (1 * time .Second )
968
- assertMonitors (t , gc , "pods" , "secrets" )
969
-
970
- if ! gc .IsSynced (logger ) {
971
- t .Fatal ("cache from garbage collector should be synced" )
1008
+ secretSyncOK .Store (true )
1009
+ if err := wait .PollUntilContextTimeout (tCtx , time .Second , wait .ForeverTestTimeout , true , func (ctx context.Context ) (bool , error ) {
1010
+ return gc .IsSynced (logger ), nil
1011
+ }); err != nil {
1012
+ t .Fatal (err )
972
1013
}
973
-
974
- fakeDiscoveryClient .setPreferredResources (serverResources , nil )
975
- time .Sleep (1 * time .Second )
976
- assertMonitors (t , gc , "pods" , "deployments" )
977
1014
}
978
1015
979
1016
func assertMonitors (t * testing.T , gc * GarbageCollector , resources ... string ) {
@@ -988,6 +1025,17 @@ func assertMonitors(t *testing.T, gc *GarbageCollector, resources ...string) {
988
1025
}
989
1026
}
990
1027
1028
+ func expectSyncNotBlocked (fakeDiscoveryClient * fakeServerResources ) error {
1029
+ before := fakeDiscoveryClient .getInterfaceUsedCount ()
1030
+ t := 1 * time .Second
1031
+ time .Sleep (t )
1032
+ after := fakeDiscoveryClient .getInterfaceUsedCount ()
1033
+ if before == after {
1034
+ return fmt .Errorf ("discoveryClient.ServerPreferredResources() not called over %v" , t )
1035
+ }
1036
+ return nil
1037
+ }
1038
+
991
1039
type fakeServerResources struct {
992
1040
PreferredResources []* metav1.APIResourceList
993
1041
Error error
@@ -1017,6 +1065,12 @@ func (f *fakeServerResources) setPreferredResources(resources []*metav1.APIResou
1017
1065
f .Error = err
1018
1066
}
1019
1067
1068
+ func (f * fakeServerResources ) getInterfaceUsedCount () int {
1069
+ f .Lock .Lock ()
1070
+ defer f .Lock .Unlock ()
1071
+ return f .InterfaceUsedCount
1072
+ }
1073
+
1020
1074
func (* fakeServerResources ) ServerPreferredNamespacedResources () ([]* metav1.APIResourceList , error ) {
1021
1075
return nil , nil
1022
1076
}
@@ -2754,28 +2808,6 @@ func assertState(s state) step {
2754
2808
2755
2809
}
2756
2810
2757
- type fakeController struct {
2758
- synced bool
2759
- lock sync.Mutex
2760
- }
2761
-
2762
- func (f * fakeController ) Run (stopCh <- chan struct {}) {
2763
- }
2764
-
2765
- func (f * fakeController ) HasSynced () bool {
2766
- return f .synced
2767
- }
2768
-
2769
- func (f * fakeController ) SetSynced (synced bool ) {
2770
- f .lock .Lock ()
2771
- defer f .lock .Unlock ()
2772
- f .synced = synced
2773
- }
2774
-
2775
- func (f * fakeController ) LastSyncResourceVersion () string {
2776
- return ""
2777
- }
2778
-
2779
2811
// trackingWorkqueue implements RateLimitingInterface,
2780
2812
// allows introspection of the items in the queue,
2781
2813
// and treats AddAfter and AddRateLimited the same as Add
0 commit comments