@@ -25,6 +25,7 @@ import (
25
25
"strings"
26
26
"time"
27
27
28
+ "google.golang.org/grpc/codes"
28
29
v1 "k8s.io/api/core/v1"
29
30
storagev1 "k8s.io/api/storage/v1"
30
31
apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -54,8 +55,30 @@ const (
54
55
csiResizeWaitPeriod = 5 * time .Minute
55
56
// how long to wait for Resizing Condition on PVC to appear
56
57
csiResizingConditionWait = 2 * time .Minute
58
+
59
+ // How long to wait for kubelet to unstage a volume after a pod is deleted
60
+ csiUnstageWaitTimeout = 1 * time .Minute
61
+
62
+ // Name of the CSI driver pod (it's in a StatefulSet with a stable name)
63
+ driverPodName = "csi-mockplugin-0"
64
+ // Name of the CSI driver container
65
+ driverContainerName = "mock"
57
66
)
58
67
68
+ // csiCall describes one expected call from Kubernetes to the CSI mock driver
69
+ // together with the gRPC error code the driver is expected to return for it.
70
+ // When matching an expected csiCall against real CSI mock driver output, one csiCall
71
+ // matches *one or more* consecutive calls with the same method and error code.
72
+ // This is due to exponential backoff in Kubernetes, where the test cannot expect
73
+ // an exact number of call repetitions.
74
+ type csiCall struct {
75
+ expectedMethod string // method name without the gRPC service prefix, e.g. "NodeStageVolume"
76
+ expectedError codes.Code // gRPC status code the call must have returned (codes.OK for success)
77
+ // Marker for the test itself: the tested pod is deleted *after*
78
+ // this csiCall has been received from the driver.
79
+ deletePod bool
80
+ }
81
+
59
82
var _ = utils .SIGDescribe ("CSI mock volume" , func () {
60
83
type testParameters struct {
61
84
disableAttach bool
@@ -67,6 +90,7 @@ var _ = utils.SIGDescribe("CSI mock volume", func() {
67
90
enableNodeExpansion bool // enable node expansion for CSI mock driver
68
91
// just disable resizing on driver it overrides enableResizing flag for CSI mock driver
69
92
disableResizingOnDriver bool
93
+ javascriptHooks map [string ]string
70
94
}
71
95
72
96
type mockDriverSetup struct {
@@ -100,6 +124,7 @@ var _ = utils.SIGDescribe("CSI mock volume", func() {
100
124
DisableAttach : tp .disableAttach ,
101
125
EnableResizing : tp .enableResizing ,
102
126
EnableNodeExpansion : tp .enableNodeExpansion ,
127
+ JavascriptHooks : tp .javascriptHooks ,
103
128
}
104
129
105
130
// this just disable resizing on driver, keeping resizing on SC enabled.
@@ -344,9 +369,7 @@ var _ = utils.SIGDescribe("CSI mock volume", func() {
344
369
framework .ExpectNoError (err , "while deleting" )
345
370
346
371
ginkgo .By ("Checking CSI driver logs" )
347
- // The driver is deployed as a statefulset with stable pod names
348
- driverPodName := "csi-mockplugin-0"
349
- err = checkPodLogs (m .cs , f .Namespace .Name , driverPodName , "mock" , pod , test .expectPodInfo , test .expectEphemeral , csiInlineVolumesEnabled )
372
+ err = checkPodLogs (m .cs , f .Namespace .Name , driverPodName , driverContainerName , pod , test .expectPodInfo , test .expectEphemeral , csiInlineVolumesEnabled )
350
373
framework .ExpectNoError (err )
351
374
})
352
375
}
@@ -558,6 +581,155 @@ var _ = utils.SIGDescribe("CSI mock volume", func() {
558
581
}
559
582
})
560
583
584
+ ginkgo .Context ("CSI NodeStage error cases [Slow]" , func () {
585
+ // Global variable in all scripts (called before each test)
586
+ globalScript := `counter=0; console.log("globals loaded", OK, INVALIDARGUMENT)`
587
+ trackedCalls := []string {
588
+ "NodeStageVolume" ,
589
+ "NodeUnstageVolume" ,
590
+ }
591
+
592
+ tests := []struct {
593
+ name string
594
+ expectPodRunning bool
595
+ expectedCalls []csiCall
596
+ nodeStageScript string
597
+ nodeUnstageScript string
598
+ }{
599
+ {
600
+ // This is already tested elsewhere, adding simple good case here to test the test framework.
601
+ name : "should call NodeUnstage after NodeStage success" ,
602
+ expectPodRunning : true ,
603
+ expectedCalls : []csiCall {
604
+ {expectedMethod : "NodeStageVolume" , expectedError : codes .OK , deletePod : true },
605
+ {expectedMethod : "NodeUnstageVolume" , expectedError : codes .OK },
606
+ },
607
+ nodeStageScript : `OK;` ,
608
+ },
609
+ {
610
+ // Kubelet should repeat NodeStage as long as the pod exists
611
+ name : "should retry NodeStage after NodeStage final error" ,
612
+ expectPodRunning : true ,
613
+ expectedCalls : []csiCall {
614
+ // This matches all 3 NodeStage calls with InvalidArgument error
615
+ {expectedMethod : "NodeStageVolume" , expectedError : codes .InvalidArgument },
616
+ {expectedMethod : "NodeStageVolume" , expectedError : codes .OK , deletePod : true },
617
+ {expectedMethod : "NodeUnstageVolume" , expectedError : codes .OK },
618
+ },
619
+ // Fail first 3 NodeStage requests, 4th succeeds
620
+ nodeStageScript : `console.log("Counter:", ++counter); if (counter < 4) { INVALIDARGUMENT; } else { OK; }` ,
621
+ },
622
+ {
623
+ // Kubelet should repeat NodeStage as long as the pod exists
624
+ name : "should retry NodeStage after NodeStage ephemeral error" ,
625
+ expectPodRunning : true ,
626
+ expectedCalls : []csiCall {
627
+ // This matches all 3 NodeStage calls with DeadlineExceeded error
628
+ {expectedMethod : "NodeStageVolume" , expectedError : codes .DeadlineExceeded },
629
+ {expectedMethod : "NodeStageVolume" , expectedError : codes .OK , deletePod : true },
630
+ {expectedMethod : "NodeUnstageVolume" , expectedError : codes .OK },
631
+ },
632
+ // Fail first 3 NodeStage requests, 4th succeeds
633
+ nodeStageScript : `console.log("Counter:", ++counter); if (counter < 4) { DEADLINEEXCEEDED; } else { OK; }` ,
634
+ },
635
+ {
636
+ // After NodeUnstage with ephemeral error, the driver may continue staging the volume.
637
+ // Kubelet should call NodeUnstage to make sure the volume is really unstaged after
638
+ // the pod is deleted.
639
+ name : "should call NodeUnstage after NodeStage ephemeral error" ,
640
+ expectPodRunning : false ,
641
+ expectedCalls : []csiCall {
642
+ // Delete the pod before NodeStage succeeds - it should get "uncertain" because of ephemeral error
643
+ // This matches all repeated NodeStage calls with DeadlineExceeded error (due to exp. backoff).
644
+ {expectedMethod : "NodeStageVolume" , expectedError : codes .DeadlineExceeded , deletePod : true },
645
+ {expectedMethod : "NodeUnstageVolume" , expectedError : codes .OK },
646
+ },
647
+ nodeStageScript : `DEADLINEEXCEEDED;` ,
648
+ },
649
+ {
650
+ // After NodeUnstage with final error, kubelet can be sure the volume is not staged.
651
+ // The test checks that NodeUnstage is *not* called.
652
+ name : "should not call NodeUnstage after NodeStage final error" ,
653
+ expectPodRunning : false ,
654
+ expectedCalls : []csiCall {
655
+ // Delete the pod before NodeStage succeeds - it should get "globally unmounted" because of final error.
656
+ // This matches all repeated NodeStage calls with InvalidArgument error (due to exp. backoff).
657
+ {expectedMethod : "NodeStageVolume" , expectedError : codes .InvalidArgument , deletePod : true },
658
+ },
659
+ nodeStageScript : `INVALIDARGUMENT;` ,
660
+ },
661
+ }
662
+ for _ , t := range tests {
663
+ test := t
664
+ ginkgo .It (test .name , func () {
665
+ scripts := map [string ]string {
666
+ "globals" : globalScript ,
667
+ "nodeStageVolumeStart" : test .nodeStageScript ,
668
+ "nodeUnstageVolumeStart" : test .nodeUnstageScript ,
669
+ }
670
+ init (testParameters {
671
+ disableAttach : true ,
672
+ registerDriver : true ,
673
+ scName : "csi-mock-sc-" + f .UniqueName ,
674
+ javascriptHooks : scripts ,
675
+ })
676
+ defer cleanup ()
677
+
678
+ _ , claim , pod := createPod (false )
679
+ if pod == nil {
680
+ return
681
+ }
682
+ // Wait for PVC to get bound to make sure the CSI driver is fully started.
683
+ err := e2epv .WaitForPersistentVolumeClaimPhase (v1 .ClaimBound , f .ClientSet , f .Namespace .Name , claim .Name , time .Second , framework .ClaimProvisionTimeout )
684
+ framework .ExpectNoError (err , "while waiting for PVC to get provisioned" )
685
+
686
+ ginkgo .By ("Waiting for expected CSI calls" )
687
+ // Watch for all calls up to deletePod = true
688
+ for {
689
+ time .Sleep (1 * time .Second )
690
+ index , err := compareCSICalls (trackedCalls , test .expectedCalls , m .cs , f .Namespace .Name , driverPodName , driverContainerName )
691
+ framework .ExpectNoError (err , "while waiting for initial CSI calls" )
692
+ if index == 0 {
693
+ // No CSI call received yet
694
+ continue
695
+ }
696
+ // Check the last *received* call wanted the pod to be deleted
697
+ if test .expectedCalls [index - 1 ].deletePod {
698
+ break
699
+ }
700
+ }
701
+
702
+ if test .expectPodRunning {
703
+ ginkgo .By ("Waiting for pod to be running" )
704
+ err := e2epod .WaitForPodNameRunningInNamespace (m .cs , pod .Name , pod .Namespace )
705
+ framework .ExpectNoError (err , "Failed to start pod: %v" , err )
706
+ }
707
+
708
+ ginkgo .By ("Deleting the previously created pod" )
709
+ err = e2epod .DeletePodWithWait (m .cs , pod )
710
+ framework .ExpectNoError (err , "while deleting" )
711
+
712
+ ginkgo .By ("Waiting for all remaining expected CSI calls" )
713
+ err = wait .Poll (time .Second , csiUnstageWaitTimeout , func () (done bool , err error ) {
714
+ index , err := compareCSICalls (trackedCalls , test .expectedCalls , m .cs , f .Namespace .Name , driverPodName , driverContainerName )
715
+ if err != nil {
716
+ return true , fmt .Errorf ("error waiting for expected CSI calls: %s" , err )
717
+ }
718
+ if index == 0 {
719
+ // No CSI call received yet
720
+ return false , nil
721
+ }
722
+ if len (test .expectedCalls ) == index {
723
+ // all calls received
724
+ return true , nil
725
+ }
726
+ return false , nil
727
+ })
728
+ framework .ExpectNoError (err , "while waiting for all CSI calls" )
729
+ })
730
+ }
731
+ })
732
+
561
733
})
562
734
563
735
func waitForMaxVolumeCondition (pod * v1.Pod , cs clientset.Interface ) error {
@@ -687,6 +859,18 @@ func startPausePodWithVolumeSource(cs clientset.Interface, volumeSource v1.Volum
687
859
return cs .CoreV1 ().Pods (ns ).Create (context .TODO (), pod , metav1.CreateOptions {})
688
860
}
689
861
862
+ // mockCSICall is a minimal structure that decodes just the method, volume_context and error of one CSI call logged by the mock driver
863
+ type mockCSICall struct {
864
+ Method string // gRPC method of the call; parseMockLogs strips the service prefix
865
+ Request struct {
866
+ VolumeContext map [string ]string `json:"volume_context"`
867
+ }
868
+ FullError struct {
869
+ Code codes.Code `json:"code"`
870
+ Message string `json:"message"`
871
+ }
872
+ }
873
+
690
874
// checkPodLogs tests that NodePublish was called with expected volume_context and (for ephemeral inline volumes)
691
875
// has the matching NodeUnpublish
692
876
func checkPodLogs (cs clientset.Interface , namespace , driverPodName , driverContainerName string , pod * v1.Pod , expectPodInfo , ephemeralVolume , csiInlineVolumesEnabled bool ) error {
@@ -709,29 +893,15 @@ func checkPodLogs(cs clientset.Interface, namespace, driverPodName, driverContai
709
893
framework .Logf ("CSI driver logs:\n %s" , log )
710
894
// Find NodePublish in the logs
711
895
foundAttributes := sets .NewString ()
712
- logLines := strings .Split (log , "\n " )
713
896
numNodePublishVolume := 0
714
897
numNodeUnpublishVolume := 0
715
- for _ , line := range logLines {
716
- if ! strings .HasPrefix (line , "gRPCCall:" ) {
717
- continue
718
- }
719
- line = strings .TrimPrefix (line , "gRPCCall:" )
720
- // Dummy structure that parses just volume_attributes out of logged CSI call
721
- type MockCSICall struct {
722
- Method string
723
- Request struct {
724
- VolumeContext map [string ]string `json:"volume_context"`
725
- }
726
- }
727
- var call MockCSICall
728
- err := json .Unmarshal ([]byte (line ), & call )
729
- if err != nil {
730
- framework .Logf ("Could not parse CSI driver log line %q: %s" , line , err )
731
- continue
732
- }
898
+ calls , err := parseMockLogs (cs , namespace , driverPodName , driverContainerName )
899
+ if err != nil {
900
+ return err
901
+ }
902
+ for _ , call := range calls {
733
903
switch call .Method {
734
- case "/csi.v1.Node/ NodePublishVolume" :
904
+ case "NodePublishVolume" :
735
905
numNodePublishVolume ++
736
906
if numNodePublishVolume == 1 {
737
907
// Check that NodePublish had expected attributes for first volume
@@ -743,7 +913,7 @@ func checkPodLogs(cs clientset.Interface, namespace, driverPodName, driverContai
743
913
}
744
914
}
745
915
}
746
- case "/csi.v1.Node/ NodeUnpublishVolume" :
916
+ case "NodeUnpublishVolume" :
747
917
framework .Logf ("Found NodeUnpublishVolume: %+v" , call )
748
918
numNodeUnpublishVolume ++
749
919
}
@@ -768,6 +938,88 @@ func checkPodLogs(cs clientset.Interface, namespace, driverPodName, driverContai
768
938
return nil
769
939
}
770
940
941
+ func parseMockLogs (cs clientset.Interface , namespace , driverPodName , driverContainerName string ) ([]mockCSICall , error ) {
942
+ // Load logs of driver pod
943
+ log , err := e2epod .GetPodLogs (cs , namespace , driverPodName , driverContainerName )
944
+ if err != nil {
945
+ return nil , fmt .Errorf ("could not load CSI driver logs: %s" , err )
946
+ }
947
+ framework .Logf ("CSI driver logs:\n %s" , log )
948
+
949
+ logLines := strings .Split (log , "\n " )
950
+ var calls []mockCSICall
951
+ for _ , line := range logLines {
952
+ if ! strings .HasPrefix (line , "gRPCCall:" ) {
953
+ continue
954
+ }
955
+ line = strings .TrimPrefix (line , "gRPCCall:" )
956
+ var call mockCSICall
957
+ err := json .Unmarshal ([]byte (line ), & call )
958
+ if err != nil {
959
+ framework .Logf ("Could not parse CSI driver log line %q: %s" , line , err )
960
+ continue
961
+ }
962
+
963
+ // Trim gRPC service name, i.e. "/csi.v1.Identity/Probe" -> "Probe"
964
+ methodParts := strings .Split (call .Method , "/" )
965
+ call .Method = methodParts [len (methodParts )- 1 ]
966
+
967
+ calls = append (calls , call )
968
+ }
969
+ return calls , nil
970
+ }
971
+
972
+ // compareCSICalls compares expectedCalls with logs of the mock driver.
973
+ // It returns index of the first expectedCall that was *not* received
974
+ // yet or error when calls do not match.
975
+ // All repeated calls to the CSI mock driver (e.g. due to exponential backoff)
976
+ // are squashed and checked against single expectedCallSequence item.
977
+ //
978
+ // Only permanent errors are returned. Other errors are logged and no
979
+ // calls are returned. The caller is expected to retry.
980
+ func compareCSICalls (trackedCalls []string , expectedCallSequence []csiCall , cs clientset.Interface , namespace , driverPodName , driverContainerName string ) (int , error ) {
981
+ allCalls , err := parseMockLogs (cs , namespace , driverPodName , driverContainerName )
982
+ if err != nil {
983
+ framework .Logf ("intermittent (?) log retrieval error, proceeding without output: %v" , err )
984
+ return 0 , nil
985
+ }
986
+
987
+ // Remove all repeated and ignored calls
988
+ tracked := sets .NewString (trackedCalls ... )
989
+ var calls []mockCSICall
990
+ var last mockCSICall
991
+ for _ , c := range allCalls {
992
+ if ! tracked .Has (c .Method ) {
993
+ continue
994
+ }
995
+ if c .Method != last .Method || c .FullError .Code != last .FullError .Code {
996
+ last = c
997
+ calls = append (calls , c )
998
+ }
999
+ // This call is the same as the last one, ignore it.
1000
+ }
1001
+
1002
+ for i , c := range calls {
1003
+ if i >= len (expectedCallSequence ) {
1004
+ // Log all unexpected calls first, return error below outside the loop.
1005
+ framework .Logf ("Unexpected CSI driver call: %s (%d)" , c .Method , c .FullError )
1006
+ continue
1007
+ }
1008
+
1009
+ // Compare current call with expected call
1010
+ expectedCall := expectedCallSequence [i ]
1011
+ if c .Method != expectedCall .expectedMethod || c .FullError .Code != expectedCall .expectedError {
1012
+ return i , fmt .Errorf ("Unexpected CSI call %d: expected %s (%d), got %s (%d)" , i , expectedCall .expectedMethod , expectedCall .expectedError , c .Method , c .FullError .Code )
1013
+ }
1014
+ }
1015
+ if len (calls ) > len (expectedCallSequence ) {
1016
+ return len (expectedCallSequence ), fmt .Errorf ("Received %d unexpected CSI driver calls" , len (calls )- len (expectedCallSequence ))
1017
+ }
1018
+ // All calls were correct
1019
+ return len (calls ), nil
1020
+
1021
+ }
1022
+
771
1023
func waitForCSIDriver (cs clientset.Interface , driverName string ) error {
772
1024
timeout := 4 * time .Minute
773
1025
0 commit comments