
Commit 4d748d6

refactor nodeserver to only attempt issuance once in a single NodePublishVolume call
Signed-off-by: James Munnelly <[email protected]>
1 parent 9baa172 commit 4d748d6

File tree

3 files changed: +111 -39 lines changed


driver/nodeserver.go

Lines changed: 24 additions & 25 deletions
@@ -26,7 +26,6 @@ import (
 	"github.com/go-logr/logr"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
-	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/mount-utils"
 
 	"github.com/cert-manager/csi-lib/manager"
@@ -79,32 +78,31 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
 		}
 	}
 
-	if err := ns.manager.ManageVolume(req.GetVolumeId()); err != nil {
-		return nil, err
-	}
-
-	log.Info("Volume registered for management")
-
-	// Only wait for the volume to be ready if it is in a state of 'ready to request'
-	// already. This allows implementors to defer actually requesting certificates
-	// until later in the pod lifecycle (e.g. after CNI has run & an IP address has been
-	// allocated, if a user wants to embed pod IPs into their requests).
-	isReadyToRequest, reason := ns.manager.IsVolumeReadyToRequest(req.GetVolumeId())
-	if !isReadyToRequest {
-		log.Info("Unable to request a certificate right now, will be retried", "reason", reason)
-	}
-	if isReadyToRequest || !ns.continueOnNotReady {
-		log.Info("Waiting for certificate to be issued...")
-		if err := wait.PollUntil(time.Second, func() (done bool, err error) {
-			return ns.manager.IsVolumeReady(req.GetVolumeId()), nil
-		}, ctx.Done()); err != nil {
-			return nil, err
+	if !ns.manager.IsVolumeReady(req.GetVolumeId()) {
+		// Only wait for the volume to be ready if it is in a state of 'ready to request'
+		// already. This allows implementors to defer actually requesting certificates
+		// until later in the pod lifecycle (e.g. after CNI has run & an IP address has been
+		// allocated, if a user wants to embed pod IPs into their requests).
+		isReadyToRequest, reason := ns.manager.IsVolumeReadyToRequest(req.GetVolumeId())
+		if isReadyToRequest {
+			log.V(4).Info("Waiting for certificate to be issued...")
+			if _, err := ns.manager.ManageVolumeImmediate(ctx, req.GetVolumeId()); err != nil {
+				return nil, err
+			}
+			log.Info("Volume registered for management")
+		} else {
+			if ns.continueOnNotReady {
+				log.V(4).Info("Skipping waiting for certificate to be issued")
+				ns.manager.ManageVolume(req.GetVolumeId())
+				log.V(4).Info("Volume registered for management")
+			} else {
+				log.Info("Unable to request a certificate right now, will be retried", "reason", reason)
+				return nil, fmt.Errorf("volume is not yet ready to be setup, will be retried: %s", reason)
+			}
 		}
-	} else {
-		log.Info("Skipping waiting for certificate to be issued")
 	}
 
-	log.Info("Volume ready for mounting")
+	log.Info("Ensuring data directory for volume is mounted into pod...")
 	notMnt, err := mount.IsNotMountPoint(ns.mounter, req.GetTargetPath())
 	switch {
 	case os.IsNotExist(err):
@@ -118,11 +116,12 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
 
 	if !notMnt {
 		// Nothing more to do if the targetPath is already a bind mount
+		log.Info("Volume already mounted to pod, nothing to do")
 		success = true
 		return &csi.NodePublishVolumeResponse{}, nil
 	}
 
-	log.Info("Bind mounting data directory to the targetPath")
+	log.Info("Bind mounting data directory to the pod's mount namespace")
 	// bind mount the targetPath to the data directory
 	if err := ns.mounter.Mount(ns.store.PathForVolume(req.GetVolumeId()), req.GetTargetPath(), "", []string{"bind", "ro"}); err != nil {
 		return nil, err
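
Taken together, the hunks above replace the old poll-until-ready loop with a single decision per NodePublishVolume call. As a rough summary (a standalone sketch with hypothetical names, not code from the repository), the branch logic reduces to:

package main

import "fmt"

// publishAction mirrors the branch structure the diff introduces into
// NodePublishVolume: issue at most once per call, hand off to background
// management when allowed, or fail so the node plugin retries the call.
func publishAction(volumeReady, readyToRequest, continueOnNotReady bool) string {
	switch {
	case volumeReady:
		return "certificate already issued: go straight to bind-mounting"
	case readyToRequest:
		return "call ManageVolumeImmediate once; return its error if issuance fails"
	case continueOnNotReady:
		return "call ManageVolume and let the background routine issue later"
	default:
		return "return an error so NodePublishVolume is retried"
	}
}

func main() {
	fmt.Println(publishAction(false, true, false))  // single immediate issuance attempt
	fmt.Println(publishAction(false, false, true))  // deferred issuance
	fmt.Println(publishAction(false, false, false)) // caller retries
}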

manager/manager.go

Lines changed: 85 additions & 12 deletions
@@ -454,22 +454,67 @@ func (m *Manager) submitRequest(ctx context.Context, meta metadata.Metadata, csr
 	return req, nil
 }
 
-// ManageVolume will initiate management of data for the given volumeID.
-func (m *Manager) ManageVolume(volumeID string) error {
+// ManageVolumeImmediate will register a volume for management and immediately attempt a single issuance.
+// This
+func (m *Manager) ManageVolumeImmediate(ctx context.Context, volumeID string) (managed bool, err error) {
+	if !m.manageVolumeIfNotManaged(volumeID) {
+		return false, nil
+	}
+
+	meta, err := m.metadataReader.ReadMetadata(volumeID)
+	if err != nil {
+		return true, fmt.Errorf("reading metadata: %w", err)
+	}
+
+	// Only attempt issuance immediately if there isn't already an issued certificate
+	if meta.NextIssuanceTime == nil {
+		// If issuance fails, immediately return without retrying so the caller can decide
+		// how to proceed depending on the context this method was called within.
+		if err := m.issue(ctx, volumeID); err != nil {
+			return true, err
+		}
+	}
+
+	if !m.startRenewalRoutine(volumeID) {
+		return true, fmt.Errorf("unexpected state: renewal routine not started, please open an issue at https://github.com/cert-manager/csi-lib")
+	}
+
+	return true, nil
+}
+
+// manageVolumeIfNotManaged will ensure the named volume has been registered for management.
+// It returns 'true' if the volume was not previously managed, and false if the volume was already managed.
+func (m *Manager) manageVolumeIfNotManaged(volumeID string) (managed bool) {
 	m.lock.Lock()
 	defer m.lock.Unlock()
 	log := m.log.WithValues("volume_id", volumeID)
 
 	// if the volume is already managed, return early
-	if _, ok := m.managedVolumes[volumeID]; ok {
+	if _, managed := m.managedVolumes[volumeID]; managed {
 		log.V(2).Info("Volume already registered for management")
-		return nil
+		return false
 	}
 
 	// construct a new channel used to stop management of the volume
 	stopCh := make(chan struct{})
 	m.managedVolumes[volumeID] = stopCh
 
+	return true
+}
+
+// startRenewalRoutine will begin the background issuance goroutine for the given volumeID.
+// It is the caller's responsibility to ensure this is only called once per volume.
+func (m *Manager) startRenewalRoutine(volumeID string) (started bool) {
+	m.lock.Lock()
+	defer m.lock.Unlock()
+	log := m.log.WithValues("volume_id", volumeID)
+
+	stopCh, ok := m.managedVolumes[volumeID]
+	if !ok {
+		log.Info("Volume not registered for management, cannot start renewal routine...")
+		return false
+	}
+
 	// Create a context that will be cancelled when the stopCh is closed
 	ctx, cancel := context.WithCancel(context.Background())
 	go func() {
@@ -481,7 +526,6 @@ func (m *Manager) ManageVolume(volumeID string) error {
 
 	go func() {
 		// check every volume once per second
-		// TODO: optimise this to not check so often
 		ticker := time.NewTicker(time.Second)
 		for {
 			select {
@@ -496,9 +540,14 @@
 				}
 
 				if meta.NextIssuanceTime == nil || m.clock.Now().After(*meta.NextIssuanceTime) {
-					wait.ExponentialBackoffWithContext(ctx, wait.Backoff{
-						// 2s is the 'base' amount of time for the backoff
-						Duration: time.Second * 2,
+					// If issuing a certificate fails, we don't go around the outer for loop again (as we'd then be creating
+					// a new CertificateRequest every second).
+					// Instead, retry within the same iteration of the for loop and apply an exponential backoff.
+					// Because we pass ctx through to the 'wait' package, if the stopCh is closed/context is cancelled,
+					// we'll immediately stop waiting and 'continue' which will then hit the `case <-stopCh` case in the `select`.
+					if err := wait.ExponentialBackoffWithContext(ctx, wait.Backoff{
+						// 8s is the 'base' amount of time for the backoff
+						Duration: time.Second * 8,
 						// We multiple the 'duration' by 2.0 if the attempt fails/errors
 						Factor: 2.0,
 						// Add a jitter of +/- 1s (0.5 of the 'duration')
@@ -507,22 +556,39 @@
 						// reset back to the 'base duration'. Set this to the MaxInt32, as we never want to
 						// reset this unless we get a successful attempt.
 						Steps: math.MaxInt32,
-						// The maximum time between calls will be 1 minute
-						Cap: time.Minute,
+						// The maximum time between calls will be 5 minutes
+						Cap: time.Minute * 5,
 					}, func() (bool, error) {
 						log.Info("Triggering new issuance")
 						if err := m.issue(ctx, volumeID); err != nil {
 							log.Error(err, "Failed to issue certificate, retrying after applying exponential backoff")
 							return false, nil
 						}
 						return true, nil
-					})
+					}); err != nil {
+						if errors.Is(err, wait.ErrWaitTimeout) || errors.Is(err, context.DeadlineExceeded) {
+							continue
+						}
+						// this should never happen as the function above never actually returns errors
+						log.Error(err, "unexpected error")
+					}
 				}
 			}
 		}
 	}()
+	return true
+}
 
-	return nil
+// ManageVolume will initiate management of data for the given volumeID.
+func (m *Manager) ManageVolume(volumeID string) (managed bool) {
+	log := m.log.WithValues("volume_id", volumeID)
+	if managed := m.manageVolumeIfNotManaged(volumeID); !managed {
+		return false
+	}
+	if started := m.startRenewalRoutine(volumeID); !started {
+		log.Info("unexpected state: renewal routine not started, please open an issue at https://github.com/cert-manager/csi-lib")
+	}
+	return true
 }
 
 func (m *Manager) UnmanageVolume(volumeID string) {
@@ -546,6 +612,13 @@ func (m *Manager) IsVolumeReadyToRequest(volumeID string) (bool, string) {
 }
 
 func (m *Manager) IsVolumeReady(volumeID string) bool {
+	m.lock.Lock()
+	defer m.lock.Unlock()
+	// a volume is not classed as Ready if it is not managed
+	if _, managed := m.managedVolumes[volumeID]; !managed {
+		return false
+	}
+
 	meta, err := m.metadataReader.ReadMetadata(volumeID)
 	if err != nil {
 		m.log.Error(err, "failed to read metadata", "volume_id", volumeID)
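
After this change, ManageVolumeImmediate performs the first issuance synchronously and only then starts the renewal routine, while ManageVolume registers the volume and leaves all issuance to that routine. The routine now retries a failed issuance inside the same tick, backing off from 8 seconds up to a 5 minute cap. The standalone sketch below (ignoring the +/- 50% jitter, and not using the wait package itself) prints the approximate delay schedule those Backoff values produce:

package main

import (
	"fmt"
	"time"
)

func main() {
	// Mirrors the Backoff settings in the diff above: Duration 8s, Factor 2.0,
	// Cap 5 minutes. The +/- 0.5x jitter is omitted here, so real delays will
	// vary around these values.
	delay := 8 * time.Second
	maxDelay := 5 * time.Minute

	for attempt := 1; attempt <= 8; attempt++ {
		fmt.Printf("failed attempt %d: next retry in ~%s\n", attempt, delay)
		delay *= 2
		if delay > maxDelay {
			delay = maxDelay
		}
	}
	// Prints 8s, 16s, 32s, 1m4s, 2m8s, 4m16s, then 5m0s for every later attempt.
}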

test/integration/ready_to_request_test.go

Lines changed: 2 additions & 2 deletions
@@ -180,8 +180,8 @@ func TestFailsIfNotReadyToRequest_ContinueOnNotReadyDisabled(t *testing.T) {
 		TargetPath: tmpDir,
 		Readonly: true,
 	})
-	if status.Code(err) != codes.DeadlineExceeded {
-		t.Errorf("Expected timeout to be returned from NodePublishVolume but got: %v", err)
+	if status.Code(err) != codes.Unknown || err.Error() != "rpc error: code = Unknown desc = volume is not yet ready to be setup, will be retried: never ready" {
+		t.Errorf("unexpected error: %v", err)
 	}
 
 	// allow 1s for the cleanup functions in NodePublishVolume to be run
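
The updated assertion relies on grpc-go's default error mapping: a handler that returns a plain Go error (the fmt.Errorf added to NodePublishVolume above) surfaces to clients as codes.Unknown, whereas the old test expected codes.DeadlineExceeded from the timed-out poll. A minimal illustration of that mapping (standalone, not part of the test suite):

package main

import (
	"errors"
	"fmt"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

func main() {
	// An error with no gRPC status attached is reported as codes.Unknown.
	plain := errors.New("volume is not yet ready to be setup, will be retried: never ready")
	fmt.Println(status.Code(plain)) // Unknown

	// An explicit status error keeps its code, e.g. the DeadlineExceeded the old
	// test expected once the wait.PollUntil loop ran out of time.
	deadline := status.Error(codes.DeadlineExceeded, "context deadline exceeded")
	fmt.Println(status.Code(deadline)) // DeadlineExceeded
}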
