@@ -59,9 +59,74 @@ func TestChangeCRD(t *testing.T) {
59
59
ns := "default"
60
60
noxuNamespacedResourceClient := newNamespacedCustomResourceVersionedClient (ns , dynamicClient , noxuDefinition , "v1beta1" )
61
61
62
- stopChan := make (chan struct {})
62
+ updateCRD := func () {
63
+ noxuDefinitionToUpdate , err := apiExtensionsClient .ApiextensionsV1 ().CustomResourceDefinitions ().Get (context .TODO (), noxuDefinition .Name , metav1.GetOptions {})
64
+ if err != nil {
65
+ t .Error (err )
66
+ return
67
+ }
68
+ if len (noxuDefinitionToUpdate .Spec .Versions ) == 1 {
69
+ v2 := noxuDefinitionToUpdate .Spec .Versions [0 ]
70
+ v2 .Name = "v2"
71
+ v2 .Served = true
72
+ v2 .Storage = false
73
+ noxuDefinitionToUpdate .Spec .Versions = append (noxuDefinitionToUpdate .Spec .Versions , v2 )
74
+ } else {
75
+ noxuDefinitionToUpdate .Spec .Versions = noxuDefinitionToUpdate .Spec .Versions [0 :1 ]
76
+ }
77
+ if _ , err := apiExtensionsClient .ApiextensionsV1 ().CustomResourceDefinitions ().Update (context .TODO (), noxuDefinitionToUpdate , metav1.UpdateOptions {}); err != nil && ! apierrors .IsConflict (err ) {
78
+ t .Error (err )
79
+ }
80
+ }
63
81
82
+ // Set up 10 watchers for custom resource.
83
+ // We can't exercise them in a loop the same way as get requests, as watchcache
84
+ // can reject them with 429 and Retry-After: 1 if it is uninitialized and even
85
+ // though 429 is automatically retried, with frequent watchcache terminations and
86
+ // reinitializations they could either end-up being rejected N times and fail or
87
+ // or not initialize until the last watchcache reinitialization and then not be
88
+ // terminated. Thus we exercise their termination explicitly at the beginning.
64
89
wg := & sync.WaitGroup {}
90
+ for i := 0 ; i < 10 ; i ++ {
91
+ wg .Add (1 )
92
+ go func (i int ) {
93
+ defer wg .Done ()
94
+
95
+ w , err := noxuNamespacedResourceClient .Watch (context .TODO (), metav1.ListOptions {})
96
+ if err != nil {
97
+ t .Errorf ("unexpected error establishing watch: %v" , err )
98
+ return
99
+ }
100
+ for event := range w .ResultChan () {
101
+ switch event .Type {
102
+ case watch .Added , watch .Modified , watch .Deleted :
103
+ // all expected
104
+ default :
105
+ t .Errorf ("unexpected watch event: %#v" , event )
106
+ }
107
+ }
108
+ }(i )
109
+ }
110
+
111
+ // Let all the established watches soak request loops soak
112
+ time .Sleep (5 * time .Second )
113
+
114
+ // Update CRD and ensure that all watches are gracefully terminated.
115
+ updateCRD ()
116
+
117
+ drained := make (chan struct {})
118
+ go func () {
119
+ defer close (drained )
120
+ wg .Wait ()
121
+ }()
122
+
123
+ select {
124
+ case <- drained :
125
+ case <- time .After (wait .ForeverTestTimeout ):
126
+ t .Fatal ("timed out waiting for watchers to be terminated" )
127
+ }
128
+
129
+ stopChan := make (chan struct {})
65
130
66
131
// Set up loop to modify CRD in the background
67
132
wg .Add (1 )
@@ -76,28 +141,11 @@ func TestChangeCRD(t *testing.T) {
76
141
77
142
time .Sleep (10 * time .Millisecond )
78
143
79
- noxuDefinitionToUpdate , err := apiExtensionsClient .ApiextensionsV1 ().CustomResourceDefinitions ().Get (context .TODO (), noxuDefinition .Name , metav1.GetOptions {})
80
- if err != nil {
81
- t .Error (err )
82
- continue
83
- }
84
- if len (noxuDefinitionToUpdate .Spec .Versions ) == 1 {
85
- v2 := noxuDefinitionToUpdate .Spec .Versions [0 ]
86
- v2 .Name = "v2"
87
- v2 .Served = true
88
- v2 .Storage = false
89
- noxuDefinitionToUpdate .Spec .Versions = append (noxuDefinitionToUpdate .Spec .Versions , v2 )
90
- } else {
91
- noxuDefinitionToUpdate .Spec .Versions = noxuDefinitionToUpdate .Spec .Versions [0 :1 ]
92
- }
93
- if _ , err := apiExtensionsClient .ApiextensionsV1 ().CustomResourceDefinitions ().Update (context .TODO (), noxuDefinitionToUpdate , metav1.UpdateOptions {}); err != nil && ! apierrors .IsConflict (err ) {
94
- t .Error (err )
95
- continue
96
- }
144
+ updateCRD ()
97
145
}
98
146
}()
99
147
100
- // Set up 10 loops creating and reading and watching custom resources
148
+ // Set up 10 loops creating and reading custom resources
101
149
for i := 0 ; i < 10 ; i ++ {
102
150
wg .Add (1 )
103
151
go func (i int ) {
@@ -120,32 +168,6 @@ func TestChangeCRD(t *testing.T) {
120
168
}
121
169
}
122
170
}(i )
123
-
124
- wg .Add (1 )
125
- go func (i int ) {
126
- defer wg .Done ()
127
- for {
128
- time .Sleep (10 * time .Millisecond )
129
- select {
130
- case <- stopChan :
131
- return
132
- default :
133
- w , err := noxuNamespacedResourceClient .Watch (context .TODO (), metav1.ListOptions {})
134
- if err != nil {
135
- t .Errorf ("unexpected error establishing watch: %v" , err )
136
- continue
137
- }
138
- for event := range w .ResultChan () {
139
- switch event .Type {
140
- case watch .Added , watch .Modified , watch .Deleted :
141
- // all expected
142
- default :
143
- t .Errorf ("unexpected watch event: %#v" , event )
144
- }
145
- }
146
- }
147
- }
148
- }(i )
149
171
}
150
172
151
173
// Let all the established get request loops soak
@@ -155,7 +177,7 @@ func TestChangeCRD(t *testing.T) {
155
177
close (stopChan )
156
178
157
179
// Let loops drain
158
- drained : = make (chan struct {})
180
+ drained = make (chan struct {})
159
181
go func () {
160
182
defer close (drained )
161
183
wg .Wait ()
@@ -164,6 +186,6 @@ func TestChangeCRD(t *testing.T) {
164
186
select {
165
187
case <- drained :
166
188
case <- time .After (wait .ForeverTestTimeout ):
167
- t .Error ("timed out waiting for clients to complete" )
189
+ t .Fatal ("timed out waiting for clients to complete" )
168
190
}
169
191
}
0 commit comments