Skip to content

Commit 82b787e

Browse files
committed
Add option to skip blocking pod startup if driver is not ready to create a request yet
Signed-off-by: James Munnelly <[email protected]>
1 parent 34ffa38 commit 82b787e

File tree

7 files changed

+269
-16
lines changed

7 files changed

+269
-16
lines changed

driver/driver.go

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,16 @@ type Options struct {
5252
// If not specified, the current operating system's default implementation
5353
// will be used (i.e. 'mount.New("")')
5454
Mounter mount.Interface
55+
// ContinueOnNotReady will cause the driver's nodeserver to continue
56+
// mounting the volume even if the driver is not ready to create a request yet.
57+
// This is useful if you need to defer requesting a certificate until after
58+
// initialization of the Pod (e.g. IPAM so a pod IP is allocated).
59+
// Enabling this option WILL cause a period of time during pod startup whereby
60+
// certificate data is not available in the volume whilst the process is running.
61+
// An `initContainer` or other special logic in the user application must be
62+
// added to avoid running into CrashLoopBackOff situations which can delay pod
63+
// start time.
64+
ContinueOnNotReady bool
5565
}
5666

5767
func New(endpoint string, log logr.Logger, opts Options) (*Driver, error) {
@@ -75,11 +85,12 @@ func buildServers(opts Options, log logr.Logger) (*identityServer, *controllerSe
7585
opts.Mounter = mount.New("")
7686
}
7787
return NewIdentityServer(opts.DriverName, opts.DriverVersion), &controllerServer{}, &nodeServer{
78-
log: log,
79-
nodeID: opts.NodeID,
80-
manager: opts.Manager,
81-
store: opts.Store,
82-
mounter: opts.Mounter,
88+
log: log,
89+
nodeID: opts.NodeID,
90+
manager: opts.Manager,
91+
store: opts.Store,
92+
mounter: opts.Mounter,
93+
continueOnNotReady: opts.ContinueOnNotReady,
8394
}
8495
}
8596

driver/nodeserver.go

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ type nodeServer struct {
4141
mounter mount.Interface
4242

4343
log logr.Logger
44+
45+
continueOnNotReady bool
4446
}
4547

4648
func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
@@ -83,10 +85,17 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
8385

8486
log.Info("Volume registered for management")
8587

86-
if err := wait.PollUntil(time.Second, func() (done bool, err error) {
87-
return ns.manager.IsVolumeReady(req.GetVolumeId()), nil
88-
}, ctx.Done()); err != nil {
89-
return nil, err
88+
// Only wait for the volume to be ready if it is in a state of 'ready to request'
89+
// already. This allows implementors to defer actually requesting certificates
90+
// until later in the pod lifecycle (e.g. after CNI has run & an IP address has been
91+
// allocated, if a user wants to embed pod IPs into their requests).
92+
isReadyToRequest := ns.manager.IsVolumeReadyToRequest(req.GetVolumeId())
93+
if isReadyToRequest || !ns.continueOnNotReady {
94+
if err := wait.PollUntil(time.Second, func() (done bool, err error) {
95+
return ns.manager.IsVolumeReady(req.GetVolumeId()), nil
96+
}, ctx.Done()); err != nil {
97+
return nil, err
98+
}
9099
}
91100

92101
log.Info("Volume ready for mounting")

manager/interfaces.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,3 +89,14 @@ type WriteKeypairFunc func(meta metadata.Metadata, key crypto.PrivateKey, chain
8989
// volume being published. Useful for modifying clients to make use of CSI
9090
// token requests.
9191
type ClientForMetadataFunc func(meta metadata.Metadata) (cmclient.Interface, error)
92+
93+
// ReadyToRequestFunc can be optionally implemented by drivers to indicate whether
94+
// the driver is ready to request a certificate for the given volume/metadata.
95+
// This can be used to 'defer' fetching until later pod initialization events have
96+
// happened (e.g. CNI has allocated an IP if you want to embed a pod IP into the certificate
97+
// request resources).
98+
type ReadyToRequestFunc func(meta metadata.Metadata) bool
99+
100+
func AlwaysReadyToRequest(_ metadata.Metadata) bool {
101+
return true
102+
}

manager/manager.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ type Options struct {
8181
GenerateRequest GenerateRequestFunc
8282
SignRequest SignRequestFunc
8383
WriteKeypair WriteKeypairFunc
84+
ReadyToRequest ReadyToRequestFunc
8485
}
8586

8687
// NewManager constructs a new manager used to manage volumes containing
@@ -117,6 +118,9 @@ func NewManager(opts Options) (*Manager, error) {
117118
if opts.WriteKeypair == nil {
118119
return nil, errors.New("WriteKeypair must be set")
119120
}
121+
if opts.ReadyToRequest == nil {
122+
opts.ReadyToRequest = AlwaysReadyToRequest
123+
}
120124
if opts.MaxRequestsPerVolume == 0 {
121125
opts.MaxRequestsPerVolume = 1
122126
}
@@ -156,6 +160,7 @@ func NewManager(opts Options) (*Manager, error) {
156160
generateRequest: opts.GenerateRequest,
157161
signRequest: opts.SignRequest,
158162
writeKeypair: opts.WriteKeypair,
163+
readyToRequest: opts.ReadyToRequest,
159164

160165
managedVolumes: map[string]chan struct{}{},
161166
stopInformer: stopCh,
@@ -218,6 +223,7 @@ type Manager struct {
218223
generateRequest GenerateRequestFunc
219224
signRequest SignRequestFunc
220225
writeKeypair WriteKeypairFunc
226+
readyToRequest ReadyToRequestFunc
221227

222228
lock sync.Mutex
223229
// global view of all volumes managed by this manager
@@ -252,6 +258,10 @@ func (m *Manager) issue(volumeID string) error {
252258
}
253259
log.V(2).Info("Read metadata", "metadata", meta)
254260

261+
if !m.readyToRequest(meta) {
262+
return fmt.Errorf("driver is not ready to request a certificate for this volume")
263+
}
264+
255265
key, err := m.generatePrivateKey(meta)
256266
if err != nil {
257267
return fmt.Errorf("generating private key: %w", err)
@@ -526,6 +536,16 @@ func (m *Manager) UnmanageVolume(volumeID string) {
526536
}
527537
}
528538

539+
func (m *Manager) IsVolumeReadyToRequest(volumeID string) bool {
540+
meta, err := m.metadataReader.ReadMetadata(volumeID)
541+
if err != nil {
542+
m.log.Error(err, "failed to read metadata", "volume_id", volumeID)
543+
return false
544+
}
545+
546+
return m.readyToRequest(meta)
547+
}
548+
529549
func (m *Manager) IsVolumeReady(volumeID string) bool {
530550
meta, err := m.metadataReader.ReadMetadata(volumeID)
531551
if err != nil {

storage/filesystem.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ func (f *Filesystem) ListVolumes() ([]string, error) {
124124
return vols, nil
125125
}
126126

127-
// MetadataForVolume will return the metadata for the volume with the given ID.
127+
// ReadMetadata will return the metadata for the volume with the given ID.
128128
// Errors wrapping ErrNotFound will be returned if metadata for the ID cannot
129129
// be found.
130130
func (f *Filesystem) ReadMetadata(volumeID string) (metadata.Metadata, error) {
Lines changed: 198 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,198 @@
1+
/*
2+
Copyright 2022 The cert-manager Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package integration
18+
19+
import (
20+
"context"
21+
"crypto"
22+
"crypto/x509"
23+
"errors"
24+
"fmt"
25+
"os"
26+
"reflect"
27+
"testing"
28+
"time"
29+
30+
"github.com/container-storage-interface/spec/lib/go/csi"
31+
"google.golang.org/grpc/codes"
32+
"google.golang.org/grpc/status"
33+
"k8s.io/apimachinery/pkg/util/wait"
34+
fakeclock "k8s.io/utils/clock/testing"
35+
36+
"github.com/cert-manager/csi-lib/manager"
37+
"github.com/cert-manager/csi-lib/metadata"
38+
"github.com/cert-manager/csi-lib/storage"
39+
testutil "github.com/cert-manager/csi-lib/test/util"
40+
)
41+
42+
func TestCompletesIfNotReadyToRequest(t *testing.T) {
43+
store := storage.NewMemoryFS()
44+
clock := fakeclock.NewFakeClock(time.Now())
45+
46+
calls := 0
47+
opts, cl, stop := testutil.RunTestDriver(t, testutil.DriverOptions{
48+
Store: store,
49+
Clock: clock,
50+
ContinueOnNotReady: true,
51+
ReadyToRequest: func(meta metadata.Metadata) bool {
52+
if calls < 1 {
53+
calls++
54+
return false
55+
}
56+
// only indicate we are ready after issuance has been attempted 1 time
57+
return calls == 1
58+
},
59+
GeneratePrivateKey: func(meta metadata.Metadata) (crypto.PrivateKey, error) {
60+
return nil, nil
61+
},
62+
GenerateRequest: func(meta metadata.Metadata) (*manager.CertificateRequestBundle, error) {
63+
return &manager.CertificateRequestBundle{
64+
Namespace: "certificaterequest-namespace",
65+
}, nil
66+
},
67+
SignRequest: func(meta metadata.Metadata, key crypto.PrivateKey, request *x509.CertificateRequest) (csr []byte, err error) {
68+
return []byte{}, nil
69+
},
70+
WriteKeypair: func(meta metadata.Metadata, key crypto.PrivateKey, chain []byte, ca []byte) error {
71+
store.WriteFiles(meta, map[string][]byte{
72+
"ca": ca,
73+
"cert": chain,
74+
})
75+
nextIssuanceTime := clock.Now().Add(time.Hour)
76+
meta.NextIssuanceTime = &nextIssuanceTime
77+
return store.WriteMetadata(meta.VolumeID, meta)
78+
},
79+
})
80+
defer stop()
81+
82+
// Setup a routine to issue/sign the request IF it is created
83+
stopCh := make(chan struct{})
84+
go testutil.IssueAllRequests(t, opts.Client, "certificaterequest-namespace", stopCh, []byte("certificate bytes"), []byte("ca bytes"))
85+
defer close(stopCh)
86+
87+
tmpDir, err := os.MkdirTemp("", "*")
88+
if err != nil {
89+
t.Fatal(err)
90+
}
91+
defer os.RemoveAll(tmpDir)
92+
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
93+
defer cancel()
94+
_, err = cl.NodePublishVolume(ctx, &csi.NodePublishVolumeRequest{
95+
VolumeId: "test-vol",
96+
VolumeContext: map[string]string{
97+
"csi.storage.k8s.io/ephemeral": "true",
98+
"csi.storage.k8s.io/pod.name": "the-pod-name",
99+
"csi.storage.k8s.io/pod.namespace": "the-pod-namespace",
100+
},
101+
TargetPath: tmpDir,
102+
Readonly: true,
103+
})
104+
if err != nil {
105+
t.Fatal(err)
106+
}
107+
108+
if err := wait.PollUntil(time.Second, func() (done bool, err error) {
109+
files, err := store.ReadFiles("test-vol")
110+
if errors.Is(err, storage.ErrNotFound) || len(files) <= 1 {
111+
return false, nil
112+
}
113+
if err != nil {
114+
return false, err
115+
}
116+
if !reflect.DeepEqual(files["ca"], []byte("ca bytes")) {
117+
return false, fmt.Errorf("unexpected CA data: %v", files["ca"])
118+
}
119+
if !reflect.DeepEqual(files["cert"], []byte("certificate bytes")) {
120+
return false, fmt.Errorf("unexpected certificate data: %v", files["cert"])
121+
}
122+
return true, nil
123+
}, ctx.Done()); err != nil {
124+
t.Error(err)
125+
}
126+
}
127+
128+
func TestFailsIfNotReadyToRequest_ContinueOnNotReadyDisabled(t *testing.T) {
129+
store := storage.NewMemoryFS()
130+
clock := fakeclock.NewFakeClock(time.Now())
131+
132+
opts, cl, stop := testutil.RunTestDriver(t, testutil.DriverOptions{
133+
Store: store,
134+
Clock: clock,
135+
ContinueOnNotReady: false,
136+
ReadyToRequest: func(meta metadata.Metadata) bool {
137+
return false
138+
},
139+
GeneratePrivateKey: func(meta metadata.Metadata) (crypto.PrivateKey, error) {
140+
return nil, nil
141+
},
142+
GenerateRequest: func(meta metadata.Metadata) (*manager.CertificateRequestBundle, error) {
143+
return &manager.CertificateRequestBundle{
144+
Namespace: "certificaterequest-namespace",
145+
}, nil
146+
},
147+
SignRequest: func(meta metadata.Metadata, key crypto.PrivateKey, request *x509.CertificateRequest) (csr []byte, err error) {
148+
return []byte{}, nil
149+
},
150+
WriteKeypair: func(meta metadata.Metadata, key crypto.PrivateKey, chain []byte, ca []byte) error {
151+
store.WriteFiles(meta, map[string][]byte{
152+
"ca": ca,
153+
"cert": chain,
154+
})
155+
nextIssuanceTime := clock.Now().Add(time.Hour)
156+
meta.NextIssuanceTime = &nextIssuanceTime
157+
return store.WriteMetadata(meta.VolumeID, meta)
158+
},
159+
})
160+
defer stop()
161+
162+
// Setup a routine to issue/sign the request IF it is created
163+
stopCh := make(chan struct{})
164+
go testutil.IssueAllRequests(t, opts.Client, "certificaterequest-namespace", stopCh, []byte("certificate bytes"), []byte("ca bytes"))
165+
defer close(stopCh)
166+
167+
tmpDir, err := os.MkdirTemp("", "*")
168+
if err != nil {
169+
t.Fatal(err)
170+
}
171+
defer os.RemoveAll(tmpDir)
172+
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
173+
defer cancel()
174+
_, err = cl.NodePublishVolume(ctx, &csi.NodePublishVolumeRequest{
175+
VolumeId: "test-vol",
176+
VolumeContext: map[string]string{
177+
"csi.storage.k8s.io/ephemeral": "true",
178+
"csi.storage.k8s.io/pod.name": "the-pod-name",
179+
"csi.storage.k8s.io/pod.namespace": "the-pod-namespace",
180+
},
181+
TargetPath: tmpDir,
182+
Readonly: true,
183+
})
184+
if status.Code(err) != codes.DeadlineExceeded {
185+
t.Errorf("Expected timeout to be returned from NodePublishVolume but got: %v", err)
186+
}
187+
188+
files, err := store.ReadFiles("test-vol")
189+
if err != nil {
190+
t.Errorf("failed to read files: %v", err)
191+
}
192+
if len(files["ca"]) > 0 {
193+
t.Errorf("unexpected CA data: %v", files["ca"])
194+
}
195+
if len(files["cert"]) > 0 {
196+
t.Errorf("unexpected certificate data: %v", files["cert"])
197+
}
198+
}

test/util/testutil.go

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -53,11 +53,13 @@ type DriverOptions struct {
5353

5454
NodeID string
5555
MaxRequestsPerVolume int
56+
ContinueOnNotReady bool
5657

5758
GeneratePrivateKey manager.GeneratePrivateKeyFunc
5859
GenerateRequest manager.GenerateRequestFunc
5960
SignRequest manager.SignRequestFunc
6061
WriteKeypair manager.WriteKeypairFunc
62+
ReadyToRequest manager.ReadyToRequestFunc
6163
}
6264

6365
func RunTestDriver(t *testing.T, opts DriverOptions) (DriverOptions, csi.NodeClient, func()) {
@@ -117,15 +119,17 @@ func RunTestDriver(t *testing.T, opts DriverOptions) (DriverOptions, csi.NodeCli
117119
GenerateRequest: opts.GenerateRequest,
118120
SignRequest: opts.SignRequest,
119121
WriteKeypair: opts.WriteKeypair,
122+
ReadyToRequest: opts.ReadyToRequest,
120123
})
121124

122125
d := driver.NewWithListener(lis, *opts.Log, driver.Options{
123-
DriverName: "driver-name",
124-
DriverVersion: "v0.0.1",
125-
NodeID: opts.NodeID,
126-
Store: opts.Store,
127-
Mounter: opts.Mounter,
128-
Manager: m,
126+
DriverName: "driver-name",
127+
DriverVersion: "v0.0.1",
128+
NodeID: opts.NodeID,
129+
Store: opts.Store,
130+
Mounter: opts.Mounter,
131+
Manager: m,
132+
ContinueOnNotReady: opts.ContinueOnNotReady,
129133
})
130134

131135
// start the driver

0 commit comments

Comments
 (0)