Skip to content

Commit 69abbbc

Browse files
authored
Merge pull request #20 from munnerz/continue-on-error
Add option to skip blocking pod startup if driver is not ready to create a request yet
2 parents 34ffa38 + e152da4 commit 69abbbc

File tree

8 files changed

+284
-17
lines changed

8 files changed

+284
-17
lines changed

driver/driver.go

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,16 @@ type Options struct {
5252
// If not specified, the current operating system's default implementation
5353
// will be used (i.e. 'mount.New("")')
5454
Mounter mount.Interface
55+
// ContinueOnNotReady will cause the driver's nodeserver to continue
56+
// mounting the volume even if the driver is not ready to create a request yet.
57+
// This is useful if you need to defer requesting a certificate until after
58+
// initialization of the Pod (e.g. IPAM so a pod IP is allocated).
59+
// Enabling this option WILL cause a period of time during pod startup whereby
60+
// certificate data is not available in the volume whilst the process is running.
61+
// An `initContainer` or other special logic in the user application must be
62+
// added to avoid running into CrashLoopBackOff situations which can delay pod
63+
// start time.
64+
ContinueOnNotReady bool
5565
}
5666

5767
func New(endpoint string, log logr.Logger, opts Options) (*Driver, error) {
@@ -75,11 +85,12 @@ func buildServers(opts Options, log logr.Logger) (*identityServer, *controllerSe
7585
opts.Mounter = mount.New("")
7686
}
7787
return NewIdentityServer(opts.DriverName, opts.DriverVersion), &controllerServer{}, &nodeServer{
78-
log: log,
79-
nodeID: opts.NodeID,
80-
manager: opts.Manager,
81-
store: opts.Store,
82-
mounter: opts.Mounter,
88+
log: log,
89+
nodeID: opts.NodeID,
90+
manager: opts.Manager,
91+
store: opts.Store,
92+
mounter: opts.Mounter,
93+
continueOnNotReady: opts.ContinueOnNotReady,
8394
}
8495
}
8596

driver/nodeserver.go

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ type nodeServer struct {
4141
mounter mount.Interface
4242

4343
log logr.Logger
44+
45+
continueOnNotReady bool
4446
}
4547

4648
func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
@@ -83,10 +85,23 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
8385

8486
log.Info("Volume registered for management")
8587

86-
if err := wait.PollUntil(time.Second, func() (done bool, err error) {
87-
return ns.manager.IsVolumeReady(req.GetVolumeId()), nil
88-
}, ctx.Done()); err != nil {
89-
return nil, err
88+
// Only wait for the volume to be ready if it is in a state of 'ready to request'
89+
// already. This allows implementors to defer actually requesting certificates
90+
// until later in the pod lifecycle (e.g. after CNI has run & an IP address has been
91+
// allocated, if a user wants to embed pod IPs into their requests).
92+
isReadyToRequest, reason := ns.manager.IsVolumeReadyToRequest(req.GetVolumeId())
93+
if !isReadyToRequest {
94+
log.Info("Unable to request a certificate right now, will be retried", "reason", reason)
95+
}
96+
if isReadyToRequest || !ns.continueOnNotReady {
97+
log.Info("Waiting for certificate to be issued...")
98+
if err := wait.PollUntil(time.Second, func() (done bool, err error) {
99+
return ns.manager.IsVolumeReady(req.GetVolumeId()), nil
100+
}, ctx.Done()); err != nil {
101+
return nil, err
102+
}
103+
} else {
104+
log.Info("Skipping waiting for certificate to be issued")
90105
}
91106

92107
log.Info("Volume ready for mounting")

manager/interfaces.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,3 +89,14 @@ type WriteKeypairFunc func(meta metadata.Metadata, key crypto.PrivateKey, chain
8989
// volume being published. Useful for modifying clients to make use of CSI
9090
// token requests.
9191
type ClientForMetadataFunc func(meta metadata.Metadata) (cmclient.Interface, error)
92+
93+
// ReadyToRequestFunc can be optionally implemented by drivers to indicate whether
94+
// the driver is ready to request a certificate for the given volume/metadata.
95+
// This can be used to 'defer' fetching until later pod initialization events have
96+
// happened (e.g. CNI has allocated an IP if you want to embed a pod IP into the certificate
97+
// request resources).
98+
type ReadyToRequestFunc func(meta metadata.Metadata) (bool, string)
99+
100+
func AlwaysReadyToRequest(_ metadata.Metadata) (bool, string) {
101+
return true, ""
102+
}

manager/manager.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ type Options struct {
8181
GenerateRequest GenerateRequestFunc
8282
SignRequest SignRequestFunc
8383
WriteKeypair WriteKeypairFunc
84+
ReadyToRequest ReadyToRequestFunc
8485
}
8586

8687
// NewManager constructs a new manager used to manage volumes containing
@@ -117,6 +118,9 @@ func NewManager(opts Options) (*Manager, error) {
117118
if opts.WriteKeypair == nil {
118119
return nil, errors.New("WriteKeypair must be set")
119120
}
121+
if opts.ReadyToRequest == nil {
122+
opts.ReadyToRequest = AlwaysReadyToRequest
123+
}
120124
if opts.MaxRequestsPerVolume == 0 {
121125
opts.MaxRequestsPerVolume = 1
122126
}
@@ -156,6 +160,7 @@ func NewManager(opts Options) (*Manager, error) {
156160
generateRequest: opts.GenerateRequest,
157161
signRequest: opts.SignRequest,
158162
writeKeypair: opts.WriteKeypair,
163+
readyToRequest: opts.ReadyToRequest,
159164

160165
managedVolumes: map[string]chan struct{}{},
161166
stopInformer: stopCh,
@@ -218,6 +223,7 @@ type Manager struct {
218223
generateRequest GenerateRequestFunc
219224
signRequest SignRequestFunc
220225
writeKeypair WriteKeypairFunc
226+
readyToRequest ReadyToRequestFunc
221227

222228
lock sync.Mutex
223229
// global view of all volumes managed by this manager
@@ -252,6 +258,10 @@ func (m *Manager) issue(volumeID string) error {
252258
}
253259
log.V(2).Info("Read metadata", "metadata", meta)
254260

261+
if ready, reason := m.readyToRequest(meta); !ready {
262+
return fmt.Errorf("driver is not ready to request a certificate for this volume: %v", reason)
263+
}
264+
255265
key, err := m.generatePrivateKey(meta)
256266
if err != nil {
257267
return fmt.Errorf("generating private key: %w", err)
@@ -526,6 +536,16 @@ func (m *Manager) UnmanageVolume(volumeID string) {
526536
}
527537
}
528538

539+
func (m *Manager) IsVolumeReadyToRequest(volumeID string) (bool, string) {
540+
meta, err := m.metadataReader.ReadMetadata(volumeID)
541+
if err != nil {
542+
m.log.Error(err, "failed to read metadata", "volume_id", volumeID)
543+
return false, ""
544+
}
545+
546+
return m.readyToRequest(meta)
547+
}
548+
529549
func (m *Manager) IsVolumeReady(volumeID string) bool {
530550
meta, err := m.metadataReader.ReadMetadata(volumeID)
531551
if err != nil {

storage/filesystem.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ func (f *Filesystem) ListVolumes() ([]string, error) {
124124
return vols, nil
125125
}
126126

127-
// MetadataForVolume will return the metadata for the volume with the given ID.
127+
// ReadMetadata will return the metadata for the volume with the given ID.
128128
// Errors wrapping ErrNotFound will be returned if metadata for the ID cannot
129129
// be found.
130130
func (f *Filesystem) ReadMetadata(volumeID string) (metadata.Metadata, error) {

storage/memory.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,5 +146,10 @@ func (m *MemoryFS) ReadFiles(volumeID string) (map[string][]byte, error) {
146146
if !ok {
147147
return nil, ErrNotFound
148148
}
149-
return vol, nil
149+
// make a copy of the map to ensure no races can occur
150+
cpy := make(map[string][]byte)
151+
for k, v := range vol {
152+
cpy[k] = v
153+
}
154+
return cpy, nil
150155
}
Lines changed: 201 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,201 @@
1+
/*
2+
Copyright 2022 The cert-manager Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package integration
18+
19+
import (
20+
"context"
21+
"crypto"
22+
"crypto/x509"
23+
"errors"
24+
"fmt"
25+
"os"
26+
"reflect"
27+
"testing"
28+
"time"
29+
30+
"github.com/container-storage-interface/spec/lib/go/csi"
31+
"google.golang.org/grpc/codes"
32+
"google.golang.org/grpc/status"
33+
"k8s.io/apimachinery/pkg/util/wait"
34+
fakeclock "k8s.io/utils/clock/testing"
35+
36+
"github.com/cert-manager/csi-lib/manager"
37+
"github.com/cert-manager/csi-lib/metadata"
38+
"github.com/cert-manager/csi-lib/storage"
39+
testutil "github.com/cert-manager/csi-lib/test/util"
40+
)
41+
42+
func Test_CompletesIfNotReadyToRequest_ContinueOnNotReadyEnabled(t *testing.T) {
43+
store := storage.NewMemoryFS()
44+
clock := fakeclock.NewFakeClock(time.Now())
45+
46+
calls := 0
47+
opts, cl, stop := testutil.RunTestDriver(t, testutil.DriverOptions{
48+
Store: store,
49+
Clock: clock,
50+
ContinueOnNotReady: true,
51+
ReadyToRequest: func(meta metadata.Metadata) (bool, string) {
52+
if calls < 1 {
53+
calls++
54+
return false, "calls < 1"
55+
}
56+
// only indicate we are ready after issuance has been attempted 1 time
57+
return calls == 1, "calls == 1"
58+
},
59+
GeneratePrivateKey: func(meta metadata.Metadata) (crypto.PrivateKey, error) {
60+
return nil, nil
61+
},
62+
GenerateRequest: func(meta metadata.Metadata) (*manager.CertificateRequestBundle, error) {
63+
return &manager.CertificateRequestBundle{
64+
Namespace: "certificaterequest-namespace",
65+
}, nil
66+
},
67+
SignRequest: func(meta metadata.Metadata, key crypto.PrivateKey, request *x509.CertificateRequest) (csr []byte, err error) {
68+
return []byte{}, nil
69+
},
70+
WriteKeypair: func(meta metadata.Metadata, key crypto.PrivateKey, chain []byte, ca []byte) error {
71+
store.WriteFiles(meta, map[string][]byte{
72+
"ca": ca,
73+
"cert": chain,
74+
})
75+
nextIssuanceTime := clock.Now().Add(time.Hour)
76+
meta.NextIssuanceTime = &nextIssuanceTime
77+
return store.WriteMetadata(meta.VolumeID, meta)
78+
},
79+
})
80+
defer stop()
81+
82+
// Setup a routine to issue/sign the request IF it is created
83+
stopCh := make(chan struct{})
84+
go testutil.IssueAllRequests(t, opts.Client, "certificaterequest-namespace", stopCh, []byte("certificate bytes"), []byte("ca bytes"))
85+
defer close(stopCh)
86+
87+
tmpDir, err := os.MkdirTemp("", "*")
88+
if err != nil {
89+
t.Fatal(err)
90+
}
91+
defer os.RemoveAll(tmpDir)
92+
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
93+
defer cancel()
94+
_, err = cl.NodePublishVolume(ctx, &csi.NodePublishVolumeRequest{
95+
VolumeId: "test-vol",
96+
VolumeContext: map[string]string{
97+
"csi.storage.k8s.io/ephemeral": "true",
98+
"csi.storage.k8s.io/pod.name": "the-pod-name",
99+
"csi.storage.k8s.io/pod.namespace": "the-pod-namespace",
100+
},
101+
TargetPath: tmpDir,
102+
Readonly: true,
103+
})
104+
if err != nil {
105+
t.Fatal(err)
106+
}
107+
108+
if err := wait.PollUntil(time.Second, func() (done bool, err error) {
109+
files, err := store.ReadFiles("test-vol")
110+
if errors.Is(err, storage.ErrNotFound) || len(files) <= 1 {
111+
return false, nil
112+
}
113+
if err != nil {
114+
return false, err
115+
}
116+
if !reflect.DeepEqual(files["ca"], []byte("ca bytes")) {
117+
return false, fmt.Errorf("unexpected CA data: %v", files["ca"])
118+
}
119+
if !reflect.DeepEqual(files["cert"], []byte("certificate bytes")) {
120+
return false, fmt.Errorf("unexpected certificate data: %v", files["cert"])
121+
}
122+
return true, nil
123+
}, ctx.Done()); err != nil {
124+
t.Error(err)
125+
}
126+
}
127+
128+
func TestFailsIfNotReadyToRequest_ContinueOnNotReadyDisabled(t *testing.T) {
129+
store := storage.NewMemoryFS()
130+
clock := fakeclock.NewFakeClock(time.Now())
131+
132+
opts, cl, stop := testutil.RunTestDriver(t, testutil.DriverOptions{
133+
Store: store,
134+
Clock: clock,
135+
ContinueOnNotReady: false,
136+
ReadyToRequest: func(meta metadata.Metadata) (bool, string) {
137+
return false, "never ready"
138+
},
139+
GeneratePrivateKey: func(meta metadata.Metadata) (crypto.PrivateKey, error) {
140+
return nil, nil
141+
},
142+
GenerateRequest: func(meta metadata.Metadata) (*manager.CertificateRequestBundle, error) {
143+
return &manager.CertificateRequestBundle{
144+
Namespace: "certificaterequest-namespace",
145+
}, nil
146+
},
147+
SignRequest: func(meta metadata.Metadata, key crypto.PrivateKey, request *x509.CertificateRequest) (csr []byte, err error) {
148+
return []byte{}, nil
149+
},
150+
WriteKeypair: func(meta metadata.Metadata, key crypto.PrivateKey, chain []byte, ca []byte) error {
151+
store.WriteFiles(meta, map[string][]byte{
152+
"ca": ca,
153+
"cert": chain,
154+
})
155+
nextIssuanceTime := clock.Now().Add(time.Hour)
156+
meta.NextIssuanceTime = &nextIssuanceTime
157+
return store.WriteMetadata(meta.VolumeID, meta)
158+
},
159+
})
160+
defer stop()
161+
162+
// Setup a routine to issue/sign the request IF it is created
163+
stopCh := make(chan struct{})
164+
go testutil.IssueAllRequests(t, opts.Client, "certificaterequest-namespace", stopCh, []byte("certificate bytes"), []byte("ca bytes"))
165+
defer close(stopCh)
166+
tmpDir, err := os.MkdirTemp("", "*")
167+
if err != nil {
168+
t.Fatal(err)
169+
}
170+
defer os.RemoveAll(tmpDir)
171+
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
172+
defer cancel()
173+
_, err = cl.NodePublishVolume(ctx, &csi.NodePublishVolumeRequest{
174+
VolumeId: "test-vol",
175+
VolumeContext: map[string]string{
176+
"csi.storage.k8s.io/ephemeral": "true",
177+
"csi.storage.k8s.io/pod.name": "the-pod-name",
178+
"csi.storage.k8s.io/pod.namespace": "the-pod-namespace",
179+
},
180+
TargetPath: tmpDir,
181+
Readonly: true,
182+
})
183+
if status.Code(err) != codes.DeadlineExceeded {
184+
t.Errorf("Expected timeout to be returned from NodePublishVolume but got: %v", err)
185+
}
186+
187+
// allow 1s for the cleanup functions in NodePublishVolume to be run
188+
// without this pause, the test can flake due to the storage backend not
189+
// being cleaned up of the persisted metadata file.
190+
ctx, cancel2 := context.WithTimeout(context.Background(), time.Second)
191+
defer cancel2()
192+
if err := wait.PollUntil(time.Millisecond*100, func() (bool, error) {
193+
_, err := store.ReadFiles("test-vol")
194+
if err != storage.ErrNotFound {
195+
return false, nil
196+
}
197+
return true, nil
198+
}, ctx.Done()); err != nil {
199+
t.Errorf("failed to wait for storage backend to return NotFound: %v", err)
200+
}
201+
}

0 commit comments

Comments
 (0)