Skip to content

Commit 5c04206

Browse files
authored
Merge pull request #141 from verult/final-top
Topology support
2 parents 99a83cd + 3ba9be3 commit 5c04206

File tree

6 files changed

+1751
-344
lines changed

6 files changed

+1751
-344
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ flycheck_*.el
4141

4242
.idea/
4343
/.project
44+
*.iml
4445

4546
# ignore build directory
4647
_output/

cmd/csi-provisioner/csi-provisioner.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
ctrl "github.com/kubernetes-csi/external-provisioner/pkg/controller"
3030
snapclientset "github.com/kubernetes-csi/external-snapshotter/pkg/client/clientset/versioned"
3131
"github.com/kubernetes-incubator/external-storage/lib/controller"
32+
csiclientset "k8s.io/csi-api/pkg/client/clientset/versioned"
3233

3334
"google.golang.org/grpc"
3435

@@ -92,6 +93,10 @@ func init() {
9293
if err != nil {
9394
glog.Fatalf("Failed to create snapshot client: %v", err)
9495
}
96+
csiAPIClient, err := csiclientset.NewForConfig(config)
97+
if err != nil {
98+
glog.Fatalf("Failed to create CSI API client: %v", err)
99+
}
95100

96101
// The controller needs to know what the server version is because out-of-tree
97102
// provisioners aren't officially supported until 1.5
@@ -118,7 +123,7 @@ func init() {
118123
}
119124
// Create the provisioner: it implements the Provisioner interface expected by
120125
// the controller
121-
csiProvisioner := ctrl.NewCSIProvisioner(clientset, *csiEndpoint, *connectionTimeout, identity, *volumeNamePrefix, *volumeNameUUIDLength, grpcClient, snapClient)
126+
csiProvisioner := ctrl.NewCSIProvisioner(clientset, csiAPIClient, *csiEndpoint, *connectionTimeout, identity, *volumeNamePrefix, *volumeNameUUIDLength, grpcClient, snapClient)
122127
provisionController = controller.NewProvisionController(
123128
clientset,
124129
*provisioner,

pkg/controller/controller.go

Lines changed: 92 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ import (
4949
"google.golang.org/grpc/status"
5050

5151
"github.com/container-storage-interface/spec/lib/go/csi/v0"
52+
csiclientset "k8s.io/csi-api/pkg/client/clientset/versioned"
5253
)
5354

5455
const (
@@ -81,6 +82,7 @@ const (
8182
type csiProvisioner struct {
8283
client kubernetes.Interface
8384
csiClient csi.ControllerClient
85+
csiAPIClient csiclientset.Interface
8486
grpcClient *grpc.ClientConn
8587
snapshotClient snapclientset.Interface
8688
timeout time.Duration
@@ -90,6 +92,18 @@ type csiProvisioner struct {
9092
config *rest.Config
9193
}
9294

95+
type driverState struct {
96+
driverName string
97+
capabilities sets.Int
98+
}
99+
100+
const (
101+
PluginCapability_CONTROLLER_SERVICE = iota
102+
PluginCapability_ACCESSIBILITY_CONSTRAINTS
103+
ControllerCapability_CREATE_DELETE_VOLUME
104+
ControllerCapability_CREATE_DELETE_SNAPSHOT
105+
)
106+
93107
var _ controller.Provisioner = &csiProvisioner{}
94108

95109
var (
@@ -163,61 +177,66 @@ func getDriverName(conn *grpc.ClientConn, timeout time.Duration) (string, error)
163177
return name, nil
164178
}
165179

166-
func supportsPluginControllerService(conn *grpc.ClientConn, timeout time.Duration) (bool, error) {
167-
ctx, cancel := context.WithTimeout(context.Background(), timeout)
168-
defer cancel()
169-
170-
client := csi.NewIdentityClient(conn)
171-
req := csi.GetPluginCapabilitiesRequest{}
180+
func getDriverCapabilities(conn *grpc.ClientConn, timeout time.Duration) (sets.Int, error) {
181+
pluginCaps, err := getPluginCapabilities(conn, timeout)
182+
if err != nil {
183+
return nil, err
184+
}
172185

173-
rsp, err := client.GetPluginCapabilities(ctx, &req)
186+
controllerCaps, err := getControllerCapabilities(conn, timeout)
174187
if err != nil {
175-
return false, err
188+
return nil, err
176189
}
177-
caps := rsp.GetCapabilities()
178-
for _, cap := range caps {
190+
191+
capabilities := make(sets.Int)
192+
for _, cap := range pluginCaps {
179193
if cap == nil {
180194
continue
181195
}
182196
service := cap.GetService()
183197
if service == nil {
184198
continue
185199
}
186-
if service.GetType() == csi.PluginCapability_Service_CONTROLLER_SERVICE {
187-
return true, nil
200+
switch service.GetType() {
201+
case csi.PluginCapability_Service_CONTROLLER_SERVICE:
202+
capabilities.Insert(PluginCapability_CONTROLLER_SERVICE)
203+
case csi.PluginCapability_Service_ACCESSIBILITY_CONSTRAINTS:
204+
capabilities.Insert(PluginCapability_ACCESSIBILITY_CONSTRAINTS)
188205
}
189206
}
190-
return false, nil
191-
}
192-
193-
func supportsControllerCreateVolume(conn *grpc.ClientConn, timeout time.Duration) (bool, error) {
194-
ctx, cancel := context.WithTimeout(context.Background(), timeout)
195-
defer cancel()
196-
197-
client := csi.NewControllerClient(conn)
198-
req := csi.ControllerGetCapabilitiesRequest{}
199-
200-
rsp, err := client.ControllerGetCapabilities(ctx, &req)
201-
if err != nil {
202-
return false, err
203-
}
204-
caps := rsp.GetCapabilities()
205-
for _, cap := range caps {
207+
for _, cap := range controllerCaps {
206208
if cap == nil {
207209
continue
208210
}
209211
rpc := cap.GetRpc()
210212
if rpc == nil {
211213
continue
212214
}
213-
if rpc.GetType() == csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME {
214-
return true, nil
215+
switch rpc.GetType() {
216+
case csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME:
217+
capabilities.Insert(ControllerCapability_CREATE_DELETE_VOLUME)
218+
case csi.ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT:
219+
capabilities.Insert(ControllerCapability_CREATE_DELETE_SNAPSHOT)
215220
}
216221
}
217-
return false, nil
222+
return capabilities, nil
223+
}
224+
225+
func getPluginCapabilities(conn *grpc.ClientConn, timeout time.Duration) ([]*csi.PluginCapability, error) {
226+
ctx, cancel := context.WithTimeout(context.Background(), timeout)
227+
defer cancel()
228+
229+
client := csi.NewIdentityClient(conn)
230+
req := csi.GetPluginCapabilitiesRequest{}
231+
232+
rsp, err := client.GetPluginCapabilities(ctx, &req)
233+
if err != nil {
234+
return nil, err
235+
}
236+
return rsp.GetCapabilities(), nil
218237
}
219238

220-
func supportsControllerCreateSnapshot(conn *grpc.ClientConn, timeout time.Duration) (bool, error) {
239+
func getControllerCapabilities(conn *grpc.ClientConn, timeout time.Duration) ([]*csi.ControllerServiceCapability, error) {
221240
ctx, cancel := context.WithTimeout(context.Background(), timeout)
222241
defer cancel()
223242

@@ -226,26 +245,14 @@ func supportsControllerCreateSnapshot(conn *grpc.ClientConn, timeout time.Durati
226245

227246
rsp, err := client.ControllerGetCapabilities(ctx, &req)
228247
if err != nil {
229-
return false, err
230-
}
231-
caps := rsp.GetCapabilities()
232-
for _, cap := range caps {
233-
if cap == nil {
234-
continue
235-
}
236-
rpc := cap.GetRpc()
237-
if rpc == nil {
238-
continue
239-
}
240-
if rpc.GetType() == csi.ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT {
241-
return true, nil
242-
}
248+
return nil, err
243249
}
244-
return false, nil
250+
return rsp.GetCapabilities(), nil
245251
}
246252

247253
// NewCSIProvisioner creates new CSI provisioner
248254
func NewCSIProvisioner(client kubernetes.Interface,
255+
csiAPIClient csiclientset.Interface,
249256
csiEndpoint string,
250257
connectionTimeout time.Duration,
251258
identity string,
@@ -259,6 +266,7 @@ func NewCSIProvisioner(client kubernetes.Interface,
259266
client: client,
260267
grpcClient: grpcClient,
261268
csiClient: csiClient,
269+
csiAPIClient: csiAPIClient,
262270
snapshotClient: snapshotClient,
263271
timeout: connectionTimeout,
264272
identity: identity,
@@ -268,27 +276,21 @@ func NewCSIProvisioner(client kubernetes.Interface,
268276
return provisioner
269277
}
270278

271-
// This function get called before any attepmt to communicate with the driver.
279+
// This function get called before any attempt to communicate with the driver.
272280
// Before initiating Create/Delete API calls provisioner checks if Capabilities:
273281
// PluginControllerService, ControllerCreateVolume sre supported and gets the driver name.
274-
func checkDriverState(grpcClient *grpc.ClientConn, timeout time.Duration, needSnapshotSupport bool) (string, error) {
275-
ok, err := supportsPluginControllerService(grpcClient, timeout)
282+
func checkDriverState(grpcClient *grpc.ClientConn, timeout time.Duration, needSnapshotSupport bool) (*driverState, error) {
283+
capabilities, err := getDriverCapabilities(grpcClient, timeout)
276284
if err != nil {
277-
glog.Errorf("failed to get support info :%v", err)
278-
return "", err
279-
}
280-
if !ok {
281-
glog.Errorf("no plugin controller service support detected")
282-
return "", fmt.Errorf("no plugin controller service support detected")
285+
return nil, fmt.Errorf("failed to get capabilities: %v", err)
283286
}
284-
ok, err = supportsControllerCreateVolume(grpcClient, timeout)
285-
if err != nil {
286-
glog.Errorf("failed to get support info :%v", err)
287-
return "", err
287+
288+
if !capabilities.Has(PluginCapability_CONTROLLER_SERVICE) {
289+
return nil, fmt.Errorf("no plugin controller service support detected")
288290
}
289-
if !ok {
290-
glog.Error("no create/delete volume support detected")
291-
return "", fmt.Errorf("no create/delete volume support detected")
291+
292+
if !capabilities.Has(ControllerCapability_CREATE_DELETE_VOLUME) {
293+
return nil, fmt.Errorf("no create/delete volume support detected")
292294
}
293295

294296
// If PVC.Spec.DataSource is not nil, it indicates the request is to create volume
@@ -297,23 +299,19 @@ func checkDriverState(grpcClient *grpc.ClientConn, timeout time.Duration, needSn
297299
if needSnapshotSupport {
298300
// Check whether plugin supports create snapshot
299301
// If not, create volume from snapshot cannot proceed
300-
ok, err = supportsControllerCreateSnapshot(grpcClient, timeout)
301-
if err != nil {
302-
glog.Errorf("failed to get support info :%v", err)
303-
return "", err
304-
}
305-
if !ok {
306-
glog.Error("no create/delete snapshot support detected. Cannot create volume from shapshot")
307-
return "", fmt.Errorf("no create/delete snapshot support detected. Cannot create volume from shapshot")
302+
if !capabilities.Has(ControllerCapability_CREATE_DELETE_SNAPSHOT) {
303+
return nil, fmt.Errorf("no create/delete snapshot support detected. Cannot create volume from snapshot")
308304
}
309305
}
310306

311307
driverName, err := getDriverName(grpcClient, timeout)
312308
if err != nil {
313-
glog.Errorf("failed to get driver info :%v", err)
314-
return "", err
309+
return nil, fmt.Errorf("failed to get driver info: %v", err)
315310
}
316-
return driverName, nil
311+
return &driverState{
312+
driverName: driverName,
313+
capabilities: capabilities,
314+
}, nil
317315
}
318316

319317
func makeVolumeName(prefix, pvcUID string, volumeNameUUIDLength int) (string, error) {
@@ -354,7 +352,7 @@ func (p *csiProvisioner) Provision(options controller.VolumeOptions) (*v1.Persis
354352
}
355353
needSnapshotSupport = true
356354
}
357-
driverName, err := checkDriverState(p.grpcClient, p.timeout, needSnapshotSupport)
355+
driverState, err := checkDriverState(p.grpcClient, p.timeout, needSnapshotSupport)
358356
if err != nil {
359357
return nil, err
360358
}
@@ -414,6 +412,19 @@ func (p *csiProvisioner) Provision(options controller.VolumeOptions) (*v1.Persis
414412
req.VolumeContentSource = volumeContentSource
415413
}
416414

415+
if driverState.capabilities.Has(PluginCapability_ACCESSIBILITY_CONSTRAINTS) {
416+
requirements, err := GenerateAccessibilityRequirements(
417+
p.client,
418+
p.csiAPIClient,
419+
driverState.driverName,
420+
options.AllowedTopologies,
421+
options.SelectedNode)
422+
if err != nil {
423+
return nil, fmt.Errorf("error generating accessibility requirements: %v", err)
424+
}
425+
req.AccessibilityRequirements = requirements
426+
}
427+
417428
glog.V(5).Infof("CreateVolumeRequest %+v", req)
418429

419430
rep := &csi.CreateVolumeResponse{}
@@ -502,6 +513,7 @@ func (p *csiProvisioner) Provision(options controller.VolumeOptions) (*v1.Persis
502513
if len(fsType) == 0 {
503514
fsType = defaultFSType
504515
}
516+
505517
pv := &v1.PersistentVolume{
506518
ObjectMeta: metav1.ObjectMeta{
507519
Name: pvName,
@@ -515,7 +527,7 @@ func (p *csiProvisioner) Provision(options controller.VolumeOptions) (*v1.Persis
515527
// TODO wait for CSI VolumeSource API
516528
PersistentVolumeSource: v1.PersistentVolumeSource{
517529
CSI: &v1.CSIPersistentVolumeSource{
518-
Driver: driverName,
530+
Driver: driverState.driverName,
519531
VolumeHandle: p.volumeIdToHandle(rep.Volume.Id),
520532
FSType: fsType,
521533
VolumeAttributes: volumeAttributes,
@@ -527,6 +539,10 @@ func (p *csiProvisioner) Provision(options controller.VolumeOptions) (*v1.Persis
527539
},
528540
}
529541

542+
if driverState.capabilities.Has(PluginCapability_ACCESSIBILITY_CONSTRAINTS) {
543+
pv.Spec.NodeAffinity = GenerateVolumeNodeAffinity(rep.Volume.AccessibleTopology)
544+
}
545+
530546
glog.Infof("successfully created PV %+v", pv.Spec.PersistentVolumeSource)
531547

532548
return pv, nil

0 commit comments

Comments
 (0)