@@ -30,9 +30,14 @@ import (
 	clientset "k8s.io/client-go/kubernetes"
 	restclient "k8s.io/client-go/rest"
 	"k8s.io/client-go/tools/cache"
+	basemetric "k8s.io/component-base/metrics"
+	metricstestutil "k8s.io/component-base/metrics/testutil"
 	kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
+	"k8s.io/kubernetes/pkg/controller/podgc"
+	podgcmetrics "k8s.io/kubernetes/pkg/controller/podgc/metrics"
 	"k8s.io/kubernetes/pkg/controller/volume/attachdetach"
 	volumecache "k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache"
+	"k8s.io/kubernetes/pkg/controller/volume/attachdetach/metrics"
 	"k8s.io/kubernetes/pkg/controller/volume/persistentvolume"
 	persistentvolumeoptions "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/options"
 	"k8s.io/kubernetes/pkg/volume"
@@ -137,6 +142,98 @@ var defaultTimerConfig = attachdetach.TimerConfig{
 	DesiredStateOfWorldPopulatorListPodsRetryDuration: 3 * time.Second,
 }

+// TestPodTerminationWithNodeOOSDetach verifies that when the `out-of-service` taint is applied
+// to a node that was shut down non-gracefully, all of its pods are terminated immediately and
+// their volumes are detached immediately, without waiting for the default timeout period.
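+//
+// The flow of the test: create a node and a pod with a volume, wait for the volume to attach,
+// mark it as in use in the node status, delete the pod with a long grace period so it stays
+// Terminating, apply the out-of-service taint, and then use the podgc and attach-detach
+// controller metrics to assert that the pod was force-deleted and the volume force-detached.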
+func TestPodTerminationWithNodeOOSDetach(t *testing.T) {
+	// Disable ServiceAccount admission plugin as we don't have serviceaccount controller running.
+	server := kubeapiservertesting.StartTestServerOrDie(t, nil, framework.DefaultTestServerFlags(), framework.SharedEtcd())
+	defer server.TearDownFn()
+
+	tCtx := ktesting.Init(t)
+	defer tCtx.Cancel("test has completed")
+	testClient, ctrl, pvCtrl, podgcCtrl, informers := createAdClients(tCtx, t, server, defaultSyncPeriod, attachdetach.TimerConfig{
+		ReconcilerLoopPeriod:                        100 * time.Millisecond,
+		ReconcilerMaxWaitForUnmountDuration:         6 * time.Second,
+		DesiredStateOfWorldPopulatorLoopSleepPeriod: 24 * time.Hour,
+		// Use high duration to disable DesiredStateOfWorldPopulator.findAndAddActivePods loop in test.
+		DesiredStateOfWorldPopulatorListPodsRetryDuration: 24 * time.Hour,
+	})
+
+	namespaceName := "test-node-oos"
+	nodeName := "node-sandbox"
+	node := &v1.Node{
+		ObjectMeta: metav1.ObjectMeta{
+			Name: nodeName,
+			Annotations: map[string]string{
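+				// The controller-managed attach-detach annotation hands attach/detach for
+				// this node to the attach-detach controller instead of the kubelet.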
+				util.ControllerManagedAttachAnnotation: "true",
+			},
+		},
+	}
+	ns := framework.CreateNamespaceOrDie(testClient, namespaceName, t)
+	defer framework.DeleteNamespaceOrDie(testClient, ns, t)
+
+	_, err := testClient.CoreV1().Nodes().Create(context.TODO(), node, metav1.CreateOptions{})
+	if err != nil {
+		t.Fatalf("Failed to create node: %v", err)
+	}
+
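+	// fakePodWithVol is an existing helper in this file; it is expected to return a pod that
+	// mounts a fake volume and is scheduled to node-sandbox, so the attach-detach controller
+	// will attach its volume to the node created above.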
+	pod := fakePodWithVol(namespaceName)
+	if _, err := testClient.CoreV1().Pods(pod.Namespace).Create(context.TODO(), pod, metav1.CreateOptions{}); err != nil {
+		t.Errorf("Failed to create pod: %v", err)
+	}
+
+	// start the controller loops
+	podInformer := informers.Core().V1().Pods().Informer()
+	informers.Start(tCtx.Done())
+	informers.WaitForCacheSync(tCtx.Done())
+	go ctrl.Run(tCtx)
+	go pvCtrl.Run(tCtx)
+	go podgcCtrl.Run(tCtx)
+
+	waitToObservePods(t, podInformer, 1)
+	// wait for the volume to be attached
+	waitForVolumeToBeAttached(tCtx, t, testClient, pod.Name, nodeName)
+
+	// Patch the node status to mark the volume as in use, since the attach-detach controller
+	// uses that to decide whether it is safe to detach a volume.
+	node.Status.VolumesInUse = append(node.Status.VolumesInUse, "kubernetes.io/mock-provisioner/fake-mount")
+	node, err = testClient.CoreV1().Nodes().UpdateStatus(context.TODO(), node, metav1.UpdateOptions{})
+	if err != nil {
+		t.Fatalf("error in patching volumesInUse status of node: %s", err)
+	}
+	// Delete the pod with a long grace period so that it gets stuck in the Terminating state.
+	gracePeriod := int64(300)
+	err = testClient.CoreV1().Pods(namespaceName).Delete(context.TODO(), pod.Name, metav1.DeleteOptions{
+		GracePeriodSeconds: &gracePeriod,
+	})
+	if err != nil {
+		t.Fatalf("error in deleting pod: %v", err)
+	}
+
+	// verify that DeletionTimestamp is set, which means the pod is in the `Terminating` state
+	waitForPodDeletionTimestampToSet(tCtx, t, testClient, pod.Name, pod.Namespace)
+
+	// Taint the node `out-of-service`, as an admin would after a non-graceful node shutdown.
+	taint := v1.Taint{
+		Key:    v1.TaintNodeOutOfService,
+		Effect: v1.TaintEffectNoExecute,
+	}
+	node.Spec.Taints = append(node.Spec.Taints, taint)
+	if _, err := testClient.CoreV1().Nodes().Update(context.TODO(), node, metav1.UpdateOptions{}); err != nil {
+		t.Fatalf("error in patching oos taint to node: %v", err)
+	}
+	waitForNodeToBeTainted(tCtx, t, testClient, v1.TaintNodeOutOfService, nodeName)
+
+	// Verify that the pod was force-deleted: with the out-of-service taint in place, force
+	// deletion happens only when the node is NotReady and the pod is Terminating.
+	waitForMetric(tCtx, t, podgcmetrics.DeletingPodsTotal.WithLabelValues(namespaceName, podgcmetrics.PodGCReasonTerminatingOutOfService), 1, "terminating-pod-metric")
+	// Verify that the volume was force-detached. Note: metrics are cumulative.
+	waitForMetric(tCtx, t, metrics.ForceDetachMetricCounter.WithLabelValues(metrics.ForceDetachReasonOutOfService), 1, "detach-metric")
+}
+
 // Via integration test we can verify that if pod delete
 // event is somehow missed by AttachDetach controller - it still
 // gets cleaned up by Desired State of World populator.
@@ -157,7 +254,7 @@ func TestPodDeletionWithDswp(t *testing.T) {

 	tCtx := ktesting.Init(t)
 	defer tCtx.Cancel("test has completed")
-	testClient, ctrl, pvCtrl, informers := createAdClients(tCtx, t, server, defaultSyncPeriod, defaultTimerConfig)
+	testClient, ctrl, pvCtrl, podgcCtrl, informers := createAdClients(tCtx, t, server, defaultSyncPeriod, defaultTimerConfig)

 	ns := framework.CreateNamespaceOrDie(testClient, namespaceName, t)
 	defer framework.DeleteNamespaceOrDie(testClient, ns, t)
@@ -184,6 +281,7 @@ func TestPodDeletionWithDswp(t *testing.T) {
 	go ctrl.Run(tCtx)
 	// Run pvCtrl to avoid leaking goroutines started during its creation.
 	go pvCtrl.Run(tCtx)
+	go podgcCtrl.Run(tCtx)

 	waitToObservePods(t, podInformer, 1)
 	podKey, err := cache.MetaNamespaceKeyFunc(pod)
@@ -214,7 +312,7 @@ func initCSIObjects(stopCh <-chan struct{}, informers clientgoinformers.SharedInformerFactory) {
 	go informers.Storage().V1().CSIDrivers().Informer().Run(stopCh)
 }

-func TestPodUpdateWithWithADC(t *testing.T) {
+func TestPodUpdateWithADC(t *testing.T) {
 	// Disable ServiceAccount admission plugin as we don't have serviceaccount controller running.
 	server := kubeapiservertesting.StartTestServerOrDie(t, nil, framework.DefaultTestServerFlags(), framework.SharedEtcd())
 	defer server.TearDownFn()
@@ -231,7 +329,7 @@ func TestPodUpdateWithWithADC(t *testing.T) {

 	tCtx := ktesting.Init(t)
 	defer tCtx.Cancel("test has completed")
-	testClient, ctrl, pvCtrl, informers := createAdClients(tCtx, t, server, defaultSyncPeriod, defaultTimerConfig)
+	testClient, ctrl, pvCtrl, podgcCtrl, informers := createAdClients(tCtx, t, server, defaultSyncPeriod, defaultTimerConfig)

 	ns := framework.CreateNamespaceOrDie(testClient, namespaceName, t)
 	defer framework.DeleteNamespaceOrDie(testClient, ns, t)
@@ -261,6 +359,7 @@ func TestPodUpdateWithWithADC(t *testing.T) {
 	go ctrl.Run(tCtx)
 	// Run pvCtrl to avoid leaking goroutines started during its creation.
 	go pvCtrl.Run(tCtx)
+	go podgcCtrl.Run(tCtx)

 	waitToObservePods(t, podInformer, 1)
 	podKey, err := cache.MetaNamespaceKeyFunc(pod)
@@ -326,7 +425,7 @@ func waitForPodFuncInDSWP(t *testing.T, dswp volumecache.DesiredStateOfWorld, ch
 	}
 }

-func createAdClients(ctx context.Context, t *testing.T, server *kubeapiservertesting.TestServer, syncPeriod time.Duration, timers attachdetach.TimerConfig) (*clientset.Clientset, attachdetach.AttachDetachController, *persistentvolume.PersistentVolumeController, clientgoinformers.SharedInformerFactory) {
+func createAdClients(ctx context.Context, t *testing.T, server *kubeapiservertesting.TestServer, syncPeriod time.Duration, timers attachdetach.TimerConfig) (*clientset.Clientset, attachdetach.AttachDetachController, *persistentvolume.PersistentVolumeController, *podgc.PodGCController, clientgoinformers.SharedInformerFactory) {
 	config := restclient.CopyConfig(server.ClientConfig)
 	config.QPS = 1000000
 	config.Burst = 1000000
@@ -383,11 +482,20 @@ func createAdClients(ctx context.Context, t *testing.T, server *kubeapiservertesting.TestServer, syncPeriod time.Duration, timers attachdetach.TimerConfig) (*clientset.Clientset, attachdetach.AttachDetachController, *persistentvolume.PersistentVolumeController, clientgoinformers.SharedInformerFactory) {
 		NodeInformer:              informers.Core().V1().Nodes(),
 		EnableDynamicProvisioning: false,
 	}
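+	// Also build a PodGC controller so the tests can exercise force deletion of pods
+	// that are stuck Terminating on out-of-service nodes.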
+	podgcCtrl := podgc.NewPodGCInternal(
+		ctx,
+		testClient,
+		informers.Core().V1().Pods(),
+		informers.Core().V1().Nodes(),
+		0,                    // terminated pod threshold: 0 disables count-based cleanup
+		500*time.Millisecond, // GC check period, shortened to keep the tests fast
+		time.Second,          // quarantine time
+	)
 	pvCtrl, err := persistentvolume.NewController(ctx, params)
 	if err != nil {
 		t.Fatalf("Failed to create PV controller: %v", err)
 	}
-	return testClient, ctrl, pvCtrl, informers
+	return testClient, ctrl, pvCtrl, podgcCtrl, informers
 }

 // Via integration test we can verify that if pod add
@@ -410,7 +518,7 @@ func TestPodAddedByDswp(t *testing.T) {

 	tCtx := ktesting.Init(t)
 	defer tCtx.Cancel("test has completed")
-	testClient, ctrl, pvCtrl, informers := createAdClients(tCtx, t, server, defaultSyncPeriod, defaultTimerConfig)
+	testClient, ctrl, pvCtrl, podgcCtrl, informers := createAdClients(tCtx, t, server, defaultSyncPeriod, defaultTimerConfig)

 	ns := framework.CreateNamespaceOrDie(testClient, namespaceName, t)
 	defer framework.DeleteNamespaceOrDie(testClient, ns, t)
@@ -439,6 +547,7 @@ func TestPodAddedByDswp(t *testing.T) {
 	go ctrl.Run(tCtx)
 	// Run pvCtrl to avoid leaking goroutines started during its creation.
 	go pvCtrl.Run(tCtx)
+	go podgcCtrl.Run(tCtx)

 	waitToObservePods(t, podInformer, 1)
 	podKey, err := cache.MetaNamespaceKeyFunc(pod)
@@ -480,7 +589,7 @@ func TestPVCBoundWithADC(t *testing.T) {

 	namespaceName := "test-pod-deletion"

-	testClient, ctrl, pvCtrl, informers := createAdClients(tCtx, t, server, defaultSyncPeriod, attachdetach.TimerConfig{
+	testClient, ctrl, pvCtrl, podgcCtrl, informers := createAdClients(tCtx, t, server, defaultSyncPeriod, attachdetach.TimerConfig{
 		ReconcilerLoopPeriod:                        100 * time.Millisecond,
 		ReconcilerMaxWaitForUnmountDuration:         6 * time.Second,
 		DesiredStateOfWorldPopulatorLoopSleepPeriod: 24 * time.Hour,
@@ -528,6 +637,7 @@ func TestPVCBoundWithADC(t *testing.T) {
 	initCSIObjects(tCtx.Done(), informers)
 	go ctrl.Run(tCtx)
 	go pvCtrl.Run(tCtx)
+	go podgcCtrl.Run(tCtx)

 	waitToObservePods(t, informers.Core().V1().Pods().Informer(), 4)
 	// Give attachdetach controller enough time to populate pods into DSWP.
@@ -561,3 +671,70 @@ func createPVForPVC(t *testing.T, testClient *clientset.Clientset, pvc *v1.PersistentVolumeClaim) {
 		t.Errorf("Failed to create pv: %v", err)
 	}
 }
+
+// waitForPodDeletionTimestampToSet waits until the pod's DeletionTimestamp is set,
+// i.e. the pod has entered the Terminating state.
+func waitForPodDeletionTimestampToSet(tCtx context.Context, t *testing.T, testingClient *clientset.Clientset, podName, podNamespace string) {
+	if err := wait.PollUntilContextCancel(tCtx, 100*time.Millisecond, false, func(context.Context) (bool, error) {
+		pod, err := testingClient.CoreV1().Pods(podNamespace).Get(context.TODO(), podName, metav1.GetOptions{})
+		if err != nil {
+			t.Fatal(err)
+		}
+		if pod.DeletionTimestamp != nil {
+			return true, nil
+		}
+		return false, nil
+	}); err != nil {
+		t.Fatalf("Failed to get deletionTimestamp of pod: %s, namespace: %s", podName, podNamespace)
+	}
+}
+
+// waitForVolumeToBeAttached waits until a volume is attached to the given node.
+func waitForVolumeToBeAttached(ctx context.Context, t *testing.T, testingClient *clientset.Clientset, podName, nodeName string) {
+	if err := wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 120*time.Second, false, func(context.Context) (bool, error) {
+		node, err := testingClient.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
+		// Check the error before dereferencing node.
+		if err != nil {
+			t.Fatalf("Failed to get the node: %v", err)
+		}
+		if len(node.Status.VolumesAttached) >= 1 {
+			return true, nil
+		}
+		return false, nil
+	}); err != nil {
+		t.Fatalf("Failed to attach volume to pod: %s for node: %s", podName, nodeName)
+	}
+}
+
+// waitForNodeToBeTainted waits until the given taint key appears on the node.
+func waitForNodeToBeTainted(ctx context.Context, t *testing.T, testingClient *clientset.Clientset, taintKey, nodeName string) {
+	if err := wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 60*time.Second, false, func(context.Context) (bool, error) {
+		node, err := testingClient.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
+		if err != nil {
+			t.Fatal(err)
+		}
+		for _, taint := range node.Spec.Taints {
+			if taint.Key == taintKey {
+				return true, nil
+			}
+		}
+		return false, nil
+	}); err != nil {
+		t.Fatalf("Failed to add taint: %s to node: %s", taintKey, nodeName)
+	}
+}
+
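+// waitForMetric polls the given counter metric until it reports at least expectedCount,
+// failing the test if the timeout elapses first.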
+func waitForMetric(ctx context.Context, t *testing.T, m basemetric.CounterMetric, expectedCount float64, identifier string) {
+	if err := wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 60*time.Second, false, func(ctx context.Context) (done bool, err error) {
+		gotCount, err := metricstestutil.GetCounterMetricValue(m)
+		if err != nil {
+			t.Fatal(err, identifier)
+		}
+		t.Logf("expected metric count %g but got %g for %s", expectedCount, gotCount, identifier)
+		// Metrics are global and cumulative, so `>=` only checks the minimum expectation.
+		if gotCount >= expectedCount {
+			return true, nil
+		}
+		return false, nil
+	}); err != nil {
+		t.Fatalf("Failed to reach expected metric count %v for %s", expectedCount, identifier)
+	}
+}