Skip to content

Commit 5d2db6e

Browse files
committed
Support fileset of bmcpfs
1 parent 5be8fbb commit 5d2db6e

File tree

11 files changed

+1359
-94
lines changed

11 files changed

+1359
-94
lines changed

_typos.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,3 +10,5 @@ teh = "teh"
1010
ded = "ded"
1111
eles = "eles"
1212
TAGED = "TAGED"
13+
Entrie = "Entrie"
14+
DescribeFilesetsResponseBodyEntriesEntrie = "DescribeFilesetsResponseBodyEntriesEntrie"

pkg/bmcpfs/bmcpfs.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,25 +30,28 @@ const (
3030
// keys in volume context or publish context
3131
_vpcMountTarget = "vpcMountTarget"
3232
_vscMountTarget = "vscMountTarget"
33-
_vscId = "vscId"
33+
_vscID = "vscId"
3434
_networkType = "networkType"
3535
_path = "path"
3636
_mpAutoSwitch = "mountpointAutoSwitch"
3737

38-
// prefix of node id
39-
CommonNodeIDPrefix = "common:"
38+
// CommonNodeIDPrefix is the prefix for common node IDs
39+
CommonNodeIDPrefix = "common:"
40+
// LingjunNodeIDPrefix is the prefix for lingjun node IDs
4041
LingjunNodeIDPrefix = "lingjun:"
4142

4243
// network types of CPFS mount targets
4344
networkTypeVPC = "vpc"
4445
networkTypeVSC = "vsc"
4546
)
4647

48+
// Driver represents the BMCPFS CSI driver
4749
type Driver struct {
4850
endpoint string
4951
servers common.Servers
5052
}
5153

54+
// NewDriver creates a new driver instance
5255
func NewDriver(meta *metadata.Metadata, endpoint string, serviceType utils.ServiceType) *Driver {
5356
var driver Driver
5457
driver.endpoint = endpoint
@@ -72,6 +75,7 @@ func NewDriver(meta *metadata.Metadata, endpoint string, serviceType utils.Servi
7275
return &driver
7376
}
7477

78+
// Run starts the driver
7579
func (d *Driver) Run() {
7680
klog.InfoS("Starting driver", "driver", driverType, "endpoint", d.endpoint)
7781
common.RunCSIServer(driverType, d.endpoint, d.servers)

pkg/bmcpfs/controllerserver.go

Lines changed: 98 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"context"
2121
"fmt"
2222
"os"
23+
"path/filepath"
2324
"strconv"
2425
"strings"
2526

@@ -42,6 +43,7 @@ type controllerServer struct {
4243
common.GenericControllerServer
4344
vscManager *internal.PrimaryVscManagerWithCache
4445
attachDetacher internal.CPFSAttachDetacher
46+
filsetManager internal.CPFSFileSetManager
4547
nasClient *nasclient.Client
4648
}
4749

@@ -58,14 +60,98 @@ func newControllerServer(region string) (*controllerServer, error) {
5860
return &controllerServer{
5961
vscManager: internal.NewPrimaryVscManagerWithCache(efloClient),
6062
attachDetacher: internal.NewCPFSAttachDetacher(nasClient),
63+
filsetManager: internal.NewCPFSFileSetManager(nasClient),
6164
nasClient: nasClient,
6265
}, nil
6366
}
6467

68+
// CreateVolume ...
69+
func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
70+
logger := klog.FromContext(ctx)
71+
logger.V(2).Info("starting")
72+
73+
// Handle static volume creation (existing fileset)
74+
csiVolume, err := staticFileSetCreate(req, cs.filsetManager)
75+
if err != nil {
76+
return nil, status.Errorf(codes.InvalidArgument, "create static volume failed: %v", err)
77+
}
78+
if csiVolume != nil {
79+
klog.Infof("CreateVolume: static volume create successful, pvName: %s, VolumeId: %s, volumeContext: %v", req.Name, csiVolume.VolumeId, csiVolume.VolumeContext)
80+
return &csi.CreateVolumeResponse{Volume: csiVolume}, nil
81+
}
82+
83+
// Validate parameters
84+
if err := validateFileSetParameters(req); err != nil {
85+
klog.Errorf("CreateVolume: error parameters from input: %v, with error: %v", req.Name, err)
86+
return nil, status.Errorf(codes.InvalidArgument, "Invalid parameters from input: %v, with error: %v", req.Name, err)
87+
}
88+
89+
// Extract parameters
90+
params := req.GetParameters()
91+
bmcpfsID := params["bmcpfsId"]
92+
fullPath := req.Name
93+
if rootPath, ok := params["path"]; ok && rootPath != "" {
94+
fullPath = filepath.Join(rootPath, req.Name)
95+
}
96+
volSizeBytes := int64(req.GetCapacityRange().GetRequiredBytes())
97+
98+
// Create fileset
99+
fileSetID, err := cs.filsetManager.CreateFileSet(ctx, bmcpfsID, req.Name, fullPath, 1000000, volSizeBytes, false)
100+
if err != nil {
101+
return nil, status.Error(codes.Internal, err.Error())
102+
}
103+
104+
// Prepare volume context
105+
volumeContext := req.GetParameters()
106+
if volumeContext == nil {
107+
volumeContext = make(map[string]string)
108+
}
109+
volumeContext = updateVolumeContext(volumeContext)
110+
111+
klog.Infof("CreateVolume: Successfully created FileSet %s: id[%s], filesystem[%s], path[%s]", req.GetName(), fileSetID, bmcpfsID, fullPath)
112+
113+
tmpVol := createVolumeResponse(fileSetID, bmcpfsID, volSizeBytes, volumeContext)
114+
115+
return &csi.CreateVolumeResponse{Volume: tmpVol}, nil
116+
}
117+
118+
func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
119+
logger := klog.FromContext(ctx)
120+
logger.V(2).Info("starting")
121+
122+
// Parse volume ID to extract filesystem ID and fileset ID
123+
fsID, fileSetID, err := parseVolumeID(req.VolumeId)
124+
if err != nil {
125+
klog.Errorf("DeleteVolume: failed to parse volume ID %s: %v", req.VolumeId, err)
126+
return nil, status.Error(codes.InvalidArgument, err.Error())
127+
}
128+
129+
klog.Infof("DeleteVolume: deleting fileset %s from filesystem %s", fileSetID, fsID)
130+
131+
// Check if fileset exists before attempting to delete
132+
_, err = cs.filsetManager.DescribeFileset(ctx, fsID, fileSetID)
133+
if err != nil {
134+
// If fileset doesn't exist, consider it already deleted
135+
klog.V(2).Infof("DeleteVolume: fileset %s not found in filesystem %s, considering it already deleted", fileSetID, fsID)
136+
return &csi.DeleteVolumeResponse{}, nil
137+
}
138+
139+
// Delete the fileset
140+
err = cs.filsetManager.DeleteFileSet(ctx, fsID, fileSetID)
141+
if err != nil {
142+
klog.Errorf("DeleteVolume: failed to delete fileset %s from filesystem %s: %v", fileSetID, fsID, err)
143+
return nil, status.Error(codes.Internal, err.Error())
144+
}
145+
146+
klog.Infof("DeleteVolume: successfully deleted fileset %s from filesystem %s", fileSetID, fsID)
147+
return &csi.DeleteVolumeResponse{}, nil
148+
}
149+
65150
func (cs *controllerServer) ControllerGetCapabilities(ctx context.Context, req *csi.ControllerGetCapabilitiesRequest) (*csi.ControllerGetCapabilitiesResponse, error) {
66151
return &csi.ControllerGetCapabilitiesResponse{
67152
Capabilities: common.ControllerRPCCapabilities(
68153
csi.ControllerServiceCapability_RPC_PUBLISH_UNPUBLISH_VOLUME,
154+
csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME,
69155
),
70156
}, nil
71157
}
@@ -96,18 +182,18 @@ func (cs *controllerServer) ControllerPublishVolume(ctx context.Context, req *cs
96182
}
97183

98184
// Get Primary vsc of Lingjun node
99-
lingjunInstanceId := strings.TrimPrefix(req.NodeId, LingjunNodeIDPrefix)
185+
lingjunInstanceID := strings.TrimPrefix(req.NodeId, LingjunNodeIDPrefix)
100186
if LingjunNodeIDPrefix == "" {
101187
return nil, status.Error(codes.InvalidArgument, "invalid node id")
102188
}
103-
vscId, err := cs.vscManager.EnsurePrimaryVsc(ctx, lingjunInstanceId, false)
189+
vscID, err := cs.vscManager.EnsurePrimaryVsc(ctx, lingjunInstanceID, false)
104190
if err != nil {
105191
return nil, status.Error(codes.Internal, err.Error())
106192
}
107-
klog.Info("Use VSC MountTarget for lingjun node", "nodeId", req.NodeId, "vscId", vscId)
193+
klog.Info("Use VSC MountTarget for lingjun node", "nodeId", req.NodeId, "vscId", vscID)
108194

109195
// Attach CPFS to VSC
110-
err = cs.attachDetacher.Attach(ctx, req.VolumeId, vscId)
196+
err = cs.attachDetacher.Attach(ctx, req.VolumeId, vscID)
111197
if err != nil {
112198
if autoSwitch, _ := strconv.ParseBool(req.VolumeContext[_mpAutoSwitch]); autoSwitch && internal.IsAttachNotSupportedError(err) {
113199
if req.VolumeContext[_vpcMountTarget] == "" {
@@ -125,11 +211,11 @@ func (cs *controllerServer) ControllerPublishVolume(ctx context.Context, req *cs
125211

126212
// TODO: if the cached vscid is already deleted, try to recreate a new primary vsc for lingjun node
127213

128-
klog.InfoS("ControllerPublishVolume: attached cpfs to vsc", "vscMountTarget", mt, "vscId", vscId, "node", req.NodeId)
214+
klog.InfoS("ControllerPublishVolume: attached cpfs to vsc", "vscMountTarget", mt, "vscId", vscID, "node", req.NodeId)
129215
return &csi.ControllerPublishVolumeResponse{
130216
PublishContext: map[string]string{
131217
_networkType: networkTypeVSC,
132-
_vscId: vscId,
218+
_vscID: vscID,
133219
_vscMountTarget: mt,
134220
},
135221
}, nil
@@ -143,11 +229,11 @@ func (cs *controllerServer) ControllerUnpublishVolume(ctx context.Context, req *
143229
}
144230

145231
// Create Primary vsc for Lingjun node
146-
lingjunInstanceId := strings.TrimPrefix(req.NodeId, LingjunNodeIDPrefix)
232+
lingjunInstanceID := strings.TrimPrefix(req.NodeId, LingjunNodeIDPrefix)
147233
if LingjunNodeIDPrefix == "" {
148234
return nil, status.Error(codes.InvalidArgument, "invalid node id")
149235
}
150-
vsc, err := cs.vscManager.GetPrimaryVscOf(lingjunInstanceId)
236+
vsc, err := cs.vscManager.GetPrimaryVscOf(lingjunInstanceID)
151237
if err != nil {
152238
return nil, status.Error(codes.Internal, err.Error())
153239
}
@@ -163,6 +249,7 @@ func (cs *controllerServer) ControllerUnpublishVolume(ctx context.Context, req *
163249
return &csi.ControllerUnpublishVolumeResponse{}, nil
164250
}
165251

252+
// KubernetesAlicloudIdentity is the user agent string for Eflo client
166253
var KubernetesAlicloudIdentity = fmt.Sprintf("Kubernetes.Alicloud/CsiProvision.Bmcpfs-%s", version.VERSION)
167254

168255
const efloConnTimeout = 10
@@ -209,9 +296,9 @@ func newEfloClient(region string) (*efloclient.Client, error) {
209296
return efloclient.NewClient(config)
210297
}
211298

212-
func getMountTarget(client *nasclient.Client, fsId, networkType string) (string, error) {
299+
func getMountTarget(client *nasclient.Client, fsID, networkType string) (string, error) {
213300
resp, err := client.DescribeFileSystems(&nasclient.DescribeFileSystemsRequest{
214-
FileSystemId: &fsId,
301+
FileSystemId: &fsID,
215302
})
216303
if err != nil {
217304
return "", fmt.Errorf("nas:DescribeFileSystems failed: %w", err)
@@ -234,7 +321,7 @@ func getMountTarget(client *nasclient.Client, fsId, networkType string) (string,
234321
if t == networkType {
235322
mountTarget := tea.StringValue(mt.MountTargetDomain)
236323
status := tea.StringValue(mt.Status)
237-
klog.V(2).InfoS("Found cpfs mount target", "filesystem", fsId, "networkType", networkType, "mountTarget", mountTarget, "status", status)
324+
klog.V(2).InfoS("Found cpfs mount target", "filesystem", fsID, "networkType", networkType, "mountTarget", mountTarget, "status", status)
238325
if status == "Active" {
239326
return mountTarget, nil
240327
}

0 commit comments

Comments
 (0)