Skip to content

Commit ab7091c

Browse files
committed
Support bmcpfs fileset dynamic provisioning
1 parent 042958f commit ab7091c

File tree

11 files changed

+971
-33
lines changed

11 files changed

+971
-33
lines changed

.golangci.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,11 @@ issues:
2323
- path: .*\.go
2424
linters:
2525
- unused
26+
# Temporary: suppress deprecated Endpoints API warning until dadi migrates to EndpointSlice.
27+
- path: pkg/dadi/.*\.go
28+
text: corev1\.Endpoints is deprecated
29+
linters:
30+
- staticcheck
2631

2732
# Mode of the generated files analysis.
2833
#

_typos.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,3 +10,5 @@ teh = "teh"
1010
ded = "ded"
1111
eles = "eles"
1212
TAGED = "TAGED"
13+
Entrie = "Entrie"
14+
DescribeFilesetsResponseBodyEntriesEntrie = "DescribeFilesetsResponseBodyEntriesEntrie"

pkg/bmcpfs/bmcpfs.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,19 +24,20 @@ import (
2424
)
2525

2626
const (
27-
driverType = "bmcpfs"
28-
driverName = "bmcpfsplugin.csi.alibabacloud.com"
27+
driverType = "bmcpfs"
28+
driverName = "bmcpfsplugin.csi.alibabacloud.com"
2929

3030
// keys in volume context or publish context
3131
_vpcMountTarget = "vpcMountTarget"
3232
_vscMountTarget = "vscMountTarget"
33-
_vscId = "vscId"
33+
_vscID = "vscId"
3434
_networkType = "networkType"
3535
_path = "path"
3636
_mpAutoSwitch = "mountpointAutoSwitch"
3737

38-
// prefix of node id
39-
CommonNodeIDPrefix = "common:"
38+
// CommonNodeIDPrefix is the prefix for common node IDs
39+
CommonNodeIDPrefix = "common:"
40+
// LingjunNodeIDPrefix is the prefix for lingjun node IDs
4041
LingjunNodeIDPrefix = "lingjun:"
4142

4243
// network types of CPFS mount targets

pkg/bmcpfs/controllerserver.go

Lines changed: 86 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,10 @@ package bmcpfs
1818

1919
import (
2020
"context"
21+
"errors"
2122
"fmt"
2223
"os"
24+
"path/filepath"
2325
"strconv"
2426
"strings"
2527

@@ -43,6 +45,7 @@ type controllerServer struct {
4345
common.GenericControllerServer
4446
vscManager *internal.PrimaryVscManagerWithCache
4547
attachDetacher internal.CPFSAttachDetacher
48+
filsetManager internal.CPFSFileSetManager
4649
nasClient *nasclient.Client
4750
skipDetach bool
4851
}
@@ -62,18 +65,85 @@ func newControllerServer(region string) (*controllerServer, error) {
6265
if err != nil {
6366
return nil, err
6467
}
68+
6569
return &controllerServer{
6670
vscManager: internal.NewPrimaryVscManagerWithCache(efloClient),
6771
attachDetacher: internal.NewCPFSAttachDetacher(nasClient),
72+
filsetManager: internal.NewCPFSFileSetManager(nasClient),
6873
nasClient: nasClient,
6974
skipDetach: skipDetach,
7075
}, nil
7176
}
7277

78+
// CreateVolume ...
79+
func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
80+
logger := klog.FromContext(ctx)
81+
logger.V(2).Info("starting")
82+
83+
// Validate parameters
84+
if err := validateFileSetParameters(req); err != nil {
85+
klog.Errorf("CreateVolume: error parameters from input: %v, with error: %v", req.Name, err)
86+
return nil, status.Errorf(codes.InvalidArgument, "Invalid parameters from input: %v, with error: %v", req.Name, err)
87+
}
88+
89+
// Extract parameters
90+
params := req.GetParameters()
91+
bmcpfsID := params["bmcpfsId"]
92+
fullPath := req.Name
93+
if rootPath, ok := params["path"]; ok && rootPath != "" {
94+
fullPath = filepath.Join(rootPath, req.Name)
95+
}
96+
volSizeBytes := int64(req.GetCapacityRange().GetRequiredBytes())
97+
98+
// Create fileset
99+
fileSetID, err := cs.filsetManager.CreateFileSet(ctx, bmcpfsID, req.Name, fullPath, 1000000, volSizeBytes, false)
100+
if err != nil {
101+
return nil, status.Error(codes.Internal, err.Error())
102+
}
103+
104+
// Prepare volume context
105+
volumeContext := req.GetParameters()
106+
if volumeContext == nil {
107+
volumeContext = make(map[string]string)
108+
}
109+
volumeContext = updateVolumeContext(volumeContext)
110+
111+
klog.Infof("CreateVolume: Successfully created FileSet %s: id[%s], filesystem[%s], path[%s]", req.GetName(), fileSetID, bmcpfsID, fullPath)
112+
113+
tmpVol := createVolumeResponse(fileSetID, bmcpfsID, volSizeBytes, volumeContext)
114+
115+
return &csi.CreateVolumeResponse{Volume: tmpVol}, nil
116+
}
117+
118+
func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
119+
logger := klog.FromContext(ctx)
120+
logger.V(2).Info("starting")
121+
122+
// Parse volume ID to extract filesystem ID and fileset ID
123+
fsID, fileSetID, err := parseVolumeID(req.VolumeId)
124+
if err != nil {
125+
klog.Errorf("DeleteVolume: failed to parse volume ID %s: %v", req.VolumeId, err)
126+
return nil, status.Error(codes.InvalidArgument, err.Error())
127+
}
128+
129+
klog.Infof("DeleteVolume: deleting fileset %s from filesystem %s", fileSetID, fsID)
130+
131+
// Delete the fileset
132+
err = cs.filsetManager.DeleteFileSet(ctx, fsID, fileSetID)
133+
if err != nil {
134+
klog.Errorf("DeleteVolume: failed to delete fileset %s from filesystem %s: %v", fileSetID, fsID, err)
135+
return nil, status.Error(codes.Internal, err.Error())
136+
}
137+
138+
klog.Infof("DeleteVolume: successfully deleted fileset %s from filesystem %s", fileSetID, fsID)
139+
return &csi.DeleteVolumeResponse{}, nil
140+
}
141+
73142
func (cs *controllerServer) ControllerGetCapabilities(ctx context.Context, req *csi.ControllerGetCapabilitiesRequest) (*csi.ControllerGetCapabilitiesResponse, error) {
74143
return &csi.ControllerGetCapabilitiesResponse{
75144
Capabilities: common.ControllerRPCCapabilities(
76145
csi.ControllerServiceCapability_RPC_PUBLISH_UNPUBLISH_VOLUME,
146+
csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME,
77147
),
78148
}, nil
79149
}
@@ -94,7 +164,7 @@ func (cs *controllerServer) ControllerPublishVolume(ctx context.Context, req *cs
94164
}
95165

96166
cpfsID, _ := parseVolumeHandle(req.VolumeId)
97-
// Get VscMountTarget of filesystem
167+
98168
mt := req.VolumeContext[_vscMountTarget]
99169
if mt == "" {
100170
var err error
@@ -105,18 +175,18 @@ func (cs *controllerServer) ControllerPublishVolume(ctx context.Context, req *cs
105175
}
106176

107177
// Get Primary vsc of Lingjun node
108-
lingjunInstanceId := strings.TrimPrefix(req.NodeId, LingjunNodeIDPrefix)
178+
lingjunInstanceID := strings.TrimPrefix(req.NodeId, LingjunNodeIDPrefix)
109179
if LingjunNodeIDPrefix == "" {
110180
return nil, status.Error(codes.InvalidArgument, "invalid node id")
111181
}
112-
vscId, err := cs.vscManager.EnsurePrimaryVsc(ctx, lingjunInstanceId, false)
182+
vscID, err := cs.vscManager.EnsurePrimaryVsc(ctx, lingjunInstanceID, false)
113183
if err != nil {
114184
return nil, status.Error(codes.Internal, err.Error())
115185
}
116-
klog.Info("Use VSC MountTarget for lingjun node", "nodeId", req.NodeId, "vscId", vscId)
186+
klog.Info("Use VSC MountTarget for lingjun node", "nodeId", req.NodeId, "vscId", vscID)
117187

118188
// Attach CPFS to VSC
119-
err = cs.attachDetacher.Attach(ctx, cpfsID, vscId)
189+
err = cs.attachDetacher.Attach(ctx, cpfsID, vscID)
120190
if err != nil {
121191
if autoSwitch, _ := strconv.ParseBool(req.VolumeContext[_mpAutoSwitch]); autoSwitch && internal.IsAttachNotSupportedError(err) {
122192
if req.VolumeContext[_vpcMountTarget] == "" {
@@ -135,35 +205,34 @@ func (cs *controllerServer) ControllerPublishVolume(ctx context.Context, req *cs
135205

136206
// TODO: if the cached vscid is already deleted, try to recreate a new primary vsc for lingjun node
137207

138-
klog.InfoS("ControllerPublishVolume: attached cpfs to vsc", "vscMountTarget", mt, "vscId", vscId, "node", req.NodeId)
208+
klog.InfoS("ControllerPublishVolume: attached cpfs to vsc", "vscMountTarget", mt, "vscId", vscID, "node", req.NodeId)
139209
return &csi.ControllerPublishVolumeResponse{
140210
PublishContext: map[string]string{
141211
_networkType: networkTypeVSC,
142-
_vscId: vscId,
212+
_vscID: vscID,
143213
_vscMountTarget: mt,
144214
},
145215
}, nil
146216
}
147217

148-
func (cs *controllerServer) ControllerUnpublishVolume(ctx context.Context, req *csi.ControllerUnpublishVolumeRequest) (
149-
*csi.ControllerUnpublishVolumeResponse, error,
150-
) {
218+
func (cs *controllerServer) ControllerUnpublishVolume(ctx context.Context, req *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, error) {
151219
if !strings.HasPrefix(req.NodeId, LingjunNodeIDPrefix) || cs.skipDetach {
152220
return &csi.ControllerUnpublishVolumeResponse{}, nil
153221
}
154222
// Create Primary vsc for Lingjun node
155-
lingjunInstanceId := strings.TrimPrefix(req.NodeId, LingjunNodeIDPrefix)
223+
lingjunInstanceID := strings.TrimPrefix(req.NodeId, LingjunNodeIDPrefix)
156224
if LingjunNodeIDPrefix == "" {
157225
return nil, status.Error(codes.InvalidArgument, "invalid node id")
158226
}
159-
vsc, err := cs.vscManager.GetPrimaryVscOf(lingjunInstanceId)
227+
vsc, err := cs.vscManager.GetPrimaryVscOf(lingjunInstanceID)
160228
if err != nil {
161-
return nil, status.Error(codes.Internal, err.Error())
229+
return nil, status.Errorf(codes.Internal, "get vsc error: %v", err)
162230
}
163231
if vsc == nil {
164232
klog.InfoS("ControllerUnpublishVolume: skip detaching cpfs from vsc as vsc not found", "node", req.NodeId)
165233
return &csi.ControllerUnpublishVolumeResponse{}, nil
166234
}
235+
167236
// If `req.VolumeId` is a combination of `cpfsID` and `fsetID`, Detach will trigger an error.
168237
err = cs.attachDetacher.Detach(ctx, req.VolumeId, vsc.VscID)
169238
if err != nil {
@@ -173,6 +242,7 @@ func (cs *controllerServer) ControllerUnpublishVolume(ctx context.Context, req *
173242
return &csi.ControllerUnpublishVolumeResponse{}, nil
174243
}
175244

245+
// KubernetesAlicloudIdentity is the user agent string for Eflo client
176246
var KubernetesAlicloudIdentity = fmt.Sprintf("Kubernetes.Alicloud/CsiProvision.Bmcpfs-%s", version.VERSION)
177247

178248
const efloConnTimeout = 10
@@ -227,9 +297,9 @@ func parseVolumeHandle(volumeHandle string) (string, string) {
227297
return parts[0], ""
228298
}
229299

230-
func getMountTarget(client *nasclient.Client, fsId, networkType string) (string, error) {
300+
func getMountTarget(client *nasclient.Client, fsID, networkType string) (string, error) {
231301
resp, err := client.DescribeFileSystems(&nasclient.DescribeFileSystemsRequest{
232-
FileSystemId: &fsId,
302+
FileSystemId: &fsID,
233303
})
234304
if err != nil {
235305
return "", fmt.Errorf("nas:DescribeFileSystems failed: %w", err)
@@ -252,7 +322,7 @@ func getMountTarget(client *nasclient.Client, fsId, networkType string) (string,
252322
if t == networkType {
253323
mountTarget := tea.StringValue(mt.MountTargetDomain)
254324
status := tea.StringValue(mt.Status)
255-
klog.V(2).InfoS("Found cpfs mount target", "filesystem", fsId, "networkType", networkType, "mountTarget", mountTarget, "status", status)
325+
klog.V(2).InfoS("Found cpfs mount target", "filesystem", fsID, "networkType", networkType, "mountTarget", mountTarget, "status", status)
256326
if status == "Active" {
257327
return mountTarget, nil
258328
}

0 commit comments

Comments
 (0)