diff --git a/.golangci.yaml b/.golangci.yaml index 207ee9125..7783d570e 100644 --- a/.golangci.yaml +++ b/.golangci.yaml @@ -23,6 +23,11 @@ issues: - path: .*\.go linters: - unused + # Temporary: suppress deprecated Endpoints API warning until dadi migrates to EndpointSlice. + - path: pkg/dadi/.*\.go + text: corev1\.Endpoints is deprecated + linters: + - staticcheck # Mode of the generated files analysis. # diff --git a/_typos.toml b/_typos.toml index e60ebd8a7..cf096ee99 100644 --- a/_typos.toml +++ b/_typos.toml @@ -10,3 +10,5 @@ teh = "teh" ded = "ded" eles = "eles" TAGED = "TAGED" +Entrie = "Entrie" +DescribeFilesetsResponseBodyEntriesEntrie = "DescribeFilesetsResponseBodyEntriesEntrie" \ No newline at end of file diff --git a/deploy/charts/alibaba-cloud-csi-driver/templates/controller.yaml b/deploy/charts/alibaba-cloud-csi-driver/templates/controller.yaml index 093efcb8c..4a9d96614 100644 --- a/deploy/charts/alibaba-cloud-csi-driver/templates/controller.yaml +++ b/deploy/charts/alibaba-cloud-csi-driver/templates/controller.yaml @@ -358,6 +358,29 @@ spec: mountPath: /csi {{- end -}} {{- if and .Values.csi.bmcpfs.enabled .Values.csi.bmcpfs.controller.enabled }} + - name: external-bmcpfs-provisioner + image: {{ include "imageSpec" (list .Values "externalProvisioner") }} + args: + - --csi-address=/csi/csi.sock + - --http-endpoint=:8100 + - --volume-name-prefix=bmcpfs + - --default-fstype=bmcpfs + - --timeout=150s + - --leader-election=true + - --retry-interval-start=500ms + - --kube-api-qps=100 + - --kube-api-burst=200 + - --v=5 + resources: + requests: + cpu: 10m + memory: 16Mi + limits: + cpu: 500m + memory: 1024Mi + volumeMounts: + - name: bmcpfs-provisioner-dir + mountPath: /csi - name: external-bmcpfs-attacher image: {{ include "imageSpec" (list .Values "externalAttacher") }} resources: diff --git a/pkg/bmcpfs/bmcpfs.go b/pkg/bmcpfs/bmcpfs.go index 01937cfac..fb459268d 100644 --- a/pkg/bmcpfs/bmcpfs.go +++ b/pkg/bmcpfs/bmcpfs.go @@ -30,13 +30,14 @@ const ( // keys in volume context or publish context _vpcMountTarget = "vpcMountTarget" _vscMountTarget = "vscMountTarget" - _vscId = "vscId" + _vscID = "vscId" _networkType = "networkType" _path = "path" _mpAutoSwitch = "mountpointAutoSwitch" - // prefix of node id - CommonNodeIDPrefix = "common:" + // CommonNodeIDPrefix is the prefix for common node IDs + CommonNodeIDPrefix = "common:" + // LingjunNodeIDPrefix is the prefix for lingjun node IDs LingjunNodeIDPrefix = "lingjun:" // network types of CPFS mount targets diff --git a/pkg/bmcpfs/controllerserver.go b/pkg/bmcpfs/controllerserver.go index 3b81537d7..b5df28b88 100644 --- a/pkg/bmcpfs/controllerserver.go +++ b/pkg/bmcpfs/controllerserver.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "os" + "path/filepath" "strconv" "strings" @@ -43,6 +44,7 @@ type controllerServer struct { common.GenericControllerServer vscManager *internal.PrimaryVscManagerWithCache attachDetacher internal.CPFSAttachDetacher + filsetManager internal.CPFSFileSetManager nasClient *nasclient.Client skipDetach bool } @@ -62,18 +64,85 @@ func newControllerServer(region string) (*controllerServer, error) { if err != nil { return nil, err } + return &controllerServer{ vscManager: internal.NewPrimaryVscManagerWithCache(efloClient), attachDetacher: internal.NewCPFSAttachDetacher(nasClient), + filsetManager: internal.NewCPFSFileSetManager(nasClient), nasClient: nasClient, skipDetach: skipDetach, }, nil } +// CreateVolume ... +func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) { + logger := klog.FromContext(ctx) + logger.V(2).Info("starting") + + // Validate parameters + if err := validateFileSetParameters(req); err != nil { + klog.Errorf("CreateVolume: error parameters from input: %v, with error: %v", req.Name, err) + return nil, status.Errorf(codes.InvalidArgument, "Invalid parameters from input: %v, with error: %v", req.Name, err) + } + + // Extract parameters + params := req.GetParameters() + bmcpfsID := params["bmcpfsId"] + fullPath := req.Name + if rootPath, ok := params["path"]; ok && rootPath != "" { + fullPath = filepath.Join(rootPath, req.Name) + } + volSizeBytes := int64(req.GetCapacityRange().GetRequiredBytes()) + + // Create fileset + fileSetID, err := cs.filsetManager.CreateFileSet(ctx, bmcpfsID, req.Name, fullPath, 1000000, volSizeBytes, false) + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + + // Prepare volume context + volumeContext := req.GetParameters() + if volumeContext == nil { + volumeContext = make(map[string]string) + } + volumeContext = updateVolumeContext(volumeContext) + + klog.Infof("CreateVolume: Successfully created FileSet %s: id[%s], filesystem[%s], path[%s]", req.GetName(), fileSetID, bmcpfsID, fullPath) + + tmpVol := createVolumeResponse(fileSetID, bmcpfsID, volSizeBytes, volumeContext) + + return &csi.CreateVolumeResponse{Volume: tmpVol}, nil +} + +func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) { + logger := klog.FromContext(ctx) + logger.V(2).Info("starting") + + // Parse volume ID to extract filesystem ID and fileset ID + fsID, fileSetID, err := parseVolumeID(req.VolumeId) + if err != nil { + klog.Errorf("DeleteVolume: failed to parse volume ID %s: %v", req.VolumeId, err) + return nil, status.Error(codes.InvalidArgument, err.Error()) + } + + klog.Infof("DeleteVolume: deleting fileset %s from filesystem %s", fileSetID, fsID) + + // Delete the fileset + err = cs.filsetManager.DeleteFileSet(ctx, fsID, fileSetID) + if err != nil { + klog.Errorf("DeleteVolume: failed to delete fileset %s from filesystem %s: %v", fileSetID, fsID, err) + return nil, status.Error(codes.Internal, err.Error()) + } + + klog.Infof("DeleteVolume: successfully deleted fileset %s from filesystem %s", fileSetID, fsID) + return &csi.DeleteVolumeResponse{}, nil +} + func (cs *controllerServer) ControllerGetCapabilities(ctx context.Context, req *csi.ControllerGetCapabilitiesRequest) (*csi.ControllerGetCapabilitiesResponse, error) { return &csi.ControllerGetCapabilitiesResponse{ Capabilities: common.ControllerRPCCapabilities( csi.ControllerServiceCapability_RPC_PUBLISH_UNPUBLISH_VOLUME, + csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME, ), }, nil } @@ -94,7 +163,7 @@ func (cs *controllerServer) ControllerPublishVolume(ctx context.Context, req *cs } cpfsID, _ := parseVolumeHandle(req.VolumeId) - // Get VscMountTarget of filesystem + mt := req.VolumeContext[_vscMountTarget] if mt == "" { var err error @@ -105,18 +174,18 @@ func (cs *controllerServer) ControllerPublishVolume(ctx context.Context, req *cs } // Get Primary vsc of Lingjun node - lingjunInstanceId := strings.TrimPrefix(req.NodeId, LingjunNodeIDPrefix) + lingjunInstanceID := strings.TrimPrefix(req.NodeId, LingjunNodeIDPrefix) if LingjunNodeIDPrefix == "" { return nil, status.Error(codes.InvalidArgument, "invalid node id") } - vscId, err := cs.vscManager.EnsurePrimaryVsc(ctx, lingjunInstanceId, false) + vscID, err := cs.vscManager.EnsurePrimaryVsc(ctx, lingjunInstanceID, false) if err != nil { return nil, status.Error(codes.Internal, err.Error()) } - klog.Info("Use VSC MountTarget for lingjun node", "nodeId", req.NodeId, "vscId", vscId) + klog.Info("Use VSC MountTarget for lingjun node", "nodeId", req.NodeId, "vscId", vscID) // Attach CPFS to VSC - err = cs.attachDetacher.Attach(ctx, cpfsID, vscId) + err = cs.attachDetacher.Attach(ctx, cpfsID, vscID) if err != nil { if autoSwitch, _ := strconv.ParseBool(req.VolumeContext[_mpAutoSwitch]); autoSwitch && internal.IsAttachNotSupportedError(err) { if req.VolumeContext[_vpcMountTarget] == "" { @@ -135,35 +204,34 @@ func (cs *controllerServer) ControllerPublishVolume(ctx context.Context, req *cs // TODO: if the cached vscid is already deleted, try to recreate a new primary vsc for lingjun node - klog.InfoS("ControllerPublishVolume: attached cpfs to vsc", "vscMountTarget", mt, "vscId", vscId, "node", req.NodeId) + klog.InfoS("ControllerPublishVolume: attached cpfs to vsc", "vscMountTarget", mt, "vscId", vscID, "node", req.NodeId) return &csi.ControllerPublishVolumeResponse{ PublishContext: map[string]string{ _networkType: networkTypeVSC, - _vscId: vscId, + _vscID: vscID, _vscMountTarget: mt, }, }, nil } -func (cs *controllerServer) ControllerUnpublishVolume(ctx context.Context, req *csi.ControllerUnpublishVolumeRequest) ( - *csi.ControllerUnpublishVolumeResponse, error, -) { +func (cs *controllerServer) ControllerUnpublishVolume(ctx context.Context, req *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, error) { if !strings.HasPrefix(req.NodeId, LingjunNodeIDPrefix) || cs.skipDetach { return &csi.ControllerUnpublishVolumeResponse{}, nil } // Create Primary vsc for Lingjun node - lingjunInstanceId := strings.TrimPrefix(req.NodeId, LingjunNodeIDPrefix) + lingjunInstanceID := strings.TrimPrefix(req.NodeId, LingjunNodeIDPrefix) if LingjunNodeIDPrefix == "" { return nil, status.Error(codes.InvalidArgument, "invalid node id") } - vsc, err := cs.vscManager.GetPrimaryVscOf(lingjunInstanceId) + vsc, err := cs.vscManager.GetPrimaryVscOf(lingjunInstanceID) if err != nil { - return nil, status.Error(codes.Internal, err.Error()) + return nil, status.Errorf(codes.Internal, "get vsc error: %v", err) } if vsc == nil { klog.InfoS("ControllerUnpublishVolume: skip detaching cpfs from vsc as vsc not found", "node", req.NodeId) return &csi.ControllerUnpublishVolumeResponse{}, nil } + // If `req.VolumeId` is a combination of `cpfsID` and `fsetID`, Detach will trigger an error. err = cs.attachDetacher.Detach(ctx, req.VolumeId, vsc.VscID) if err != nil { @@ -173,6 +241,7 @@ func (cs *controllerServer) ControllerUnpublishVolume(ctx context.Context, req * return &csi.ControllerUnpublishVolumeResponse{}, nil } +// KubernetesAlicloudIdentity is the user agent string for Eflo client var KubernetesAlicloudIdentity = fmt.Sprintf("Kubernetes.Alicloud/CsiProvision.Bmcpfs-%s", version.VERSION) const efloConnTimeout = 10 @@ -227,9 +296,9 @@ func parseVolumeHandle(volumeHandle string) (string, string) { return parts[0], "" } -func getMountTarget(client *nasclient.Client, fsId, networkType string) (string, error) { +func getMountTarget(client *nasclient.Client, fsID, networkType string) (string, error) { resp, err := client.DescribeFileSystems(&nasclient.DescribeFileSystemsRequest{ - FileSystemId: &fsId, + FileSystemId: &fsID, }) if err != nil { return "", fmt.Errorf("nas:DescribeFileSystems failed: %w", err) @@ -252,7 +321,7 @@ func getMountTarget(client *nasclient.Client, fsId, networkType string) (string, if t == networkType { mountTarget := tea.StringValue(mt.MountTargetDomain) status := tea.StringValue(mt.Status) - klog.V(2).InfoS("Found cpfs mount target", "filesystem", fsId, "networkType", networkType, "mountTarget", mountTarget, "status", status) + klog.V(2).InfoS("Found cpfs mount target", "filesystem", fsID, "networkType", networkType, "mountTarget", mountTarget, "status", status) if status == "Active" { return mountTarget, nil } diff --git a/pkg/bmcpfs/controllerserver_test.go b/pkg/bmcpfs/controllerserver_test.go index e3563ed58..a440e564b 100644 --- a/pkg/bmcpfs/controllerserver_test.go +++ b/pkg/bmcpfs/controllerserver_test.go @@ -3,7 +3,7 @@ Copyright 2019 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. -You may obtain a copy of the License at +You you may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 @@ -17,8 +17,16 @@ limitations under the License. package bmcpfs import ( + "fmt" "testing" + "context" + "errors" + + nasclient "github.com/alibabacloud-go/nas-20170626/v4/client" + "github.com/container-storage-interface/spec/lib/go/csi" + "github.com/golang/mock/gomock" + "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/bmcpfs/internal" "github.com/stretchr/testify/assert" ) @@ -27,34 +35,247 @@ func TestParseVolumeHandle(t *testing.T) { tests := []struct { name string volumeHandle string - expectedFsId string - expectedFsetId string + expectedFsID string + expectedFsetID string }{ { name: "single part", volumeHandle: "fs-12345", - expectedFsId: "fs-12345", - expectedFsetId: "", + expectedFsID: "fs-12345", + expectedFsetID: "", }, { name: "two parts", volumeHandle: "fs-12345+fset-67890", - expectedFsId: "fs-12345", - expectedFsetId: "fset-67890", + expectedFsID: "fs-12345", + expectedFsetID: "fset-67890", }, { name: "multiple delimiters", volumeHandle: "fs-12345+fset-67890+extra", - expectedFsId: "fs-12345", - expectedFsetId: "", + expectedFsID: "fs-12345", + expectedFsetID: "", }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - fsId, fsetId := parseVolumeHandle(tt.volumeHandle) - assert.Equal(t, tt.expectedFsId, fsId) - assert.Equal(t, tt.expectedFsetId, fsetId) + fsID, fsetID := parseVolumeHandle(tt.volumeHandle) + assert.Equal(t, tt.expectedFsID, fsID) + assert.Equal(t, tt.expectedFsetID, fsetID) + }) + } +} + +func TestControllerServer_CreateVolume(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockNas := internal.NewGoMockNasClient(ctrl) + + // Mock the CreateFileset method + mockNas.EXPECT(). + CreateFileset(gomock.Any()).AnyTimes(). + DoAndReturn(func(req *nasclient.CreateFilesetRequest) (*nasclient.CreateFilesetResponse, error) { + if *req.FileSystemId == "" { + return nil, errors.New("file system ID is required") + } + if *req.FileSystemPath == "" { + return nil, errors.New("file system path is required") + } + if *req.FileSystemId == "valid_fs" { + return &nasclient.CreateFilesetResponse{ + Body: &nasclient.CreateFilesetResponseBody{ + FsetId: req.ClientToken, + }, + }, nil + } + if *req.FileSystemId == "create_failed" { + return nil, errors.New("create failed") + } + return nil, nil + }) + + // Create fileset manager + fsm := internal.NewCPFSFileSetManager(mockNas) + ctx := context.Background() + + // Test cases for fileset manager + tests := []struct { + name string + fsID string + pvName string + fileSystemPath string + expectError string + }{ + { + name: "filesystem empty", + fsID: "", + pvName: "test-pv", + fileSystemPath: "/test-path", + expectError: fmt.Sprintf("create fileset %s/%s failed: %s", "", "test-pv", "file system ID is required"), + }, + { + name: "normal create", + fsID: "valid_fs", + pvName: "test-pv", + fileSystemPath: "/test-path", + expectError: "", + }, + { + name: "create failed", + fsID: "create_failed", + pvName: "test-pv", + fileSystemPath: "/test-path", + expectError: fmt.Sprintf("create fileset %s/%s failed: %s", "create_failed", "test-pv", "create failed"), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + _, err := fsm.CreateFileSet(ctx, tt.fsID, tt.pvName, tt.fileSystemPath, 1000000, 1024*1024*1024, false) + if err != nil { + assert.Equal(t, tt.expectError, err.Error()) + } + }) + } + + // Test cases for controller server CreateVolume + controllerTests := []struct { + name string + request *csi.CreateVolumeRequest + expectError string + }{ + { + name: "missing bmcpfsId parameter", + request: &csi.CreateVolumeRequest{ + Name: "test-volume", + Parameters: map[string]string{ + "path": "/test", + }, + VolumeCapabilities: []*csi.VolumeCapability{ + { + AccessType: &csi.VolumeCapability_Mount{ + Mount: &csi.VolumeCapability_MountVolume{ + FsType: "bmcpfs", + }, + }, + }, + }, + }, + expectError: "Invalid parameters from input: test-volume, with error: bmcpfsId parameter is required", + }, + { + name: "valid request", + request: &csi.CreateVolumeRequest{ + Name: "test-volume", + Parameters: map[string]string{ + "bmcpfsId": "valid_fs", + "path": "/test", + }, + VolumeCapabilities: []*csi.VolumeCapability{ + { + AccessType: &csi.VolumeCapability_Mount{ + Mount: &csi.VolumeCapability_MountVolume{ + FsType: "bmcpfs", + }, + }, + }, + }, + CapacityRange: &csi.CapacityRange{ + RequiredBytes: 1024 * 1024 * 1024, + }, + }, + expectError: "", + }, + } + + // Create a mock controller server + cs := &controllerServer{ + filsetManager: fsm, + } + + for _, tt := range controllerTests { + t.Run(tt.name, func(t *testing.T) { + resp, err := cs.CreateVolume(ctx, tt.request) + if tt.expectError != "" { + assert.Error(t, err) + assert.Contains(t, err.Error(), tt.expectError) + } else { + assert.NoError(t, err) + assert.NotNil(t, resp) + assert.Equal(t, "valid_fs", resp.Volume.VolumeId) + } }) } } + +func TestControllerServer_DeleteVolume(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockNas := internal.NewGoMockNasClient(ctrl) + + mockNas.EXPECT(). + DeleteFileset(gomock.Any()).AnyTimes(). + DoAndReturn(func(req *nasclient.DeleteFilesetRequest) (*nasclient.DeleteFilesetResponse, error) { + if *req.FileSystemId == "" { + return nil, errors.New("file system ID is required") + } + if *req.FsetId == "" { + return nil, errors.New("file set ID is required") + } + if *req.FileSystemId == "normal_delete" { + return &nasclient.DeleteFilesetResponse{ + Body: &nasclient.DeleteFilesetResponseBody{}, + }, nil + } + if *req.FileSystemId == "delete_failed" { + return nil, errors.New("delete failed") + } + return nil, nil + }) + + fsm := internal.NewCPFSFileSetManager(mockNas) + ctx := context.Background() + + tests := []struct { + name string + fsID string + filesetID string + expectError string + }{ + { + name: "filesystem empty", + fsID: "", + filesetID: "", + expectError: fmt.Sprintf("delete fileset %s/%s failed: %s", "", "", "file system ID is required"), + }, + { + name: "fileset empty", + fsID: "xxx", + filesetID: "", + expectError: fmt.Sprintf("delete fileset %s/%s failed: %s", "xxx", "", "file set ID is required"), + }, + { + name: "normal delete", + fsID: "normal_delete", + filesetID: "normal_delete", + expectError: "", + }, + { + name: "delete_failed", + fsID: "delete_failed", + filesetID: "delete_failed", + expectError: fmt.Sprintf("delete fileset %s/%s failed: %s", "delete_failed", "delete_failed", "delete failed"), + }, + } + + for _, tt := range tests { + err := fsm.DeleteFileSet(ctx, tt.fsID, tt.filesetID) + if err != nil { + assert.Equal(t, tt.expectError, err.Error()) + } + } + +} diff --git a/pkg/bmcpfs/internal/fileset.go b/pkg/bmcpfs/internal/fileset.go new file mode 100644 index 000000000..45d3c261c --- /dev/null +++ b/pkg/bmcpfs/internal/fileset.go @@ -0,0 +1,172 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package internal + +import ( + "context" + "errors" + "fmt" + + nasclient "github.com/alibabacloud-go/nas-20170626/v4/client" + "github.com/alibabacloud-go/tea/tea" + "k8s.io/klog/v2" +) + +// CPFSFileSetManager is the interface for managing CPFS filesets +type CPFSFileSetManager interface { + CreateFileSet(ctx context.Context, fsID, pvName, fileSystemPath string, countLimit, sizeBytes int64, deleteProtection bool) (string, error) + DeleteFileSet(ctx context.Context, fsID, fileSetID string) error + DescribeFileSets(ctx context.Context, fsID string) ([]*nasclient.DescribeFilesetsResponseBodyEntriesEntrie, error) + DescribeFileset(ctx context.Context, fsID, fileSetID string) (*nasclient.DescribeFilesetsResponseBodyEntriesEntrie, error) +} + +type NasClient interface { + CreateFileset(request *nasclient.CreateFilesetRequest) (*nasclient.CreateFilesetResponse, error) + DeleteFileset(request *nasclient.DeleteFilesetRequest) (*nasclient.DeleteFilesetResponse, error) + DescribeFilesets(request *nasclient.DescribeFilesetsRequest) (*nasclient.DescribeFilesetsResponse, error) +} + +type cpfsFileSetManager struct { + client NasClient +} + +// ErrFilesetNotFound signals that the requested fileset does not exist. +var ErrFilesetNotFound = errors.New("fileset not found") + +// NewCPFSFileSetManager creates a new CPFS fileset manager +func NewCPFSFileSetManager(client NasClient) CPFSFileSetManager { + return &cpfsFileSetManager{ + client: client, + } +} + +func (m *cpfsFileSetManager) CreateFileSet(ctx context.Context, fsID, pvName, fileSystemPath string, countLimit, sizeBytes int64, deleteProtection bool) (string, error) { + quota := &nasclient.CreateFilesetRequestQuota{ + SizeLimit: tea.Int64(sizeBytes), + } + if countLimit != 0 { + quota.FileCountLimit = tea.Int64(countLimit) + } + request := &nasclient.CreateFilesetRequest{ + FileSystemId: tea.String(fsID), + ClientToken: tea.String(pvName), + DeletionProtection: tea.Bool(deleteProtection), + FileSystemPath: tea.String(fileSystemPath), + Quota: quota, + } + response, err := m.client.CreateFileset(request) + if err != nil { + return "", fmt.Errorf("create fileset %s/%s failed: %v", fsID, pvName, err) + } + if response == nil || response.Body == nil || response.Body.FsetId == nil { + return "", fmt.Errorf("create fileset %s/%s failed: empty response", fsID, fileSystemPath) + } + klog.V(4).Infof("create fileset %s/%s successfully", fsID, fileSystemPath) + return *response.Body.FsetId, nil +} + +// DeleteFileSet deletes a fileset by fileSystemId and fileSetId +func (m *cpfsFileSetManager) DeleteFileSet(ctx context.Context, fsID, fileSetID string) error { + request := &nasclient.DeleteFilesetRequest{ + FileSystemId: tea.String(fsID), + FsetId: tea.String(fileSetID), + } + + response, err := m.client.DeleteFileset(request) + if err != nil { + return fmt.Errorf("delete fileset %s/%s failed: %v", fsID, fileSetID, err) + } + + if response == nil || response.Body == nil { + return fmt.Errorf("delete fileset %s/%s failed: empty response", fsID, fileSetID) + } + + klog.V(4).Infof("delete fileset %s/%s successfully", fsID, fileSetID) + return nil +} + +// DescribeFileSets describes all filesets in a filesystem by fileSystemId +func (m *cpfsFileSetManager) DescribeFileSets(ctx context.Context, fsID string) ([]*nasclient.DescribeFilesetsResponseBodyEntriesEntrie, error) { + var allFileSets []*nasclient.DescribeFilesetsResponseBodyEntriesEntrie + var nextToken *string + + // Handle pagination - iterate through all pages to collect all filesets + for { + request := &nasclient.DescribeFilesetsRequest{ + FileSystemId: tea.String(fsID), + NextToken: nextToken, + } + + response, err := m.client.DescribeFilesets(request) + if err != nil { + return nil, fmt.Errorf("describe filesets for filesystem %s failed: %v", fsID, err) + } + + if response == nil || response.Body == nil { + return nil, fmt.Errorf("describe filesets for filesystem %s failed: empty response", fsID) + } + + // Collect filesets from current page + if response.Body.Entries != nil && len(response.Body.Entries.Entrie) > 0 { + allFileSets = append(allFileSets, response.Body.Entries.Entrie...) + } + + // Check if there are more pages + if response.Body.NextToken == nil || *response.Body.NextToken == "" { + // No more pages, break the loop + break + } + + // Move to the next page + nextToken = response.Body.NextToken + } + + klog.V(4).Infof("describe filesets for filesystem %s successfully, total count: %d", fsID, len(allFileSets)) + return allFileSets, nil +} + +// DescribeFileset describes a specific fileset by fileSystemId and fileSetId +func (m *cpfsFileSetManager) DescribeFileset(ctx context.Context, fsID, fileSetID string) (*nasclient.DescribeFilesetsResponseBodyEntriesEntrie, error) { + request := &nasclient.DescribeFilesetsRequest{ + FileSystemId: tea.String(fsID), + Filters: []*nasclient.DescribeFilesetsRequestFilters{ + { + Key: tea.String("FsetIds"), + Value: tea.String(fileSetID), + }, + }, + } + + response, err := m.client.DescribeFilesets(request) + if err != nil { + return nil, fmt.Errorf("describe fileset %s/%s failed: %v", fsID, fileSetID, err) + } + + if response == nil || response.Body == nil { + return nil, fmt.Errorf("describe fileset %s/%s failed: empty response", fsID, fileSetID) + } + + // Check if fileset exists in the response + if response.Body.Entries == nil || len(response.Body.Entries.Entrie) == 0 { + return nil, fmt.Errorf("fileset %s not found in filesystem %s: %w", fileSetID, fsID, ErrFilesetNotFound) + } + + // Return the first (and should be only) fileset + fileset := response.Body.Entries.Entrie[0] + klog.V(4).Infof("describe fileset %s/%s successfully", fsID, fileSetID) + return fileset, nil +} diff --git a/pkg/bmcpfs/internal/fileset_gomock_test.go b/pkg/bmcpfs/internal/fileset_gomock_test.go new file mode 100644 index 000000000..2aff63c0b --- /dev/null +++ b/pkg/bmcpfs/internal/fileset_gomock_test.go @@ -0,0 +1,115 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package internal + +import ( + "context" + "errors" + "testing" + + nasclient "github.com/alibabacloud-go/nas-20170626/v4/client" + "github.com/alibabacloud-go/tea/tea" + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestCPFSFileSetManager_CreateFileSet_WithGoMock(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockNas := NewGoMockNasClient(ctrl) + manager := NewCPFSFileSetManager(mockNas) + + fsID := "fs-123" + pvName := "pv-demo" + fileSystemPath := "/demo/path" + countLimit := int64(50) + sizeBytes := int64(2048) + deleteProtection := true + expectedFileSetID := "fset-abc" + + mockNas.EXPECT(). + CreateFileset(gomock.Any()). + DoAndReturn(func(req *nasclient.CreateFilesetRequest) (*nasclient.CreateFilesetResponse, error) { + require.Equal(t, fsID, tea.StringValue(req.FileSystemId)) + require.Equal(t, pvName, tea.StringValue(req.ClientToken)) + require.Equal(t, fileSystemPath, tea.StringValue(req.FileSystemPath)) + require.Equal(t, countLimit, tea.Int64Value(req.Quota.FileCountLimit)) + require.Equal(t, sizeBytes, tea.Int64Value(req.Quota.SizeLimit)) + require.Equal(t, deleteProtection, tea.BoolValue(req.DeletionProtection)) + return &nasclient.CreateFilesetResponse{ + Body: &nasclient.CreateFilesetResponseBody{ + FsetId: tea.String(expectedFileSetID), + }, + }, nil + }) + + fileSetID, err := manager.CreateFileSet(context.Background(), fsID, pvName, fileSystemPath, countLimit, sizeBytes, deleteProtection) + require.NoError(t, err) + assert.Equal(t, expectedFileSetID, fileSetID) +} + +func TestCPFSFileSetManager_DeleteFileSet_WithGoMock(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockNas := NewGoMockNasClient(ctrl) + manager := NewCPFSFileSetManager(mockNas) + + fsID := "fs-123" + fileSetID := "fset-delete" + + mockNas.EXPECT(). + DeleteFileset(gomock.Any()). + DoAndReturn(func(req *nasclient.DeleteFilesetRequest) (*nasclient.DeleteFilesetResponse, error) { + require.Equal(t, fsID, tea.StringValue(req.FileSystemId)) + require.Equal(t, fileSetID, tea.StringValue(req.FsetId)) + return &nasclient.DeleteFilesetResponse{ + Body: &nasclient.DeleteFilesetResponseBody{}, + }, nil + }) + + err := manager.DeleteFileSet(context.Background(), fsID, fileSetID) + require.NoError(t, err) +} + +func TestCPFSFileSetManager_DescribeFileset_ErrorWithGoMock(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockNas := NewGoMockNasClient(ctrl) + manager := NewCPFSFileSetManager(mockNas) + + fsID := "fs-123" + fileSetID := "fset-789" + expectedErr := errors.New("describe failed") + + mockNas.EXPECT(). + DescribeFilesets(gomock.Any()). + DoAndReturn(func(req *nasclient.DescribeFilesetsRequest) (*nasclient.DescribeFilesetsResponse, error) { + require.Equal(t, fsID, tea.StringValue(req.FileSystemId)) + require.Len(t, req.Filters, 1) + require.Equal(t, fileSetID, tea.StringValue(req.Filters[0].Value)) + return nil, expectedErr + }) + + fileset, err := manager.DescribeFileset(context.Background(), fsID, fileSetID) + require.Error(t, err) + assert.Nil(t, fileset) + assert.Contains(t, err.Error(), expectedErr.Error()) +} diff --git a/pkg/bmcpfs/internal/mock_nas_client_gomock.go b/pkg/bmcpfs/internal/mock_nas_client_gomock.go new file mode 100644 index 000000000..814eba1b0 --- /dev/null +++ b/pkg/bmcpfs/internal/mock_nas_client_gomock.go @@ -0,0 +1,80 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: pkg/bmcpfs/internal/fileset.go + +// Package internal is a generated GoMock package. +package internal + +import ( + "reflect" + + client "github.com/alibabacloud-go/nas-20170626/v4/client" + gomock "github.com/golang/mock/gomock" +) + +// GoMockNasClient is a mock of NasClient interface. +type GoMockNasClient struct { + ctrl *gomock.Controller + recorder *GoMockNasClientMockRecorder +} + +// GoMockNasClientMockRecorder is the mock recorder for GoMockNasClient. +type GoMockNasClientMockRecorder struct { + mock *GoMockNasClient +} + +// NewGoMockNasClient creates a new mock instance. +func NewGoMockNasClient(ctrl *gomock.Controller) *GoMockNasClient { + mock := &GoMockNasClient{ctrl: ctrl} + mock.recorder = &GoMockNasClientMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *GoMockNasClient) EXPECT() *GoMockNasClientMockRecorder { + return m.recorder +} + +// CreateFileset mocks base method. +func (m *GoMockNasClient) CreateFileset(request *client.CreateFilesetRequest) (*client.CreateFilesetResponse, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CreateFileset", request) + ret0, _ := ret[0].(*client.CreateFilesetResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// CreateFileset indicates an expected call of CreateFileset. +func (mr *GoMockNasClientMockRecorder) CreateFileset(request interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateFileset", reflect.TypeOf((*GoMockNasClient)(nil).CreateFileset), request) +} + +// DeleteFileset mocks base method. +func (m *GoMockNasClient) DeleteFileset(request *client.DeleteFilesetRequest) (*client.DeleteFilesetResponse, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DeleteFileset", request) + ret0, _ := ret[0].(*client.DeleteFilesetResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// DeleteFileset indicates an expected call of DeleteFileset. +func (mr *GoMockNasClientMockRecorder) DeleteFileset(request interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteFileset", reflect.TypeOf((*GoMockNasClient)(nil).DeleteFileset), request) +} + +// DescribeFilesets mocks base method. +func (m *GoMockNasClient) DescribeFilesets(request *client.DescribeFilesetsRequest) (*client.DescribeFilesetsResponse, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DescribeFilesets", request) + ret0, _ := ret[0].(*client.DescribeFilesetsResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// DescribeFilesets indicates an expected call of DescribeFilesets. +func (mr *GoMockNasClientMockRecorder) DescribeFilesets(request interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DescribeFilesets", reflect.TypeOf((*GoMockNasClient)(nil).DescribeFilesets), request) +} diff --git a/pkg/bmcpfs/nodeserver.go b/pkg/bmcpfs/nodeserver.go index 43f197128..4501bb4d2 100644 --- a/pkg/bmcpfs/nodeserver.go +++ b/pkg/bmcpfs/nodeserver.go @@ -71,7 +71,6 @@ func newNodeServer() (*nodeServer, error) { mounter := mounter.NewProxyMounter(defaultAlinasMountProxySocket, mount.NewWithoutSystemd("")) return &nodeServer{ GenericNodeServer: common.GenericNodeServer{NodeID: nodeID}, - locks: utils.NewVolumeLocks(), mounter: mounter, }, nil } diff --git a/pkg/bmcpfs/utils.go b/pkg/bmcpfs/utils.go new file mode 100644 index 000000000..b003ec522 --- /dev/null +++ b/pkg/bmcpfs/utils.go @@ -0,0 +1,128 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package bmcpfs + +import ( + "fmt" + "strings" + + "github.com/container-storage-interface/spec/lib/go/csi" + "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/common" + "k8s.io/klog/v2" +) + +const ( + // keys for static volume support + annFileSetID = "csi.alibabacloud.com/fileset-id" + + // keys for volume context management + labelAppendPrefix = "csi.alibabacloud.com/" + annAppendPrefix = "csi.alibabacloud.com/" + + // TopologyRegionKey is the key for region topology + TopologyRegionKey = "region" + // TopologyZoneKey is the key for zone topology + TopologyZoneKey = "zone" + + // volume context keys + labelVolumeType = "volume-type" + annVolumeTopoKey = "volume-topology" +) + +// updateVolumeContext removes unnecessary volume context parameters +func updateVolumeContext(volumeContext map[string]string) map[string]string { + cloned := make(map[string]string) + for k, v := range volumeContext { + cloned[k] = v + } + + // Remove unnecessary keys similar to disk implementation + keysToRemove := []string{ + "csi.alibabacloud.com/lastApply", + common.PVNameKey, + common.PVCNameKey, + common.PVCNamespaceKey, + "csi.alibabacloud.com/storage-provisioner", + "csi.alibabacloud.com/reclaimPolicy", + "csi.alibabacloud.com/storageclassName", + "allowVolumeExpansion", + "volume.kubernetes.io/selected-node", + } + + for _, key := range keysToRemove { + delete(cloned, key) + } + + return cloned +} + +// parseVolumeID parses the volume ID to extract filesystem ID and fileset ID +// VolumeId format: "filesystemId+filesetId" +func parseVolumeID(volumeID string) (filesystemID, filesetID string, err error) { + parts := strings.Split(volumeID, "+") + if len(parts) != 2 { + return "", "", fmt.Errorf("invalid volume ID format: %s, expected format: filesystemId+filesetId", volumeID) + } + + filesystemID = parts[0] + filesetID = parts[1] + + if filesystemID == "" || filesetID == "" { + return "", "", fmt.Errorf("invalid volume ID: filesystem ID or fileset ID is empty") + } + + return filesystemID, filesetID, nil +} + +// validateFileSetParameters validates the parameters for fileset creation +func validateFileSetParameters(req *csi.CreateVolumeRequest) error { + params := req.GetParameters() + + // Check required parameters + bmcpfsID := params["bmcpfsId"] + if bmcpfsID == "" { + return fmt.Errorf("bmcpfsId parameter is required") + } + + // Validate filesystem type if provided + for _, cap := range req.GetVolumeCapabilities() { + mnt := cap.GetMount() + if mnt != nil && mnt.FsType != "" && mnt.FsType != "bmcpfs" { + return fmt.Errorf("fsType %s is not supported, only 'cpfs' is supported", mnt.FsType) + } + } + + return nil +} + +// createVolumeResponse creates a proper CSI volume response +func createVolumeResponse(fileSetID, bmcpfsID string, volSizeBytes int64, volumeContext map[string]string) *csi.Volume { + // Add volume type to context + if volumeContext == nil { + volumeContext = make(map[string]string) + } + volumeContext[labelAppendPrefix+labelVolumeType] = "bmcpfs" + volumeContext[annFileSetID] = fileSetID + + klog.Infof("createVolumeResponse: volumeContext: %+v", volumeContext) + + return &csi.Volume{ + CapacityBytes: volSizeBytes, + VolumeId: bmcpfsID, + VolumeContext: volumeContext, + } +} diff --git a/pkg/bmcpfs/utils_test.go b/pkg/bmcpfs/utils_test.go new file mode 100644 index 000000000..600a2da51 --- /dev/null +++ b/pkg/bmcpfs/utils_test.go @@ -0,0 +1,85 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package bmcpfs + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestParseVolumeID_Success(t *testing.T) { + fsID, fileSetID, err := parseVolumeID("fs-123+fset-456") + + require.NoError(t, err) + assert.Equal(t, "fs-123", fsID) + assert.Equal(t, "fset-456", fileSetID) +} + +func TestParseVolumeID_InvalidFormat_NoSeparator(t *testing.T) { + fsID, fileSetID, err := parseVolumeID("fs-123fset-456") + + require.Error(t, err) + assert.Empty(t, fsID) + assert.Empty(t, fileSetID) + assert.Contains(t, err.Error(), "invalid volume ID format") +} + +func TestParseVolumeID_InvalidFormat_MultipleSeparators(t *testing.T) { + fsID, fileSetID, err := parseVolumeID("fs-123+fset-456+extra") + + require.Error(t, err) + assert.Empty(t, fsID) + assert.Empty(t, fileSetID) + assert.Contains(t, err.Error(), "invalid volume ID format") +} + +func TestParseVolumeID_EmptyFilesystemID(t *testing.T) { + fsID, fileSetID, err := parseVolumeID("+fset-456") + + require.Error(t, err) + assert.Empty(t, fsID) + assert.Empty(t, fileSetID) + assert.Contains(t, err.Error(), "filesystem ID or fileset ID is empty") +} + +func TestParseVolumeID_EmptyFilesetID(t *testing.T) { + fsID, fileSetID, err := parseVolumeID("fs-123+") + + require.Error(t, err) + assert.Empty(t, fsID) + assert.Empty(t, fileSetID) + assert.Contains(t, err.Error(), "filesystem ID or fileset ID is empty") +} + +func TestParseVolumeID_EmptyVolumeID(t *testing.T) { + fsID, fileSetID, err := parseVolumeID("") + + require.Error(t, err) + assert.Empty(t, fsID) + assert.Empty(t, fileSetID) + assert.Contains(t, err.Error(), "invalid volume ID format") +} + +func TestParseVolumeID_WithSpecialCharacters(t *testing.T) { + fsID, fileSetID, err := parseVolumeID("fs-abc-123+fset-def-456") + + require.NoError(t, err) + assert.Equal(t, "fs-abc-123", fsID) + assert.Equal(t, "fset-def-456", fileSetID) +}