@@ -17,15 +17,14 @@ limitations under the License.
17
17
package csi
18
18
19
19
import (
20
+ "context"
20
21
"errors"
21
22
"fmt"
22
23
"os"
23
24
"path/filepath"
24
25
"strings"
25
26
"time"
26
27
27
- "context"
28
-
29
28
"k8s.io/klog"
30
29
31
30
api "k8s.io/api/core/v1"
@@ -227,10 +226,10 @@ func (p *csiPlugin) Init(host volume.VolumeHost) error {
227
226
228
227
if utilfeature .DefaultFeatureGate .Enabled (features .CSINodeInfo ) &&
229
228
utilfeature .DefaultFeatureGate .Enabled (features .CSIMigration ) {
230
- // This function prevents Kubelet from posting Ready status until CSINodeInfo
229
+ // This function prevents Kubelet from posting Ready status until CSINode
231
230
// is both installed and initialized
232
231
if err := initializeCSINode (host ); err != nil {
233
- return errors .New (log ("failed to initialize CSINodeInfo : %v" , err ))
232
+ return errors .New (log ("failed to initialize CSINode : %v" , err ))
234
233
}
235
234
}
236
235
@@ -240,21 +239,28 @@ func (p *csiPlugin) Init(host volume.VolumeHost) error {
240
239
func initializeCSINode (host volume.VolumeHost ) error {
241
240
kvh , ok := host .(volume.KubeletVolumeHost )
242
241
if ! ok {
243
- klog .V (4 ).Info ("Cast from VolumeHost to KubeletVolumeHost failed. Skipping CSINodeInfo initialization, not running on kubelet" )
242
+ klog .V (4 ).Info ("Cast from VolumeHost to KubeletVolumeHost failed. Skipping CSINode initialization, not running on kubelet" )
244
243
return nil
245
244
}
246
245
kubeClient := host .GetKubeClient ()
247
246
if kubeClient == nil {
248
- // Kubelet running in standalone mode. Skip CSINodeInfo initialization
249
- klog .Warning ("Skipping CSINodeInfo initialization, kubelet running in standalone mode" )
247
+ // Kubelet running in standalone mode. Skip CSINode initialization
248
+ klog .Warning ("Skipping CSINode initialization, kubelet running in standalone mode" )
250
249
return nil
251
250
}
252
251
253
- kvh .SetKubeletError (errors .New ("CSINodeInfo is not yet initialized" ))
252
+ kvh .SetKubeletError (errors .New ("CSINode is not yet initialized" ))
254
253
255
254
go func () {
256
255
defer utilruntime .HandleCrash ()
257
256
257
+ // First wait indefinitely to talk to Kube APIServer
258
+ nodeName := host .GetNodeName ()
259
+ err := waitForAPIServerForever (kubeClient , nodeName )
260
+ if err != nil {
261
+ klog .Fatalf ("Failed to initialize CSINode while waiting for API server to report ok: %v" , err )
262
+ }
263
+
258
264
// Backoff parameters tuned to retry over 140 seconds. Will fail and restart the Kubelet
259
265
// after max retry steps.
260
266
initBackoff := wait.Backoff {
@@ -263,12 +269,12 @@ func initializeCSINode(host volume.VolumeHost) error {
263
269
Factor : 6.0 ,
264
270
Jitter : 0.1 ,
265
271
}
266
- err : = wait .ExponentialBackoff (initBackoff , func () (bool , error ) {
267
- klog .V (4 ).Infof ("Initializing migrated drivers on CSINodeInfo " )
272
+ err = wait .ExponentialBackoff (initBackoff , func () (bool , error ) {
273
+ klog .V (4 ).Infof ("Initializing migrated drivers on CSINode " )
268
274
err := nim .InitializeCSINodeWithAnnotation ()
269
275
if err != nil {
270
- kvh .SetKubeletError (fmt .Errorf ("Failed to initialize CSINodeInfo : %v" , err ))
271
- klog .Errorf ("Failed to initialize CSINodeInfo : %v" , err )
276
+ kvh .SetKubeletError (fmt .Errorf ("Failed to initialize CSINode : %v" , err ))
277
+ klog .Errorf ("Failed to initialize CSINode : %v" , err )
272
278
return false , nil
273
279
}
274
280
@@ -282,7 +288,7 @@ func initializeCSINode(host volume.VolumeHost) error {
282
288
// using CSI for all Migrated volume plugins. Then all the CSINode initialization
283
289
// code can be dropped from Kubelet.
284
290
// Kill the Kubelet process and allow it to restart to retry initialization
285
- klog .Fatalf ("Failed to initialize CSINodeInfo after retrying" )
291
+ klog .Fatalf ("Failed to initialize CSINode after retrying: %v" , err )
286
292
}
287
293
}()
288
294
return nil
@@ -914,3 +920,28 @@ func highestSupportedVersion(versions []string) (*utilversion.Version, error) {
914
920
}
915
921
return highestSupportedVersion , nil
916
922
}
923
+
924
+ // waitForAPIServerForever waits forever to get a CSINode instance as a proxy
925
+ // for a healthy APIServer
926
+ func waitForAPIServerForever (client clientset.Interface , nodeName types.NodeName ) error {
927
+ var lastErr error
928
+ err := wait .PollImmediateInfinite (time .Second , func () (bool , error ) {
929
+ // Get a CSINode from API server to make sure 1) kubelet can reach API server
930
+ // and 2) it has enough permissions. Kubelet may have restricted permissions
931
+ // when it's bootstrapping TLS.
932
+ // https://kubernetes.io/docs/reference/command-line-tools-reference/kubelet-tls-bootstrapping/
933
+ _ , lastErr = client .StorageV1 ().CSINodes ().Get (context .TODO (), string (nodeName ), meta.GetOptions {})
934
+ if lastErr == nil || apierrors .IsNotFound (lastErr ) {
935
+ // API server contacted
936
+ return true , nil
937
+ }
938
+ klog .V (2 ).Infof ("Failed to contact API server when waiting for CSINode publishing: %s" , lastErr )
939
+ return false , nil
940
+ })
941
+ if err != nil {
942
+ // In theory this is unreachable, but just in case:
943
+ return fmt .Errorf ("%v: %v" , err , lastErr )
944
+ }
945
+
946
+ return nil
947
+ }
0 commit comments