Skip to content

Commit b58fb32

Browse files
authored
Merge pull request #51 from munnerz/resume-pending-requests
add pending request cache to allow for resuming in-flight requests that take longer than a single issuance cycle
2 parents 716d4cf + 0ce8db0 commit b58fb32

File tree

4 files changed

+404
-28
lines changed

4 files changed

+404
-28
lines changed

manager/manager.go

Lines changed: 207 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package manager
1818

1919
import (
2020
"context"
21+
"crypto"
2122
"crypto/x509"
2223
"encoding/pem"
2324
"errors"
@@ -40,6 +41,7 @@ import (
4041
"k8s.io/apimachinery/pkg/types"
4142
"k8s.io/apimachinery/pkg/util/uuid"
4243
"k8s.io/apimachinery/pkg/util/wait"
44+
"k8s.io/client-go/tools/cache"
4345
"k8s.io/utils/clock"
4446

4547
internalapi "github.com/cert-manager/csi-lib/internal/api"
@@ -157,14 +159,73 @@ func NewManager(opts Options) (*Manager, error) {
157159
return nil, fmt.Errorf("building node name label selector: %w", err)
158160
}
159161

162+
// construct the requestToPrivateKeyMap so we can use event handlers below to manage it
163+
var requestToPrivateKeyLock sync.Mutex
164+
requestToPrivateKeyMap := make(map[types.UID]crypto.PrivateKey)
160165
// Create an informer factory
161166
informerFactory := cminformers.NewSharedInformerFactoryWithOptions(opts.Client, 0, cminformers.WithTweakListOptions(func(opts *metav1.ListOptions) {
162167
opts.LabelSelector = labels.NewSelector().Add(*nodeNameReq).String()
163168
}))
164169
// Fetch the lister before calling Start() to ensure this informer is
165170
// registered with the factory
166171
lister := informerFactory.Certmanager().V1().CertificateRequests().Lister()
172+
informerFactory.Certmanager().V1().CertificateRequests().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
173+
DeleteFunc: func(obj interface{}) {
174+
requestToPrivateKeyLock.Lock()
175+
defer requestToPrivateKeyLock.Unlock()
176+
key, ok := obj.(string)
177+
if !ok {
178+
return
179+
}
180+
namespace, name, err := cache.SplitMetaNamespaceKey(key)
181+
if err != nil {
182+
return
183+
}
184+
req, err := lister.CertificateRequests(namespace).Get(name)
185+
if err != nil {
186+
// we no longer have a copy of this request, so we can't work out its UID.
187+
// instead the associated pending request entry for this request will be cleaned up by the periodic 'janitor' task.
188+
return
189+
}
190+
191+
if _, ok := requestToPrivateKeyMap[req.UID]; ok {
192+
delete(requestToPrivateKeyMap, req.UID)
193+
}
194+
},
195+
})
196+
197+
// create a stop channel that manages all sub goroutines
167198
stopCh := make(chan struct{})
199+
// begin a background routine which periodically checks to ensure all members of the pending request map actually
200+
// have corresponding CertificateRequest objects in the apiserver.
201+
// This avoids leaking memory if we don't observe a request being deleted, or we observe it after the lister has purged
202+
// the request data from its cache.
203+
// this routine must be careful to not delete entries from this map that have JUST been added to the map, but haven't
204+
// been observed by the lister yet (else it may purge data we want to keep, causing a whole new request cycle).
205+
// for now, to avoid this case, we only run the routine every 5 minutes. It would be better if we recorded the time we
206+
// added the entry to the map instead, and only purged items from the map that are older that N duration (TBD).
207+
janitorLogger := opts.Log.WithName("pending_request_janitor")
208+
go wait.Until(func() {
209+
requestToPrivateKeyLock.Lock()
210+
defer requestToPrivateKeyLock.Unlock()
211+
reqs, err := lister.List(labels.Everything())
212+
if err != nil {
213+
janitorLogger.Error(err, "failed listing existing requests")
214+
return
215+
}
216+
217+
existsMap := make(map[types.UID]struct{})
218+
for _, req := range reqs {
219+
existsMap[req.UID] = struct{}{}
220+
}
221+
222+
for uid := range requestToPrivateKeyMap {
223+
if _, ok := existsMap[uid]; !ok {
224+
// purge the item from the map as it does not exist in the apiserver
225+
delete(requestToPrivateKeyMap, uid)
226+
}
227+
}
228+
}, time.Minute*5, stopCh)
168229
// Begin watching the API
169230
informerFactory.Start(stopCh)
170231
informerFactory.WaitForCacheSync(stopCh)
@@ -173,9 +234,13 @@ func NewManager(opts Options) (*Manager, error) {
173234
client: opts.Client,
174235
clientForMetadata: opts.ClientForMetadata,
175236
lister: lister,
176-
metadataReader: opts.MetadataReader,
177-
clock: opts.Clock,
178-
log: *opts.Log,
237+
// we pass a pointer to the mutex as the janitor routine above also uses this lock,
238+
// so we want to avoid making a copy of it
239+
requestToPrivateKeyLock: &requestToPrivateKeyLock,
240+
requestToPrivateKeyMap: requestToPrivateKeyMap,
241+
metadataReader: opts.MetadataReader,
242+
clock: opts.Clock,
243+
log: *opts.Log,
179244

180245
generatePrivateKey: opts.GeneratePrivateKey,
181246
generateRequest: opts.GenerateRequest,
@@ -260,6 +325,10 @@ type Manager struct {
260325
// lister is used as a read-only cache of CertificateRequest resources
261326
lister cmlisters.CertificateRequestLister
262327

328+
// A map that associates a CertificateRequest's UID with its private key.
329+
requestToPrivateKeyLock *sync.Mutex
330+
requestToPrivateKeyMap map[types.UID]crypto.PrivateKey
331+
263332
// used to read metadata from the store
264333
metadataReader storage.MetadataReader
265334

@@ -298,10 +367,23 @@ type Manager struct {
298367
// requestNameGenerator generates a new random name for a certificaterequest object
299368
// Defaults to uuid.NewUUID() from k8s.io/apimachinery/pkg/util/uuid.
300369
requestNameGenerator func() string
370+
371+
// doNotUse_CallOnEachIssue is a field used SOLELY for testing, and cannot be configured by external package consumers.
372+
// It is used to perform some action (e.g. counting) each time issue() is called.
373+
// It will be removed as soon as we have actual metrics support in csi-lib, which will allow us to measure
374+
// things like the number of times issue() is called.
375+
// No thread safety is added around this field, and it MUST NOT be used for any implementation logic.
376+
// It should not be used full-stop :).
377+
doNotUse_CallOnEachIssue func()
301378
}
302379

303380
// issue will step through the entire issuance flow for a volume.
304381
func (m *Manager) issue(ctx context.Context, volumeID string) error {
382+
// TODO: remove this code and replace with actual metrics support
383+
if m.doNotUse_CallOnEachIssue != nil {
384+
m.doNotUse_CallOnEachIssue()
385+
}
386+
305387
log := m.log.WithValues("volume_id", volumeID)
306388
log.Info("Processing issuance")
307389

@@ -315,10 +397,32 @@ func (m *Manager) issue(ctx context.Context, volumeID string) error {
315397
}
316398
log.V(2).Info("Read metadata", "metadata", meta)
317399

400+
// check if there is already a pending request in-flight for this volume.
401+
// if there is, and we still have a copy of its private key in memory, we can resume the existing request and
402+
// avoid creating additional CertificateRequest objects.
403+
existingReq, err := m.findPendingRequest(meta)
404+
if err != nil {
405+
return fmt.Errorf("failed when checking if an existing request exists: %w", err)
406+
}
407+
// if there is an existing in-flight request, attempt to 'resume' it (i.e. re-check to see if it is ready)
408+
if existingReq != nil {
409+
// we can only resume a request if we still have a reference to its private key, so look that up in our pending
410+
// requests map
411+
if key, ok := m.readPendingRequestPrivateKey(existingReq.UID); ok {
412+
log.V(4).Info("Re-using existing certificaterequest")
413+
return m.handleRequest(ctx, volumeID, meta, key, existingReq)
414+
}
415+
416+
// if we don't have a copy of the associated private key, delete the currently in-flight request
417+
log.V(2).Info("Found existing request that we don't have corresponding private key for - restarting request process")
418+
if err := m.client.CertmanagerV1().CertificateRequests(existingReq.Namespace).Delete(ctx, existingReq.Name, metav1.DeleteOptions{}); err != nil {
419+
return fmt.Errorf("failed to delete existing in-flight request: %w", err)
420+
}
421+
}
422+
318423
if ready, reason := m.readyToRequest(meta); !ready {
319424
return fmt.Errorf("driver is not ready to request a certificate for this volume: %v", reason)
320425
}
321-
322426
key, err := m.generatePrivateKey(meta)
323427
if err != nil {
324428
return fmt.Errorf("generating private key: %w", err)
@@ -343,11 +447,78 @@ func (m *Manager) issue(ctx context.Context, volumeID string) error {
343447
}
344448
log.Info("Created new CertificateRequest resource")
345449

346-
// Poll every 1s for the CertificateRequest to be ready
450+
// persist the reference to the private key in memory so we can resume this request in future if it doesn't complete
451+
// the first time.
452+
m.writePendingRequestPrivateKey(req.UID, key)
453+
return m.handleRequest(ctx, volumeID, meta, key, req)
454+
}
455+
456+
func (m *Manager) readPendingRequestPrivateKey(uid types.UID) (crypto.PrivateKey, bool) {
457+
m.requestToPrivateKeyLock.Lock()
458+
defer m.requestToPrivateKeyLock.Unlock()
459+
key, ok := m.requestToPrivateKeyMap[uid]
460+
return key, ok
461+
}
462+
463+
func (m *Manager) writePendingRequestPrivateKey(uid types.UID, key crypto.PrivateKey) {
464+
m.requestToPrivateKeyLock.Lock()
465+
defer m.requestToPrivateKeyLock.Unlock()
466+
m.requestToPrivateKeyMap[uid] = key
467+
}
468+
469+
func (m *Manager) deletePendingRequestPrivateKey(uid types.UID) {
470+
m.requestToPrivateKeyLock.Lock()
471+
defer m.requestToPrivateKeyLock.Unlock()
472+
delete(m.requestToPrivateKeyMap, uid)
473+
}
474+
475+
func (m *Manager) findPendingRequest(meta metadata.Metadata) (*cmapi.CertificateRequest, error) {
476+
reqs, err := m.listAllRequestsForVolume(meta.VolumeID)
477+
if err != nil {
478+
return nil, err
479+
}
480+
481+
if len(reqs) == 0 {
482+
return nil, nil
483+
}
484+
485+
// only consider the newest request - we will never resume an older request
486+
req := reqs[0]
487+
if !certificateRequestCanBeResumed(req) {
488+
return nil, nil
489+
}
490+
491+
// TODO: check if this request is still actually valid for the input metadata
492+
return req, nil
493+
}
494+
495+
func certificateRequestCanBeResumed(req *cmapi.CertificateRequest) bool {
496+
for _, cond := range req.Status.Conditions {
497+
if cond.Type == cmapi.CertificateRequestConditionReady {
498+
switch cond.Reason {
499+
case cmapi.CertificateRequestReasonPending, cmapi.CertificateRequestReasonIssued, "":
500+
// either explicit Pending, Issued or empty is considered re-sumable
501+
return true
502+
default:
503+
// any other state is a terminal failed state and means the request has failed
504+
return false
505+
}
506+
}
507+
}
508+
// if there is no Ready condition, the request is still pending processing
509+
return true
510+
}
511+
512+
func (m *Manager) handleRequest(ctx context.Context, volumeID string, meta metadata.Metadata, key crypto.PrivateKey, req *cmapi.CertificateRequest) error {
513+
log := m.log.WithValues("volume_id", volumeID)
514+
515+
// Poll every 200ms for the CertificateRequest to be ready
347516
lastFailureReason := ""
348-
if err := wait.PollUntilWithContext(ctx, time.Second, func(ctx context.Context) (done bool, err error) {
517+
if err := wait.PollUntilWithContext(ctx, time.Millisecond*200, func(ctx context.Context) (done bool, err error) {
518+
log.V(4).Info("Reading CertificateRequest from lister cache")
349519
updatedReq, err := m.lister.CertificateRequests(req.Namespace).Get(req.Name)
350520
if apierrors.IsNotFound(err) {
521+
log.V(4).Info("Failed to read CertificateRequest from lister cache", "error", err)
351522
// A NotFound error implies something deleted the resource - fail
352523
// early to allow a retry to occur at a later time if needed.
353524
return false, err
@@ -371,6 +542,7 @@ func (m *Manager) issue(ctx context.Context, volumeID string) error {
371542

372543
isApproved := apiutil.CertificateRequestIsApproved(updatedReq)
373544
if !isApproved {
545+
log.V(4).Info("CertificateRequest is not explicitly approved - continuing to check if the request has been issued anyway")
374546
lastFailureReason = fmt.Sprintf("request %q has not yet been approved by approval plugin", updatedReq.Name)
375547
// we don't stop execution here, as some versions of cert-manager (and some external issuer plugins)
376548
// may not be aware/utilise approval.
@@ -380,6 +552,7 @@ func (m *Manager) issue(ctx context.Context, volumeID string) error {
380552

381553
readyCondition := apiutil.GetCertificateRequestCondition(updatedReq, cmapi.CertificateRequestConditionReady)
382554
if readyCondition == nil {
555+
log.V(4).Info("Ready condition not found - will recheck...")
383556
// only overwrite the approval failure message if the request is actually approved
384557
// otherwise we may hide more useful information from the user by accident.
385558
if isApproved {
@@ -390,10 +563,12 @@ func (m *Manager) issue(ctx context.Context, volumeID string) error {
390563

391564
switch readyCondition.Reason {
392565
case cmapi.CertificateRequestReasonIssued:
566+
log.V(4).Info("CertificateRequest has been issued!")
393567
break
394568
case cmapi.CertificateRequestReasonFailed:
395569
return false, fmt.Errorf("request %q has failed: %s", updatedReq.Name, readyCondition.Message)
396570
case cmapi.CertificateRequestReasonPending:
571+
log.V(4).Info("CertificateRequest is still pending...")
397572
if isApproved {
398573
lastFailureReason = fmt.Sprintf("request %q is pending: %v", updatedReq.Name, readyCondition.Message)
399574
}
@@ -425,37 +600,55 @@ func (m *Manager) issue(ctx context.Context, volumeID string) error {
425600
return fmt.Errorf("calculating next issuance time: %w", err)
426601
}
427602
meta.NextIssuanceTime = &renewalPoint
603+
log.V(4).Info("Persisting next issuance time to metadata store", "next_issuance_time", renewalPoint)
428604

429605
if err := m.writeKeypair(meta, key, req.Status.Certificate, req.Status.CA); err != nil {
430606
return fmt.Errorf("writing keypair: %w", err)
431607
}
432608
log.V(2).Info("Wrote new keypair to storage")
433609

610+
// We must explicitly delete the private key from the pending requests map so that the existing Completed
611+
// request will not be re-used upon renewal.
612+
// Without this, the renewal would pick up the existing issued certificate and re-issue, rather than requesting
613+
// a new certificate.
614+
m.deletePendingRequestPrivateKey(req.UID)
615+
log.V(4).Info("Removed pending request private key from internal cache")
616+
434617
return nil
435618
}
436619

437-
func (m *Manager) cleanupStaleRequests(ctx context.Context, log logr.Logger, volumeID string) error {
620+
// returns a list of all pending certificaterequest objects for the given volumeID.
621+
// the returned slice will be ordered with the most recent request FIRST.
622+
func (m *Manager) listAllRequestsForVolume(volumeID string) ([]*cmapi.CertificateRequest, error) {
438623
sel, err := m.labelSelectorForVolume(volumeID)
439624
if err != nil {
440-
return fmt.Errorf("internal error building label selector - this is a bug, please open an issue: %w", err)
625+
return nil, fmt.Errorf("internal error building label selector - this is a bug, please open an issue: %w", err)
441626
}
442627

443628
reqs, err := m.lister.List(sel)
444629
if err != nil {
445-
return fmt.Errorf("listing certificaterequests: %w", err)
446-
}
447-
448-
if len(reqs) < m.maxRequestsPerVolume {
449-
return nil
630+
return nil, fmt.Errorf("listing certificaterequests: %w", err)
450631
}
451632

452633
// sort newest first to oldest last
453634
sort.Slice(reqs, func(i, j int) bool {
454635
return reqs[i].CreationTimestamp.After(reqs[j].CreationTimestamp.Time)
455636
})
456637

638+
return reqs, nil
639+
}
640+
641+
func (m *Manager) cleanupStaleRequests(ctx context.Context, log logr.Logger, volumeID string) error {
642+
reqs, err := m.listAllRequestsForVolume(volumeID)
643+
if err != nil {
644+
return err
645+
}
646+
if len(reqs) <= m.maxRequestsPerVolume {
647+
return nil
648+
}
649+
457650
// start at the end of the slice and work back to maxRequestsPerVolume
458-
for i := len(reqs) - 1; i >= m.maxRequestsPerVolume-1; i-- {
651+
for i := len(reqs) - 1; i > m.maxRequestsPerVolume-1; i-- {
459652
toDelete := reqs[i]
460653
if err := m.client.CertmanagerV1().CertificateRequests(toDelete.Namespace).Delete(ctx, toDelete.Name, metav1.DeleteOptions{}); err != nil {
461654
if apierrors.IsNotFound(err) {

0 commit comments

Comments
 (0)