Skip to content

Commit a58c86c

Browse files
committed
add pending request cache to allow for resuming in-flight requests that take longer than a single issuance cycle
Signed-off-by: James Munnelly <[email protected]>
1 parent 716d4cf commit a58c86c

File tree

1 file changed

+184
-14
lines changed

1 file changed

+184
-14
lines changed

manager/manager.go

Lines changed: 184 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package manager
1818

1919
import (
2020
"context"
21+
"crypto"
2122
"crypto/x509"
2223
"encoding/pem"
2324
"errors"
@@ -40,6 +41,7 @@ import (
4041
"k8s.io/apimachinery/pkg/types"
4142
"k8s.io/apimachinery/pkg/util/uuid"
4243
"k8s.io/apimachinery/pkg/util/wait"
44+
"k8s.io/client-go/tools/cache"
4345
"k8s.io/utils/clock"
4446

4547
internalapi "github.com/cert-manager/csi-lib/internal/api"
@@ -157,25 +159,85 @@ func NewManager(opts Options) (*Manager, error) {
157159
return nil, fmt.Errorf("building node name label selector: %w", err)
158160
}
159161

162+
// construct the requestToPrivateKeyMap so we can use event handlers below to manage it
163+
var requestToPrivateKeyLock sync.Mutex
164+
requestToPrivateKeyMap := make(map[types.UID]crypto.PrivateKey)
160165
// Create an informer factory
161166
informerFactory := cminformers.NewSharedInformerFactoryWithOptions(opts.Client, 0, cminformers.WithTweakListOptions(func(opts *metav1.ListOptions) {
162167
opts.LabelSelector = labels.NewSelector().Add(*nodeNameReq).String()
163168
}))
164169
// Fetch the lister before calling Start() to ensure this informer is
165170
// registered with the factory
166171
lister := informerFactory.Certmanager().V1().CertificateRequests().Lister()
172+
informerFactory.Certmanager().V1().CertificateRequests().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
173+
DeleteFunc: func(obj interface{}) {
174+
key, ok := obj.(string)
175+
if !ok {
176+
return
177+
}
178+
namespace, name, err := cache.SplitMetaNamespaceKey(key)
179+
if err != nil {
180+
return
181+
}
182+
req, err := lister.CertificateRequests(namespace).Get(name)
183+
if err != nil {
184+
// we no longer have a copy of this request, so we can't work out its UID.
185+
// instead the associated pending request entry for this request will be cleaned up by the periodic 'janitor' task.
186+
return
187+
}
188+
189+
requestToPrivateKeyLock.Lock()
190+
defer requestToPrivateKeyLock.Unlock()
191+
if _, ok := requestToPrivateKeyMap[req.UID]; ok {
192+
delete(requestToPrivateKeyMap, req.UID)
193+
}
194+
},
195+
})
196+
197+
// create a stop channel that manages all sub goroutines
167198
stopCh := make(chan struct{})
199+
// begin a background routine which periodically checks to ensure all members of the pending request map actually
200+
// have corresponding CertificateRequest objects in the apiserver.
201+
// This avoids leaking memory if we don't observe a request being deleted, or we observe it after the lister has purged
202+
// the request data from its cache.
203+
// this routine must be careful to not delete entries from this map that have JUST been added to the map, but haven't
204+
// been observed by the lister yet (else it may purge data we want to keep, causing a whole new request cycle).
205+
// for now, to avoid this case, we only run the routine every 5 minutes. It would be better if we recorded the time we
206+
// added the entry to the map instead, and only purged items from the map that are older than N duration (TBD).
207+
janitorLogger := opts.Log.WithName("pending_request_janitor")
208+
go wait.Until(func() {
209+
reqs, err := lister.List(labels.Everything())
210+
if err != nil {
211+
janitorLogger.Error(err, "failed listing existing requests")
212+
}
213+
214+
existsMap := make(map[types.UID]struct{})
215+
for _, req := range reqs {
216+
existsMap[req.UID] = struct{}{}
217+
}
218+
219+
requestToPrivateKeyLock.Lock()
220+
defer requestToPrivateKeyLock.Unlock()
221+
for uid := range requestToPrivateKeyMap {
222+
if _, ok := existsMap[uid]; !ok {
223+
// purge the item from the map as it does not exist in the apiserver
224+
delete(requestToPrivateKeyMap, uid)
225+
}
226+
}
227+
}, time.Minute*5, stopCh)
168228
// Begin watching the API
169229
informerFactory.Start(stopCh)
170230
informerFactory.WaitForCacheSync(stopCh)
171231

172232
m := &Manager{
173-
client: opts.Client,
174-
clientForMetadata: opts.ClientForMetadata,
175-
lister: lister,
176-
metadataReader: opts.MetadataReader,
177-
clock: opts.Clock,
178-
log: *opts.Log,
233+
client: opts.Client,
234+
clientForMetadata: opts.ClientForMetadata,
235+
lister: lister,
236+
requestToPrivateKeyLock: &requestToPrivateKeyLock,
237+
requestToPrivateKeyMap: requestToPrivateKeyMap,
238+
metadataReader: opts.MetadataReader,
239+
clock: opts.Clock,
240+
log: *opts.Log,
179241

180242
generatePrivateKey: opts.GeneratePrivateKey,
181243
generateRequest: opts.GenerateRequest,
@@ -260,6 +322,10 @@ type Manager struct {
260322
// lister is used as a read-only cache of CertificateRequest resources
261323
lister cmlisters.CertificateRequestLister
262324

325+
// A map that associates a CertificateRequest's UID with its private key.
326+
requestToPrivateKeyLock *sync.Mutex
327+
requestToPrivateKeyMap map[types.UID]crypto.PrivateKey
328+
263329
// used to read metadata from the store
264330
metadataReader storage.MetadataReader
265331

@@ -315,10 +381,32 @@ func (m *Manager) issue(ctx context.Context, volumeID string) error {
315381
}
316382
log.V(2).Info("Read metadata", "metadata", meta)
317383

384+
// check if there is already a pending request in-flight for this volume.
385+
// if there is, and we still have a copy of its private key in memory, we can resume the existing request and
386+
// avoid creating additional CertificateRequest objects.
387+
existingReq, err := m.findPendingRequest(meta)
388+
if err != nil {
389+
return fmt.Errorf("failed when checking if an existing request exists: %w", err)
390+
}
391+
// if there is an existing in-flight request, attempt to 'resume' it (i.e. re-check to see if it is ready)
392+
if existingReq != nil {
393+
// we can only resume a request if we still have a reference to its private key, so look that up in our pending
394+
// requests map
395+
if key, ok := m.readPendingRequestPrivateKey(existingReq.UID); ok {
396+
log.V(4).Info("Re-using existing certificaterequest")
397+
return m.handleRequest(ctx, volumeID, meta, key, existingReq)
398+
}
399+
400+
// if we don't have a copy of the associated private key, delete the currently in-flight request
401+
log.V(2).Info("Found existing request that we don't have corresponding private key for - restarting request process")
402+
if err := m.client.CertmanagerV1().CertificateRequests(existingReq.Namespace).Delete(ctx, existingReq.Name, metav1.DeleteOptions{}); err != nil {
403+
return fmt.Errorf("failed to delete existing in-flight request: %w", err)
404+
}
405+
}
406+
318407
if ready, reason := m.readyToRequest(meta); !ready {
319408
return fmt.Errorf("driver is not ready to request a certificate for this volume: %v", reason)
320409
}
321-
322410
key, err := m.generatePrivateKey(meta)
323411
if err != nil {
324412
return fmt.Errorf("generating private key: %w", err)
@@ -343,6 +431,71 @@ func (m *Manager) issue(ctx context.Context, volumeID string) error {
343431
}
344432
log.Info("Created new CertificateRequest resource")
345433

434+
// persist the reference to the private key in memory so we can resume this request in future if it doesn't complete
435+
// the first time.
436+
m.writePendingRequestPrivateKey(req.UID, key)
437+
return m.handleRequest(ctx, volumeID, meta, key, req)
438+
}
439+
440+
func (m *Manager) readPendingRequestPrivateKey(uid types.UID) (crypto.PrivateKey, bool) {
441+
m.requestToPrivateKeyLock.Lock()
442+
defer m.requestToPrivateKeyLock.Unlock()
443+
key, ok := m.requestToPrivateKeyMap[uid]
444+
return key, ok
445+
}
446+
447+
func (m *Manager) writePendingRequestPrivateKey(uid types.UID, key crypto.PrivateKey) {
448+
m.requestToPrivateKeyLock.Lock()
449+
defer m.requestToPrivateKeyLock.Unlock()
450+
m.requestToPrivateKeyMap[uid] = key
451+
}
452+
453+
func (m *Manager) deletePendingRequestPrivateKey(uid types.UID) {
454+
m.requestToPrivateKeyLock.Lock()
455+
defer m.requestToPrivateKeyLock.Unlock()
456+
delete(m.requestToPrivateKeyMap, uid)
457+
}
458+
459+
func (m *Manager) findPendingRequest(meta metadata.Metadata) (*cmapi.CertificateRequest, error) {
460+
reqs, err := m.listAllRequestsForVolume(meta.VolumeID)
461+
if err != nil {
462+
return nil, err
463+
}
464+
465+
if len(reqs) == 0 {
466+
return nil, nil
467+
}
468+
469+
// only consider the newest request - we will never resume an older request
470+
req := reqs[0]
471+
if !certificateRequestCanBeResumed(req) {
472+
return nil, nil
473+
}
474+
475+
// TODO: check if this request is still actually valid for the input metadata
476+
return req, nil
477+
}
478+
479+
func certificateRequestCanBeResumed(req *cmapi.CertificateRequest) bool {
480+
for _, cond := range req.Status.Conditions {
481+
if cond.Type == cmapi.CertificateRequestConditionReady {
482+
switch cond.Reason {
483+
case cmapi.CertificateRequestReasonPending, cmapi.CertificateRequestReasonIssued, "":
484+
// either explicit Pending, Issued or empty is considered re-sumable
485+
return true
486+
default:
487+
// any other state is a terminal failed state and means the request has failed
488+
return false
489+
}
490+
}
491+
}
492+
// if there is no Ready condition, the request is still pending processing
493+
return true
494+
}
495+
496+
func (m *Manager) handleRequest(ctx context.Context, volumeID string, meta metadata.Metadata, key crypto.PrivateKey, req *cmapi.CertificateRequest) error {
497+
log := m.log.WithValues("volume_id", volumeID)
498+
346499
// Poll every 1s for the CertificateRequest to be ready
347500
lastFailureReason := ""
348501
if err := wait.PollUntilWithContext(ctx, time.Second, func(ctx context.Context) (done bool, err error) {
@@ -431,29 +584,46 @@ func (m *Manager) issue(ctx context.Context, volumeID string) error {
431584
}
432585
log.V(2).Info("Wrote new keypair to storage")
433586

587+
// We must explicitly delete the private key from the pending requests map so that the existing Completed
588+
// request will not be re-used upon renewal.
589+
// Without this, the renewal would pick up the existing issued certificate and re-issue, rather than requesting
590+
// a new certificate.
591+
m.deletePendingRequestPrivateKey(req.UID)
592+
434593
return nil
435594
}
436595

437-
func (m *Manager) cleanupStaleRequests(ctx context.Context, log logr.Logger, volumeID string) error {
596+
// returns a list of all pending certificaterequest objects for the given volumeID.
597+
// the returned slice will be ordered with the most recent request FIRST.
598+
func (m *Manager) listAllRequestsForVolume(volumeID string) ([]*cmapi.CertificateRequest, error) {
438599
sel, err := m.labelSelectorForVolume(volumeID)
439600
if err != nil {
440-
return fmt.Errorf("internal error building label selector - this is a bug, please open an issue: %w", err)
601+
return nil, fmt.Errorf("internal error building label selector - this is a bug, please open an issue: %w", err)
441602
}
442603

443604
reqs, err := m.lister.List(sel)
444605
if err != nil {
445-
return fmt.Errorf("listing certificaterequests: %w", err)
446-
}
447-
448-
if len(reqs) < m.maxRequestsPerVolume {
449-
return nil
606+
return nil, fmt.Errorf("listing certificaterequests: %w", err)
450607
}
451608

452609
// sort newest first to oldest last
453610
sort.Slice(reqs, func(i, j int) bool {
454611
return reqs[i].CreationTimestamp.After(reqs[j].CreationTimestamp.Time)
455612
})
456613

614+
return reqs, nil
615+
}
616+
617+
func (m *Manager) cleanupStaleRequests(ctx context.Context, log logr.Logger, volumeID string) error {
618+
reqs, err := m.listAllRequestsForVolume(volumeID)
619+
if err != nil {
620+
return err
621+
}
622+
623+
if len(reqs) < m.maxRequestsPerVolume {
624+
return nil
625+
}
626+
457627
// start at the end of the slice and work back to maxRequestsPerVolume
458628
for i := len(reqs) - 1; i >= m.maxRequestsPerVolume-1; i-- {
459629
toDelete := reqs[i]

0 commit comments

Comments (0)