Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .golangci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ issues:
- path: .*\.go
linters:
- unused
# Temporary: suppress deprecated Endpoints API warning until dadi migrates to EndpointSlice.
- path: pkg/dadi/.*\.go
text: corev1\.Endpoints is deprecated
linters:
- staticcheck

# Mode of the generated files analysis.
#
Expand Down
2 changes: 2 additions & 0 deletions _typos.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,5 @@ teh = "teh"
ded = "ded"
eles = "eles"
TAGED = "TAGED"
Entrie = "Entrie"
DescribeFilesetsResponseBodyEntriesEntrie = "DescribeFilesetsResponseBodyEntriesEntrie"
23 changes: 23 additions & 0 deletions deploy/charts/alibaba-cloud-csi-driver/templates/controller.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,29 @@ spec:
mountPath: /csi
{{- end -}}
{{- if and .Values.csi.bmcpfs.enabled .Values.csi.bmcpfs.controller.enabled }}
- name: external-bmcpfs-provisioner
image: {{ include "imageSpec" (list .Values "externalProvisioner") }}
args:
- --csi-address=/csi/csi.sock
- --http-endpoint=:8100
- --volume-name-prefix=bmcpfs
- --default-fstype=bmcpfs
- --timeout=150s
- --leader-election=true
- --retry-interval-start=500ms
- --kube-api-qps=100
- --kube-api-burst=200
- --v=5
resources:
requests:
cpu: 10m
memory: 16Mi
limits:
cpu: 500m
memory: 1024Mi
volumeMounts:
- name: bmcpfs-provisioner-dir
mountPath: /csi
- name: external-bmcpfs-attacher
image: {{ include "imageSpec" (list .Values "externalAttacher") }}
resources:
Expand Down
7 changes: 4 additions & 3 deletions pkg/bmcpfs/bmcpfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,14 @@ const (
// keys in volume context or publish context
_vpcMountTarget = "vpcMountTarget"
_vscMountTarget = "vscMountTarget"
_vscId = "vscId"
_vscID = "vscId"
_networkType = "networkType"
_path = "path"
_mpAutoSwitch = "mountpointAutoSwitch"

// prefix of node id
CommonNodeIDPrefix = "common:"
// CommonNodeIDPrefix is the prefix for common node IDs
CommonNodeIDPrefix = "common:"
// LingjunNodeIDPrefix is the prefix for lingjun node IDs
LingjunNodeIDPrefix = "lingjun:"

// network types of CPFS mount targets
Expand Down
101 changes: 85 additions & 16 deletions pkg/bmcpfs/controllerserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"fmt"
"os"
"path/filepath"
"strconv"
"strings"

Expand All @@ -43,6 +44,7 @@ type controllerServer struct {
common.GenericControllerServer
vscManager *internal.PrimaryVscManagerWithCache
attachDetacher internal.CPFSAttachDetacher
filsetManager internal.CPFSFileSetManager
nasClient *nasclient.Client
skipDetach bool
}
Expand All @@ -62,18 +64,85 @@ func newControllerServer(region string) (*controllerServer, error) {
if err != nil {
return nil, err
}

return &controllerServer{
vscManager: internal.NewPrimaryVscManagerWithCache(efloClient),
attachDetacher: internal.NewCPFSAttachDetacher(nasClient),
filsetManager: internal.NewCPFSFileSetManager(nasClient),
nasClient: nasClient,
skipDetach: skipDetach,
}, nil
}

// CreateVolume provisions a volume by creating a fileset through the fileset
// manager.
//
// The request is first checked by validateFileSetParameters; a failure is
// reported as InvalidArgument. The fileset is created in the filesystem named
// by the "bmcpfsId" parameter, at a path equal to the request name, or the
// request name joined under the optional "path" parameter when that is
// non-empty. A failure from CreateFileSet is reported as Internal. On success
// the response volume is built by createVolumeResponse from the fileset ID,
// the filesystem ID, the requested size and the volume context.
func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
	klog.FromContext(ctx).V(2).Info("starting")

	// Reject the request early when its parameters do not validate.
	validateErr := validateFileSetParameters(req)
	if validateErr != nil {
		klog.Errorf("CreateVolume: error parameters from input: %v, with error: %v", req.Name, validateErr)
		return nil, status.Errorf(codes.InvalidArgument, "Invalid parameters from input: %v, with error: %v", req.Name, validateErr)
	}

	parameters := req.GetParameters()
	name := req.Name
	filesystemID := parameters["bmcpfsId"]
	requiredBytes := req.GetCapacityRange().GetRequiredBytes()

	// The fileset lives directly under its own name unless a root path is configured.
	targetPath := name
	if root := parameters["path"]; root != "" {
		targetPath = filepath.Join(root, name)
	}

	// NOTE(review): 1000000 and false are passed through unchanged; their
	// meaning is defined by CreateFileSet, which is not visible here — confirm.
	filesetID, createErr := cs.filsetManager.CreateFileSet(ctx, filesystemID, name, targetPath, 1000000, requiredBytes, false)
	if createErr != nil {
		return nil, status.Error(codes.Internal, createErr.Error())
	}

	// The volume context starts from the request parameters (same map, not a copy).
	volCtx := parameters
	if volCtx == nil {
		volCtx = map[string]string{}
	}
	volCtx = updateVolumeContext(volCtx)

	klog.Infof("CreateVolume: Successfully created FileSet %s: id[%s], filesystem[%s], path[%s]", name, filesetID, filesystemID, targetPath)

	return &csi.CreateVolumeResponse{
		Volume: createVolumeResponse(filesetID, filesystemID, requiredBytes, volCtx),
	}, nil
}

// DeleteVolume removes the fileset that backs the given volume.
//
// The volume ID is split by parseVolumeID into a filesystem ID and a fileset
// ID; a parse failure is reported as InvalidArgument. The fileset is then
// deleted through the fileset manager, and a failure there is reported as
// Internal. An empty response is returned on success.
func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
	klog.FromContext(ctx).V(2).Info("starting")

	volumeID := req.VolumeId
	filesystemID, filesetID, parseErr := parseVolumeID(volumeID)
	if parseErr != nil {
		klog.Errorf("DeleteVolume: failed to parse volume ID %s: %v", volumeID, parseErr)
		return nil, status.Error(codes.InvalidArgument, parseErr.Error())
	}

	klog.Infof("DeleteVolume: deleting fileset %s from filesystem %s", filesetID, filesystemID)

	if delErr := cs.filsetManager.DeleteFileSet(ctx, filesystemID, filesetID); delErr != nil {
		klog.Errorf("DeleteVolume: failed to delete fileset %s from filesystem %s: %v", filesetID, filesystemID, delErr)
		return nil, status.Error(codes.Internal, delErr.Error())
	}

	klog.Infof("DeleteVolume: successfully deleted fileset %s from filesystem %s", filesetID, filesystemID)
	return &csi.DeleteVolumeResponse{}, nil
}

// ControllerGetCapabilities reports the controller RPC capabilities of this
// driver: publish/unpublish volume and create/delete volume. It never fails.
func (cs *controllerServer) ControllerGetCapabilities(ctx context.Context, req *csi.ControllerGetCapabilitiesRequest) (*csi.ControllerGetCapabilitiesResponse, error) {
	supported := common.ControllerRPCCapabilities(
		csi.ControllerServiceCapability_RPC_PUBLISH_UNPUBLISH_VOLUME,
		csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME,
	)
	resp := &csi.ControllerGetCapabilitiesResponse{Capabilities: supported}
	return resp, nil
}
Expand All @@ -94,7 +163,7 @@ func (cs *controllerServer) ControllerPublishVolume(ctx context.Context, req *cs
}

cpfsID, _ := parseVolumeHandle(req.VolumeId)
// Get VscMountTarget of filesystem

mt := req.VolumeContext[_vscMountTarget]
if mt == "" {
var err error
Expand All @@ -105,18 +174,18 @@ func (cs *controllerServer) ControllerPublishVolume(ctx context.Context, req *cs
}

// Get Primary vsc of Lingjun node
lingjunInstanceId := strings.TrimPrefix(req.NodeId, LingjunNodeIDPrefix)
lingjunInstanceID := strings.TrimPrefix(req.NodeId, LingjunNodeIDPrefix)
if LingjunNodeIDPrefix == "" {
return nil, status.Error(codes.InvalidArgument, "invalid node id")
}
vscId, err := cs.vscManager.EnsurePrimaryVsc(ctx, lingjunInstanceId, false)
vscID, err := cs.vscManager.EnsurePrimaryVsc(ctx, lingjunInstanceID, false)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
klog.Info("Use VSC MountTarget for lingjun node", "nodeId", req.NodeId, "vscId", vscId)
klog.Info("Use VSC MountTarget for lingjun node", "nodeId", req.NodeId, "vscId", vscID)

// Attach CPFS to VSC
err = cs.attachDetacher.Attach(ctx, cpfsID, vscId)
err = cs.attachDetacher.Attach(ctx, cpfsID, vscID)
if err != nil {
if autoSwitch, _ := strconv.ParseBool(req.VolumeContext[_mpAutoSwitch]); autoSwitch && internal.IsAttachNotSupportedError(err) {
if req.VolumeContext[_vpcMountTarget] == "" {
Expand All @@ -135,35 +204,34 @@ func (cs *controllerServer) ControllerPublishVolume(ctx context.Context, req *cs

// TODO: if the cached vscid is already deleted, try to recreate a new primary vsc for lingjun node

klog.InfoS("ControllerPublishVolume: attached cpfs to vsc", "vscMountTarget", mt, "vscId", vscId, "node", req.NodeId)
klog.InfoS("ControllerPublishVolume: attached cpfs to vsc", "vscMountTarget", mt, "vscId", vscID, "node", req.NodeId)
return &csi.ControllerPublishVolumeResponse{
PublishContext: map[string]string{
_networkType: networkTypeVSC,
_vscId: vscId,
_vscID: vscID,
_vscMountTarget: mt,
},
}, nil
}

func (cs *controllerServer) ControllerUnpublishVolume(ctx context.Context, req *csi.ControllerUnpublishVolumeRequest) (
*csi.ControllerUnpublishVolumeResponse, error,
) {
func (cs *controllerServer) ControllerUnpublishVolume(ctx context.Context, req *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, error) {
if !strings.HasPrefix(req.NodeId, LingjunNodeIDPrefix) || cs.skipDetach {
return &csi.ControllerUnpublishVolumeResponse{}, nil
}
// Look up the existing primary vsc of the Lingjun node; nothing is created here, and detach is skipped when none is found
lingjunInstanceId := strings.TrimPrefix(req.NodeId, LingjunNodeIDPrefix)
lingjunInstanceID := strings.TrimPrefix(req.NodeId, LingjunNodeIDPrefix)
if LingjunNodeIDPrefix == "" {
return nil, status.Error(codes.InvalidArgument, "invalid node id")
}
vsc, err := cs.vscManager.GetPrimaryVscOf(lingjunInstanceId)
vsc, err := cs.vscManager.GetPrimaryVscOf(lingjunInstanceID)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
return nil, status.Errorf(codes.Internal, "get vsc error: %v", err)
}
if vsc == nil {
klog.InfoS("ControllerUnpublishVolume: skip detaching cpfs from vsc as vsc not found", "node", req.NodeId)
return &csi.ControllerUnpublishVolumeResponse{}, nil
}

// If `req.VolumeId` is a combination of `cpfsID` and `fsetID`, Detach will trigger an error.
err = cs.attachDetacher.Detach(ctx, req.VolumeId, vsc.VscID)
if err != nil {
Expand All @@ -173,6 +241,7 @@ func (cs *controllerServer) ControllerUnpublishVolume(ctx context.Context, req *
return &csi.ControllerUnpublishVolumeResponse{}, nil
}

// KubernetesAlicloudIdentity is the user agent string for Eflo client.
// It is the fixed prefix "Kubernetes.Alicloud/CsiProvision.Bmcpfs-" followed
// by version.VERSION, formatted once at package initialization.
var KubernetesAlicloudIdentity = fmt.Sprintf("Kubernetes.Alicloud/CsiProvision.Bmcpfs-%s", version.VERSION)

// efloConnTimeout is presumably the connection timeout used for the Eflo client.
// NOTE(review): the unit is not visible here (likely seconds) — confirm at the use site.
const efloConnTimeout = 10
Expand Down Expand Up @@ -227,9 +296,9 @@ func parseVolumeHandle(volumeHandle string) (string, string) {
return parts[0], ""
}

func getMountTarget(client *nasclient.Client, fsId, networkType string) (string, error) {
func getMountTarget(client *nasclient.Client, fsID, networkType string) (string, error) {
resp, err := client.DescribeFileSystems(&nasclient.DescribeFileSystemsRequest{
FileSystemId: &fsId,
FileSystemId: &fsID,
})
if err != nil {
return "", fmt.Errorf("nas:DescribeFileSystems failed: %w", err)
Expand All @@ -252,7 +321,7 @@ func getMountTarget(client *nasclient.Client, fsId, networkType string) (string,
if t == networkType {
mountTarget := tea.StringValue(mt.MountTargetDomain)
status := tea.StringValue(mt.Status)
klog.V(2).InfoS("Found cpfs mount target", "filesystem", fsId, "networkType", networkType, "mountTarget", mountTarget, "status", status)
klog.V(2).InfoS("Found cpfs mount target", "filesystem", fsID, "networkType", networkType, "mountTarget", mountTarget, "status", status)
if status == "Active" {
return mountTarget, nil
}
Expand Down
Loading