@@ -20,6 +20,7 @@ import (
2020 "context"
2121 "fmt"
2222 "os"
23+ "path/filepath"
2324 "strconv"
2425 "strings"
2526
@@ -43,6 +44,7 @@ type controllerServer struct {
4344 common.GenericControllerServer
4445 vscManager * internal.PrimaryVscManagerWithCache
4546 attachDetacher internal.CPFSAttachDetacher
47+ filsetManager internal.CPFSFileSetManager
4648 nasClient * nasclient.Client
4749 skipDetach bool
4850}
@@ -62,18 +64,85 @@ func newControllerServer(region string) (*controllerServer, error) {
6264 if err != nil {
6365 return nil , err
6466 }
67+
6568 return & controllerServer {
6669 vscManager : internal .NewPrimaryVscManagerWithCache (efloClient ),
6770 attachDetacher : internal .NewCPFSAttachDetacher (nasClient ),
71+ filsetManager : internal .NewCPFSFileSetManager (nasClient ),
6872 nasClient : nasClient ,
6973 skipDetach : skipDetach ,
7074 }, nil
7175}
7276
77+ // CreateVolume ...
78+ func (cs * controllerServer ) CreateVolume (ctx context.Context , req * csi.CreateVolumeRequest ) (* csi.CreateVolumeResponse , error ) {
79+ logger := klog .FromContext (ctx )
80+ logger .V (2 ).Info ("starting" )
81+
82+ // Validate parameters
83+ if err := validateFileSetParameters (req ); err != nil {
84+ klog .Errorf ("CreateVolume: error parameters from input: %v, with error: %v" , req .Name , err )
85+ return nil , status .Errorf (codes .InvalidArgument , "Invalid parameters from input: %v, with error: %v" , req .Name , err )
86+ }
87+
88+ // Extract parameters
89+ params := req .GetParameters ()
90+ bmcpfsID := params ["bmcpfsId" ]
91+ fullPath := req .Name
92+ if rootPath , ok := params ["path" ]; ok && rootPath != "" {
93+ fullPath = filepath .Join (rootPath , req .Name )
94+ }
95+ volSizeBytes := int64 (req .GetCapacityRange ().GetRequiredBytes ())
96+
97+ // Create fileset
98+ fileSetID , err := cs .filsetManager .CreateFileSet (ctx , bmcpfsID , req .Name , fullPath , 1000000 , volSizeBytes , false )
99+ if err != nil {
100+ return nil , status .Error (codes .Internal , err .Error ())
101+ }
102+
103+ // Prepare volume context
104+ volumeContext := req .GetParameters ()
105+ if volumeContext == nil {
106+ volumeContext = make (map [string ]string )
107+ }
108+ volumeContext = updateVolumeContext (volumeContext )
109+
110+ klog .Infof ("CreateVolume: Successfully created FileSet %s: id[%s], filesystem[%s], path[%s]" , req .GetName (), fileSetID , bmcpfsID , fullPath )
111+
112+ tmpVol := createVolumeResponse (fileSetID , bmcpfsID , volSizeBytes , volumeContext )
113+
114+ return & csi.CreateVolumeResponse {Volume : tmpVol }, nil
115+ }
116+
117+ func (cs * controllerServer ) DeleteVolume (ctx context.Context , req * csi.DeleteVolumeRequest ) (* csi.DeleteVolumeResponse , error ) {
118+ logger := klog .FromContext (ctx )
119+ logger .V (2 ).Info ("starting" )
120+
121+ // Parse volume ID to extract filesystem ID and fileset ID
122+ fsID , fileSetID , err := parseVolumeID (req .VolumeId )
123+ if err != nil {
124+ klog .Errorf ("DeleteVolume: failed to parse volume ID %s: %v" , req .VolumeId , err )
125+ return nil , status .Error (codes .InvalidArgument , err .Error ())
126+ }
127+
128+ klog .Infof ("DeleteVolume: deleting fileset %s from filesystem %s" , fileSetID , fsID )
129+
130+ // Delete the fileset
131+ err = cs .filsetManager .DeleteFileSet (ctx , fsID , fileSetID )
132+ if err != nil {
133+ klog .Errorf ("DeleteVolume: failed to delete fileset %s from filesystem %s: %v" , fileSetID , fsID , err )
134+ return nil , status .Error (codes .Internal , err .Error ())
135+ }
136+
137+ klog .Infof ("DeleteVolume: successfully deleted fileset %s from filesystem %s" , fileSetID , fsID )
138+ return & csi.DeleteVolumeResponse {}, nil
139+ }
140+
73141func (cs * controllerServer ) ControllerGetCapabilities (ctx context.Context , req * csi.ControllerGetCapabilitiesRequest ) (* csi.ControllerGetCapabilitiesResponse , error ) {
74142 return & csi.ControllerGetCapabilitiesResponse {
75143 Capabilities : common .ControllerRPCCapabilities (
76144 csi .ControllerServiceCapability_RPC_PUBLISH_UNPUBLISH_VOLUME ,
145+ csi .ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME ,
77146 ),
78147 }, nil
79148}
@@ -94,7 +163,7 @@ func (cs *controllerServer) ControllerPublishVolume(ctx context.Context, req *cs
94163 }
95164
96165 cpfsID , _ := parseVolumeHandle (req .VolumeId )
97- // Get VscMountTarget of filesystem
166+
98167 mt := req .VolumeContext [_vscMountTarget ]
99168 if mt == "" {
100169 var err error
@@ -105,18 +174,18 @@ func (cs *controllerServer) ControllerPublishVolume(ctx context.Context, req *cs
105174 }
106175
107176 // Get Primary vsc of Lingjun node
108- lingjunInstanceId := strings .TrimPrefix (req .NodeId , LingjunNodeIDPrefix )
177+ lingjunInstanceID := strings .TrimPrefix (req .NodeId , LingjunNodeIDPrefix )
109178 if LingjunNodeIDPrefix == "" {
110179 return nil , status .Error (codes .InvalidArgument , "invalid node id" )
111180 }
112- vscId , err := cs .vscManager .EnsurePrimaryVsc (ctx , lingjunInstanceId , false )
181+ vscID , err := cs .vscManager .EnsurePrimaryVsc (ctx , lingjunInstanceID , false )
113182 if err != nil {
114183 return nil , status .Error (codes .Internal , err .Error ())
115184 }
116- klog .Info ("Use VSC MountTarget for lingjun node" , "nodeId" , req .NodeId , "vscId" , vscId )
185+ klog .Info ("Use VSC MountTarget for lingjun node" , "nodeId" , req .NodeId , "vscId" , vscID )
117186
118187 // Attach CPFS to VSC
119- err = cs .attachDetacher .Attach (ctx , cpfsID , vscId )
188+ err = cs .attachDetacher .Attach (ctx , cpfsID , vscID )
120189 if err != nil {
121190 if autoSwitch , _ := strconv .ParseBool (req .VolumeContext [_mpAutoSwitch ]); autoSwitch && internal .IsAttachNotSupportedError (err ) {
122191 if req .VolumeContext [_vpcMountTarget ] == "" {
@@ -135,35 +204,34 @@ func (cs *controllerServer) ControllerPublishVolume(ctx context.Context, req *cs
135204
136205 // TODO: if the cached vscid is already deleted, try to recreate a new primary vsc for lingjun node
137206
138- klog .InfoS ("ControllerPublishVolume: attached cpfs to vsc" , "vscMountTarget" , mt , "vscId" , vscId , "node" , req .NodeId )
207+ klog .InfoS ("ControllerPublishVolume: attached cpfs to vsc" , "vscMountTarget" , mt , "vscId" , vscID , "node" , req .NodeId )
139208 return & csi.ControllerPublishVolumeResponse {
140209 PublishContext : map [string ]string {
141210 _networkType : networkTypeVSC ,
142- _vscId : vscId ,
211+ _vscID : vscID ,
143212 _vscMountTarget : mt ,
144213 },
145214 }, nil
146215}
147216
148- func (cs * controllerServer ) ControllerUnpublishVolume (ctx context.Context , req * csi.ControllerUnpublishVolumeRequest ) (
149- * csi.ControllerUnpublishVolumeResponse , error ,
150- ) {
217+ func (cs * controllerServer ) ControllerUnpublishVolume (ctx context.Context , req * csi.ControllerUnpublishVolumeRequest ) (* csi.ControllerUnpublishVolumeResponse , error ) {
151218 if ! strings .HasPrefix (req .NodeId , LingjunNodeIDPrefix ) || cs .skipDetach {
152219 return & csi.ControllerUnpublishVolumeResponse {}, nil
153220 }
154221 // Create Primary vsc for Lingjun node
155- lingjunInstanceId := strings .TrimPrefix (req .NodeId , LingjunNodeIDPrefix )
222+ lingjunInstanceID := strings .TrimPrefix (req .NodeId , LingjunNodeIDPrefix )
156223 if LingjunNodeIDPrefix == "" {
157224 return nil , status .Error (codes .InvalidArgument , "invalid node id" )
158225 }
159- vsc , err := cs .vscManager .GetPrimaryVscOf (lingjunInstanceId )
226+ vsc , err := cs .vscManager .GetPrimaryVscOf (lingjunInstanceID )
160227 if err != nil {
161- return nil , status .Error (codes .Internal , err . Error () )
228+ return nil , status .Errorf (codes .Internal , "get vsc error: %v" , err )
162229 }
163230 if vsc == nil {
164231 klog .InfoS ("ControllerUnpublishVolume: skip detaching cpfs from vsc as vsc not found" , "node" , req .NodeId )
165232 return & csi.ControllerUnpublishVolumeResponse {}, nil
166233 }
234+
167235 // If `req.VolumeId` is a combination of `cpfsID` and `fsetID`, Detach will trigger an error.
168236 err = cs .attachDetacher .Detach (ctx , req .VolumeId , vsc .VscID )
169237 if err != nil {
@@ -173,6 +241,7 @@ func (cs *controllerServer) ControllerUnpublishVolume(ctx context.Context, req *
173241 return & csi.ControllerUnpublishVolumeResponse {}, nil
174242}
175243
244+ // KubernetesAlicloudIdentity is the user agent string for Eflo client
176245var KubernetesAlicloudIdentity = fmt .Sprintf ("Kubernetes.Alicloud/CsiProvision.Bmcpfs-%s" , version .VERSION )
177246
178247const efloConnTimeout = 10
@@ -227,9 +296,9 @@ func parseVolumeHandle(volumeHandle string) (string, string) {
227296 return parts [0 ], ""
228297}
229298
230- func getMountTarget (client * nasclient.Client , fsId , networkType string ) (string , error ) {
299+ func getMountTarget (client * nasclient.Client , fsID , networkType string ) (string , error ) {
231300 resp , err := client .DescribeFileSystems (& nasclient.DescribeFileSystemsRequest {
232- FileSystemId : & fsId ,
301+ FileSystemId : & fsID ,
233302 })
234303 if err != nil {
235304 return "" , fmt .Errorf ("nas:DescribeFileSystems failed: %w" , err )
@@ -252,7 +321,7 @@ func getMountTarget(client *nasclient.Client, fsId, networkType string) (string,
252321 if t == networkType {
253322 mountTarget := tea .StringValue (mt .MountTargetDomain )
254323 status := tea .StringValue (mt .Status )
255- klog .V (2 ).InfoS ("Found cpfs mount target" , "filesystem" , fsId , "networkType" , networkType , "mountTarget" , mountTarget , "status" , status )
324+ klog .V (2 ).InfoS ("Found cpfs mount target" , "filesystem" , fsID , "networkType" , networkType , "mountTarget" , mountTarget , "status" , status )
256325 if status == "Active" {
257326 return mountTarget , nil
258327 }
0 commit comments