From 6c21dcf9a616158091c5607ece4c16be98b38523 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=83=A1=E7=8E=AE=E6=96=87?= Date: Wed, 23 Jul 2025 13:24:04 +0800 Subject: [PATCH 1/6] disk: optimize batch create/delete Integrating batcher and throttle. --- pkg/cloud/throttle/throttle.go | 11 +++++++ pkg/disk/cloud.go | 33 ++++++++++---------- pkg/disk/cloud_test.go | 57 ++++++++++++++++++++++------------ pkg/disk/controllerserver.go | 13 ++++++-- pkg/disk/desc/describe.go | 19 +++++++----- pkg/disk/disk.go | 2 +- 6 files changed, 87 insertions(+), 48 deletions(-) diff --git a/pkg/cloud/throttle/throttle.go b/pkg/cloud/throttle/throttle.go index 6af0acc12..da909e46e 100644 --- a/pkg/cloud/throttle/throttle.go +++ b/pkg/cloud/throttle/throttle.go @@ -131,3 +131,14 @@ func (t *Throttler) Throttle(ctx context.Context, f func() error) error { } } } + +func Throttled[TReq any, TResp any](t *Throttler, f func(TReq) (TResp, error)) func(context.Context, TReq) (TResp, error) { + return func(ctx context.Context, req TReq) (TResp, error) { + var resp TResp + err := t.Throttle(ctx, func() (err error) { + resp, err = f(req) + return err + }) + return resp, err + } +} diff --git a/pkg/disk/cloud.go b/pkg/disk/cloud.go index 0e90ea97d..37e417d87 100644 --- a/pkg/disk/cloud.go +++ b/pkg/disk/cloud.go @@ -77,6 +77,13 @@ type DiskAttachDetach struct { dev *DeviceManager } +type DiskCreateDelete struct { + ecs cloud.ECSInterface + batcher batcher.Batcher[ecs.Disk] + createThrottler *throttle.Throttler + deleteThrottler *throttle.Throttler +} + func (ad *DiskAttachDetach) possibleDisks(before sets.Set[string]) ([]string, error) { after, err := ad.dev.ListBlocks() if err != nil { @@ -290,11 +297,7 @@ func (ad *DiskAttachDetach) attachDisk(ctx context.Context, diskID, nodeID strin for key, value := range GlobalConfigVar.RequestBaseInfo { attachRequest.AppendUserAgent(key, value) } - var response *ecs.AttachDiskResponse - err = ad.attachThrottler.Throttle(ctx, func() error { - response, err = ecsClient.AttachDisk(attachRequest) - return err - }) + response, err := throttle.Throttled(ad.attachThrottler, ecsClient.AttachDisk)(ctx, attachRequest) if err != nil { var aliErr *alicloudErr.ServerError if errors.As(err, &aliErr) { @@ -465,11 +468,7 @@ func (ad *DiskAttachDetach) detachDisk(ctx context.Context, ecsClient cloud.ECSI for key, value := range GlobalConfigVar.RequestBaseInfo { detachDiskRequest.AppendUserAgent(key, value) } - var response *ecs.DetachDiskResponse - err = ad.detachThrottler.Throttle(ctx, func() error { - response, err = ecsClient.DetachDisk(detachDiskRequest) - return err - }) + response, err := throttle.Throttled(ad.detachThrottler, ecsClient.DetachDisk)(ctx, detachDiskRequest) if err != nil { return status.Errorf(codes.Aborted, "DetachDisk: Fail to detach %s: from Instance: %s with error: %v", disk.DiskId, disk.InstanceId, err) } @@ -927,7 +926,7 @@ func isValidSnapshotName(name string) bool { return validDiskNameRegexp.MatchString(name) } -func createDisk(ecsClient cloud.ECSInterface, diskName, snapshotID string, diskVol *diskVolumeArgs, supportedTypes sets.Set[Category], selectedInstance string, isVirtualNode bool) (string, createAttempt, error) { +func (c *DiskCreateDelete) createDisk(ctx context.Context, diskName, snapshotID string, diskVol *diskVolumeArgs, supportedTypes sets.Set[Category], selectedInstance string, isVirtualNode bool) (string, createAttempt, error) { // 需要配置external-provisioner启动参数--extra-create-metadata=true,然后ACK的external-provisioner才会将PVC的Annotations传过来 createDiskRequest := 
buildCreateDiskRequest(diskVol) if isValidDiskName(diskName) { @@ -962,7 +961,7 @@ func createDisk(ecsClient cloud.ECSInterface, diskName, snapshotID string, diskV attempt.Instance = selectedInstance } retry: - diskID, final, err := createDiskAttempt(createDiskRequest, attempt, ecsClient) + diskID, final, err := c.createDiskAttempt(ctx, createDiskRequest, attempt) if err != nil { if final { return "", attempt, err @@ -972,7 +971,7 @@ func createDisk(ecsClient cloud.ECSInterface, diskName, snapshotID string, diskV // protect us from infinite loop return "", attempt, status.Error(codes.Internal, "unexpected parameter mismatch") } - existingDisk, err := findDiskByName(diskName, ecsClient) + existingDisk, err := findDiskByName(diskName, c.ecs) if err != nil { return "", attempt, status.Errorf(codes.Internal, "parameter mismatch detected, but fetch existing disk failed: %v", err) } @@ -1051,11 +1050,11 @@ func finalizeCreateDiskRequest(template *ecs.CreateDiskRequest, attempt createAt var ErrParameterMismatch = errors.New("parameter mismatch") -func createDiskAttempt(req *ecs.CreateDiskRequest, attempt createAttempt, ecsClient cloud.ECSInterface) (diskId string, final bool, err error) { +func (c *DiskCreateDelete) createDiskAttempt(ctx context.Context, req *ecs.CreateDiskRequest, attempt createAttempt) (diskId string, final bool, err error) { req = finalizeCreateDiskRequest(req, attempt) klog.Infof("request: request content: %++v", req) - volumeRes, err := ecsClient.CreateDisk(req) + volumeRes, err := throttle.Throttled(c.createThrottler, c.ecs.CreateDisk)(ctx, req) if err == nil { klog.Infof("request: diskId: %s, reqId: %s", volumeRes.DiskId, volumeRes.RequestId) return volumeRes.DiskId, true, nil @@ -1157,14 +1156,14 @@ func getDefaultDiskTags(diskVol *diskVolumeArgs) []ecs.CreateDiskTag { return diskTags } -func deleteDisk(ctx context.Context, ecsClient cloud.ECSInterface, diskId string) (*ecs.DeleteDiskResponse, error) { +func (c *DiskCreateDelete) deleteDisk(ctx context.Context, ecsClient cloud.ECSInterface, diskId string) (*ecs.DeleteDiskResponse, error) { deleteDiskRequest := ecs.CreateDeleteDiskRequest() deleteDiskRequest.DiskId = diskId var resp *ecs.DeleteDiskResponse err := wait.PollUntilContextTimeout(ctx, time.Second*5, DISK_DELETE_INIT_TIMEOUT, true, func(ctx context.Context) (bool, error) { var err error - resp, err = ecsClient.DeleteDisk(deleteDiskRequest) + resp, err = throttle.Throttled(c.deleteThrottler, c.ecs.DeleteDisk)(ctx, deleteDiskRequest) if err == nil { klog.Infof("DeleteVolume: Successfully deleted volume: %s, with RequestId: %s", diskId, resp.RequestId) return true, nil diff --git a/pkg/disk/cloud_test.go b/pkg/disk/cloud_test.go index c522ff540..b2de76091 100644 --- a/pkg/disk/cloud_test.go +++ b/pkg/disk/cloud_test.go @@ -4,14 +4,19 @@ import ( "context" "fmt" "testing" + "time" alicloudErr "github.com/aliyun/alibaba-cloud-sdk-go/sdk/errors" "github.com/aliyun/alibaba-cloud-sdk-go/sdk/requests" "github.com/aliyun/alibaba-cloud-sdk-go/services/ecs" gomock "github.com/golang/mock/gomock" "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/cloud" + "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/disk/batcher" + "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/disk/desc" "github.com/stretchr/testify/assert" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/klog/v2/ktesting" + "k8s.io/utils/clock" ) var ( @@ -31,37 +36,49 @@ func init() { }`), resizeDiskResponse) } -func TestDeleteDisk(t *testing.T) { +func testCreateDelete(t *testing.T) 
(*cloud.MockECSInterface, *DiskCreateDelete) { ctrl := gomock.NewController(t) c := cloud.NewMockECSInterface(ctrl) + return c, &DiskCreateDelete{ + ecs: c, + createThrottler: defaultThrottler(), + deleteThrottler: defaultThrottler(), + batcher: batcher.NewLowLatency(desc.Disk(c), clock.RealClock{}, 1*time.Second, 8), + } +} + +func TestDeleteDisk(t *testing.T) { + c, cd := testCreateDelete(t) + _, ctx := ktesting.NewTestContext(t) + c.EXPECT().DeleteDisk(gomock.Any()).Return(deleteDiskResponse, nil) - _, err := deleteDisk(context.Background(), c, "test-disk") + _, err := cd.deleteDisk(ctx, c, "test-disk") assert.Nil(t, err) } func TestDeleteDiskRetryOnInitError(t *testing.T) { t.Parallel() - ctrl := gomock.NewController(t) - c := cloud.NewMockECSInterface(ctrl) + c, cd := testCreateDelete(t) + _, ctx := ktesting.NewTestContext(t) initErr := alicloudErr.NewServerError(400, `{"Code": "IncorrectDiskStatus.Initializing"}`, "") c.EXPECT().DeleteDisk(gomock.Any()).Return(nil, initErr) c.EXPECT().DeleteDisk(gomock.Any()).Return(deleteDiskResponse, nil) - _, err := deleteDisk(context.Background(), c, "test-disk") + _, err := cd.deleteDisk(ctx, c, "test-disk") assert.Nil(t, err) } func TestDeleteDiskPassthroughError(t *testing.T) { - ctrl := gomock.NewController(t) - c := cloud.NewMockECSInterface(ctrl) + c, cd := testCreateDelete(t) + _, ctx := ktesting.NewTestContext(t) serverErr := alicloudErr.NewServerError(400, `{"Code": "AnyOtherErrors"}`, "") c.EXPECT().DeleteDisk(gomock.Any()).Return(nil, serverErr) - _, err := deleteDisk(context.Background(), c, "test-disk") + _, err := cd.deleteDisk(ctx, c, "test-disk") assert.Equal(t, serverErr, err) } @@ -384,8 +401,8 @@ func TestCreateDisk_Basic(t *testing.T) { } for _, c := range cases { t.Run(c.name, func(t *testing.T) { - ctrl := gomock.NewController(t) - client := cloud.NewMockECSInterface(ctrl) + client, cd := testCreateDelete(t) + _, ctx := ktesting.NewTestContext(t) if !c.err { client.EXPECT().CreateDisk(gomock.Any()).Return(&ecs.CreateDiskResponse{ @@ -396,7 +413,7 @@ func TestCreateDisk_Basic(t *testing.T) { client.EXPECT().CreateDisk(gomock.Any()).Return(nil, alicloudErr.NewServerError(400, `{"Code": "AnyOtherErrors"}`, "")) } - diskID, attempt, err := createDisk(client, "disk-name", "", c.args, c.supports, c.instance, c.isVirtualNode) + diskID, attempt, err := cd.createDisk(ctx, "disk-name", "", c.args, c.supports, c.instance, c.isVirtualNode) if c.err { assert.Error(t, err) } else { @@ -409,8 +426,8 @@ func TestCreateDisk_Basic(t *testing.T) { } func TestCreateDisk_ServerFailFallback(t *testing.T) { - ctrl := gomock.NewController(t) - client := cloud.NewMockECSInterface(ctrl) + client, cd := testCreateDelete(t) + _, ctx := ktesting.NewTestContext(t) client.EXPECT().CreateDisk(gomock.Any()).Return(nil, alicloudErr.NewServerError(400, `{"Code": "InvalidDataDiskSize.ValueNotSupported"}`, "")) client.EXPECT().CreateDisk(gomock.Any()).Return(&ecs.CreateDiskResponse{ @@ -418,7 +435,7 @@ func TestCreateDisk_ServerFailFallback(t *testing.T) { }, nil) args := &diskVolumeArgs{Type: []Category{DiskESSD, DiskESSDAuto}, RequestGB: 20} - diskID, attempt, err := createDisk(client, "disk-name", "", args, nil, "", false) + diskID, attempt, err := cd.createDisk(ctx, "disk-name", "", args, nil, "", false) assert.NoError(t, err) assert.Equal(t, "d-123", diskID) assert.Equal(t, DiskESSDAuto, attempt.Category) @@ -456,8 +473,8 @@ func TestCreateDisk_ParameterMismatch(t *testing.T) { } for _, c := range cases { t.Run(c.name, func(t *testing.T) { - ctrl := 
gomock.NewController(t) - client := cloud.NewMockECSInterface(ctrl) + client, cd := testCreateDelete(t) + _, ctx := ktesting.NewTestContext(t) r1 := client.EXPECT().CreateDisk(gomock.Any()).Return(nil, alicloudErr.NewServerError(400, `{"Code": "IdempotentParameterMismatch"}`, "")) r2 := client.EXPECT().DescribeDisks(gomock.Any()).Return(&ecs.DescribeDisksResponse{ @@ -472,7 +489,7 @@ func TestCreateDisk_ParameterMismatch(t *testing.T) { } args := &diskVolumeArgs{Type: []Category{DiskESSD, DiskESSDAuto}, RequestGB: 20} - diskID, attempt, err := createDisk(client, "disk-name", "", args, nil, "", false) + diskID, attempt, err := cd.createDisk(ctx, "disk-name", "", args, nil, "", false) if c.err { assert.Error(t, err) } else { @@ -492,8 +509,8 @@ func TestCreateDisk_ParameterMismatch(t *testing.T) { } func TestCreateDisk_NoInfiniteLoop(t *testing.T) { - ctrl := gomock.NewController(t) - client := cloud.NewMockECSInterface(ctrl) + client, cd := testCreateDelete(t) + _, ctx := ktesting.NewTestContext(t) client.EXPECT().CreateDisk(gomock.Any()).Return(nil, alicloudErr.NewServerError(400, `{"Code": "IdempotentParameterMismatch"}`, "")).Times(2) client.EXPECT().DescribeDisks(gomock.Any()).Return(&ecs.DescribeDisksResponse{ @@ -503,7 +520,7 @@ func TestCreateDisk_NoInfiniteLoop(t *testing.T) { }, nil) args := &diskVolumeArgs{Type: []Category{DiskESSD}, RequestGB: 20} - _, _, err := createDisk(client, "disk-name", "", args, nil, "", false) + _, _, err := cd.createDisk(ctx, "disk-name", "", args, nil, "", false) assert.Error(t, err) } diff --git a/pkg/disk/controllerserver.go b/pkg/disk/controllerserver.go index 5b18b7c5c..6f5edd204 100644 --- a/pkg/disk/controllerserver.go +++ b/pkg/disk/controllerserver.go @@ -52,6 +52,7 @@ import ( type controllerServer struct { recorder record.EventRecorder ad DiskAttachDetach + cd DiskCreateDelete meta metadata.MetadataProvider ecs cloud.ECSInterface snapshotWaiter waitstatus.StatusWaiter[ecs.Snapshot] @@ -104,6 +105,12 @@ func NewControllerServer(csiCfg utils.Config, ecs cloud.ECSInterface, m metadata attachThrottler: defaultThrottler(), detachThrottler: defaultThrottler(), }, + cd: DiskCreateDelete{ + ecs: ecs, + batcher: batcher, + createThrottler: defaultThrottler(), + deleteThrottler: defaultThrottler(), + }, snapshotWaiter: newSnapshotStatusWaiter(), } detachConcurrency := 1 @@ -177,7 +184,7 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol isVirtualNode = node.Labels[common.NodeTypeLabelKey] == common.VirtualNodeType } - diskID, attempt, err := createDisk(cs.ecs, req.GetName(), snapshotID, diskVol, supportedTypes, selectedInstance, isVirtualNode) + diskID, attempt, err := cs.cd.createDisk(ctx, req.GetName(), snapshotID, diskVol, supportedTypes, selectedInstance, isVirtualNode) if err != nil { if errors.Is(err, ErrParameterMismatch) { return nil, status.Errorf(codes.AlreadyExists, "volume %s already created but %v", req.Name, err) @@ -220,7 +227,7 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol var disk *ecs.Disk describeDisk := func() (*csi.DeleteVolumeResponse, error) { var err error - disk, err = findDiskByID(req.VolumeId, cs.ecs) + disk, err = cs.cd.batcher.Describe(ctx, req.VolumeId) if err != nil { return nil, status.Errorf(codes.Internal, "DeleteVolume: find disk(%s) by id with error: %v", req.VolumeId, err) } @@ -274,7 +281,7 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol } } - _, err := deleteDisk(ctx, cs.ecs, req.VolumeId) + _, err := 
cs.cd.deleteDisk(ctx, cs.ecs, req.VolumeId) if err != nil { newErrMsg := utils.FindSuggestionByErrorMessage(err.Error(), utils.DiskDelete) return nil, status.Errorf(codes.Internal, "DeleteVolume: Delete disk with error: %s", newErrMsg) diff --git a/pkg/disk/desc/describe.go b/pkg/disk/desc/describe.go index 488868db3..aab713f1b 100644 --- a/pkg/disk/desc/describe.go +++ b/pkg/disk/desc/describe.go @@ -5,6 +5,7 @@ import ( "github.com/aliyun/alibaba-cloud-sdk-go/sdk/requests" "github.com/aliyun/alibaba-cloud-sdk-go/services/ecs" + "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/cloud" ) const batchSize = 100 @@ -29,17 +30,21 @@ func encodeIDs(ids []string) string { return string(res) } -type Disk struct { - *ecs.Client +type disk struct { + cloud.ECSInterface +} + +func Disk(client cloud.ECSInterface) Client[ecs.Disk] { + return disk{client} } -func (c Disk) Describe(ids []string) (Response[ecs.Disk], error) { +func (c disk) Describe(ids []string) (Response[ecs.Disk], error) { req := ecs.CreateDescribeDisksRequest() req.DiskIds = encodeIDs(ids) req.PageSize = requests.NewInteger(batchSize) ret := Response[ecs.Disk]{} - resp, err := c.Client.DescribeDisks(req) + resp, err := c.DescribeDisks(req) if err != nil { return ret, err } @@ -48,15 +53,15 @@ func (c Disk) Describe(ids []string) (Response[ecs.Disk], error) { return ret, nil } -func (c Disk) GetID(resource *ecs.Disk) string { +func (c disk) GetID(resource *ecs.Disk) string { return resource.DiskId } -func (c Disk) Type() string { +func (c disk) Type() string { return "disk" } -func (c Disk) BatchSize() int { +func (c disk) BatchSize() int { return batchSize } diff --git a/pkg/disk/disk.go b/pkg/disk/disk.go index c0d82351e..f4eba8557 100644 --- a/pkg/disk/disk.go +++ b/pkg/disk/disk.go @@ -243,7 +243,7 @@ func GlobalConfigSet(m metadata.MetadataProvider) utils.Config { } func newBatcher(fromNode bool) (waitstatus.StatusWaiter[ecs.Disk], batcher.Batcher[ecs.Disk]) { - client := desc.Disk{Client: GlobalConfigVar.EcsClient} + client := desc.Disk(GlobalConfigVar.EcsClient) ctx := context.Background() interval := 1 * time.Second if fromNode { From 3a64b2d10083ff048beabbdeef817f54bea3307a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=83=A1=E7=8E=AE=E6=96=87?= Date: Thu, 24 Jul 2025 15:47:51 +0800 Subject: [PATCH 2/6] batcher: optimize log for inefficient batching Sometimes we are actually slowing down the system with batching. Optimize logs to diagnose this. 
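
For reference, the artificial delay added before each batch is derived from the
remaining tokens (see the change to Run below): d = 0.5^tokens * perToken. A
rough feel for the numbers, using an illustrative 2s per-request interval (the
real value is whatever newBatcher passes in):

    // Illustrative only: extra batching delay for various token levels.
    package main

    import (
        "fmt"
        "math"
        "time"
    )

    func main() {
        perToken := 2 * time.Second
        for _, tokens := range []float64{8, 4, 1, 0} {
            d := time.Duration(math.Pow(0.5, tokens) * float64(perToken))
            fmt.Printf("tokens=%.0f -> extra delay %v\n", tokens, d)
        }
    }

With the bucket drained (tokens near 0) every Describe waits close to a full
per-request interval; if only one caller is active, that wait buys no batching
at all, which is the situation the new "Inefficient batching" message surfaces.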
--- pkg/disk/batcher/low_latency.go | 50 ++++++++++++++++++---------- pkg/disk/batcher/low_latency_test.go | 29 +++++++++++++--- 2 files changed, 57 insertions(+), 22 deletions(-) diff --git a/pkg/disk/batcher/low_latency.go b/pkg/disk/batcher/low_latency.go index 698a3a3c3..e3c9435e0 100644 --- a/pkg/disk/batcher/low_latency.go +++ b/pkg/disk/batcher/low_latency.go @@ -47,7 +47,7 @@ func NewLowLatency[T any](ecsClient desc.Client[T], clk clock.WithTicker, perReq ecsClient: ecsClient, requestChan: make(chan *getRequest[*T]), tokens: tokenBucket{ - limit: burst, perToken: perRequest, + limit: time.Duration(burst) * perRequest, perToken: perRequest, }, clk: clk, } @@ -71,28 +71,23 @@ func (w *LowLatency[T]) Describe(ctx context.Context, id string) (*T, error) { type tokenBucket struct { zeroAt time.Time - limit int - perToken time.Duration + limit time.Duration // How long it takes to fully refill the bucket + perToken time.Duration // How long it takes to refill one token } func (tb *tokenBucket) tokenAt(t time.Time) float64 { - elapsed := t.Sub(tb.zeroAt) - token := float64(elapsed) / float64(tb.perToken) - if token > float64(tb.limit) { - token = float64(tb.limit) - } - return token + elapsed := min(t.Sub(tb.zeroAt), tb.limit) + return float64(elapsed) / float64(tb.perToken) } func (tb *tokenBucket) takeAt(t time.Time) { elapsed := t.Sub(tb.zeroAt) - if elapsed >= time.Duration(tb.limit)*tb.perToken { - tb.zeroAt = t.Add(-time.Duration(tb.limit-1) * tb.perToken) - } else { - tb.zeroAt = tb.zeroAt.Add(tb.perToken) - if tb.zeroAt.After(t) { - tb.zeroAt = t - } + if elapsed >= tb.limit { + tb.zeroAt = t.Add(-tb.limit) + } + tb.zeroAt = tb.zeroAt.Add(tb.perToken) + if tb.zeroAt.After(t) { + tb.zeroAt = t } } @@ -109,6 +104,8 @@ func (w *LowLatency[T]) Run(ctx context.Context) { timeout = nil } + var d time.Duration + nInefficient := 0 for { select { case <-ctx.Done(): @@ -121,17 +118,34 @@ func (w *LowLatency[T]) Run(ctx context.Context) { requests[r.id] = append(requests[r.id], r) if len(requests) == batchSize { logger.V(4).Info("batch full", "n", batchSize) + nInefficient = 0 descBatch(t) } else if timeout == nil { // add some artificial delay for better batching // the less tokens left, the more we wait tokens := w.tokens.tokenAt(t) - d := time.Duration(math.Pow(0.5, tokens) * float64(w.tokens.perToken)) + d = time.Duration(math.Pow(0.5, tokens) * float64(w.tokens.perToken)) timeout = w.clk.After(d) logger.V(4).Info("batch waiting", "timeout", d) } case t := <-timeout: - logger.V(4).Info("batch timeout", "n", len(requests)) + v := 4 + if d > w.tokens.perToken/2 { // We are becoming the bottleneck of system throughput + v = 2 + if len(requests) <= 1 { + // We have waited, but didn't get the second request. 
+ // We increased the latency with no benefit :( + nInefficient++ + v = 1 + if nInefficient == 3 { + logger.V(1).Info("Inefficient batching, please increase upstream concurrency") + } + } + } + if v > 1 { + nInefficient = 0 + } + logger.V(v).Info("batch timeout", "timeout", d, "n", len(requests)) descBatch(t) } } diff --git a/pkg/disk/batcher/low_latency_test.go b/pkg/disk/batcher/low_latency_test.go index 607687569..b57810820 100644 --- a/pkg/disk/batcher/low_latency_test.go +++ b/pkg/disk/batcher/low_latency_test.go @@ -12,20 +12,22 @@ import ( "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/disk/desc" testdesc "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/disk/desc/testing" "github.com/stretchr/testify/assert" + "k8s.io/klog/v2" + "k8s.io/klog/v2/ktesting" "k8s.io/utils/clock" testclock "k8s.io/utils/clock/testing" ) func TestZeroBucketIsFilled(t *testing.T) { bucket := tokenBucket{ - limit: 8, perToken: 2 * time.Second, + limit: 16 * time.Second, perToken: 2 * time.Second, } assert.Equal(t, 8., bucket.tokenAt(time.Now())) } func TestBucketTake(t *testing.T) { bucket := tokenBucket{ - limit: 8, perToken: 2 * time.Second, + limit: 16 * time.Second, perToken: 2 * time.Second, } now := time.Now() bucket.takeAt(now) @@ -49,8 +51,8 @@ func setup(t *testing.T) (client *testdesc.FakeClient, clk *testclock.FakeClock, clk = testclock.NewFakeClock(time.Now()) batcher = NewLowLatency(client, clk, 2*time.Second, 8) - ctx, cancel := context.WithCancel(context.Background()) - t.Cleanup(cancel) + logger := ktesting.NewLogger(t, ktesting.DefaultConfig) + ctx := klog.NewContext(t.Context(), logger) go batcher.Run(ctx) return client, clk, batcher @@ -153,3 +155,22 @@ func TestFailures(t *testing.T) { assert.ErrorIs(t, err, ErrFake) assert.Nil(t, disk) } + +func TestReportInefficient(t *testing.T) { + client := &testdesc.FakeClient{} + batcher := NewLowLatency(client, clock.RealClock{}, 20*time.Millisecond, 8) + + logger := ktesting.NewLogger(t, ktesting.NewConfig(ktesting.BufferLogs(true))) + ctx := klog.NewContext(t.Context(), logger) + go batcher.Run(ctx) + + client.Disks.Store("d-test", &ecs.Disk{ + DiskId: "d-test", + DiskName: "mydisk", + }) + + for range 20 { + batcher.Describe(ctx, "d-test") + } + assert.Contains(t, logger.GetSink().(ktesting.Underlier).GetBuffer().String(), "Inefficient batching") +} From 6ac86a9ada7fa1be1a7f475e0f146a30125b433e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=83=A1=E7=8E=AE=E6=96=87?= Date: Sun, 28 Sep 2025 18:12:45 +0800 Subject: [PATCH 3/6] Avoid global ECS client for easier test --- pkg/disk/cloud.go | 13 ++++++------- pkg/disk/controllerserver.go | 1 + pkg/disk/disk.go | 2 +- pkg/disk/nodeserver.go | 4 +++- 4 files changed, 11 insertions(+), 9 deletions(-) diff --git a/pkg/disk/cloud.go b/pkg/disk/cloud.go index 37e417d87..6ceffdb02 100644 --- a/pkg/disk/cloud.go +++ b/pkg/disk/cloud.go @@ -67,6 +67,7 @@ const ( type DiskAttachDetach struct { slots AttachDetachSlots + ecs cloud.ECSInterface waiter waitstatus.StatusWaiter[ecs.Disk] batcher batcher.Batcher[ecs.Disk] @@ -167,7 +168,6 @@ func (ad *DiskAttachDetach) attachDisk(ctx context.Context, diskID, nodeID strin logger := klog.FromContext(ctx) logger.V(2).Info("Starting Do AttachDisk", "instanceID", nodeID, "region", GlobalConfigVar.Region) - ecsClient := GlobalConfigVar.EcsClient // Step 1: check disk status disk, err := ad.findDiskByID(ctx, diskID) if err != nil { @@ -192,7 +192,7 @@ func (ad *DiskAttachDetach) attachDisk(ctx context.Context, diskID, nodeID strin // tag disk as 
k8s.aliyun.com=true if GlobalConfigVar.DiskTagEnable { - tagDiskAsK8sAttached(diskID, ecsClient) + tagDiskAsK8sAttached(diskID, GlobalConfigVar.EcsClient) } cate, ok := AllCategories[Category(disk.Category)] @@ -237,7 +237,7 @@ func (ad *DiskAttachDetach) attachDisk(ctx context.Context, diskID, nodeID strin } if GlobalConfigVar.DiskBdfEnable { - if allowed, err := forceDetachAllowed(ecsClient, disk); err != nil { + if allowed, err := forceDetachAllowed(GlobalConfigVar.EcsClient, disk); err != nil { return "", status.Errorf(codes.Aborted, "forceDetachAllowed failed: %v", err) } else if !allowed { return "", status.Errorf(codes.Aborted, "AttachDisk: Disk %s is already attached to instance %s, and depend bdf, reject force detach", disk.DiskId, disk.InstanceId) @@ -257,7 +257,7 @@ func (ad *DiskAttachDetach) attachDisk(ctx context.Context, diskID, nodeID strin for key, value := range GlobalConfigVar.RequestBaseInfo { detachRequest.AppendUserAgent(key, value) } - _, err = ecsClient.DetachDisk(detachRequest) + _, err = ad.ecs.DetachDisk(detachRequest) if err != nil { klog.Errorf("AttachDisk: Can't Detach disk %s from instance %s: with error: %v", diskID, disk.InstanceId, err) return "", status.Errorf(codes.Aborted, "AttachDisk: Can't Detach disk %s from instance %s: with error: %v", diskID, disk.InstanceId, err) @@ -297,7 +297,7 @@ func (ad *DiskAttachDetach) attachDisk(ctx context.Context, diskID, nodeID strin for key, value := range GlobalConfigVar.RequestBaseInfo { attachRequest.AppendUserAgent(key, value) } - response, err := throttle.Throttled(ad.attachThrottler, ecsClient.AttachDisk)(ctx, attachRequest) + response, err := throttle.Throttled(ad.attachThrottler, ad.ecs.AttachDisk)(ctx, attachRequest) if err != nil { var aliErr *alicloudErr.ServerError if errors.As(err, &aliErr) { @@ -338,7 +338,6 @@ func (ad *DiskAttachDetach) attachDisk(ctx context.Context, diskID, nodeID strin func (ad *DiskAttachDetach) attachMultiAttachDisk(ctx context.Context, diskID, nodeID string) (string, error) { klog.Infof("AttachDisk: Starting Do AttachMultiAttachDisk: DiskId: %s, InstanceId: %s, Region: %v", diskID, nodeID, GlobalConfigVar.Region) - ecsClient := GlobalConfigVar.EcsClient // Step 1: check disk status disk, err := ad.findDiskByID(ctx, diskID) if err != nil { @@ -360,7 +359,7 @@ func (ad *DiskAttachDetach) attachMultiAttachDisk(ctx context.Context, diskID, n attachRequest := ecs.CreateAttachDiskRequest() attachRequest.InstanceId = nodeID attachRequest.DiskId = diskID - response, err := ecsClient.AttachDisk(attachRequest) + response, err := ad.ecs.AttachDisk(attachRequest) if err != nil { if strings.Contains(err.Error(), DiskLimitExceeded) { return "", status.Error(codes.Internal, err.Error()+", Node("+nodeID+")exceed the limit attachments of disk") diff --git a/pkg/disk/controllerserver.go b/pkg/disk/controllerserver.go index 6f5edd204..59daf3cf4 100644 --- a/pkg/disk/controllerserver.go +++ b/pkg/disk/controllerserver.go @@ -99,6 +99,7 @@ func NewControllerServer(csiCfg utils.Config, ecs cloud.ECSInterface, m metadata meta: m, ecs: ecs, ad: DiskAttachDetach{ + ecs: ecs, waiter: waiter, batcher: batcher, diff --git a/pkg/disk/disk.go b/pkg/disk/disk.go index f4eba8557..266e34c6c 100644 --- a/pkg/disk/disk.go +++ b/pkg/disk/disk.go @@ -133,7 +133,7 @@ func NewDriver(m metadata.MetadataProvider, endpoint string, serviceType utils.S servers.ControllerServer = NewControllerServer(csiCfg, client, m) } if serviceType&utils.Node != 0 { - servers.NodeServer = NewNodeServer(m) + servers.NodeServer = 
NewNodeServer(client, m) } if features.FunctionalMutableFeatureGate.Enabled(features.EnableVolumeGroupSnapshots) { servers.GroupControllerServer = NewGroupControllerServer() diff --git a/pkg/disk/nodeserver.go b/pkg/disk/nodeserver.go index 24e482c1a..988c31947 100644 --- a/pkg/disk/nodeserver.go +++ b/pkg/disk/nodeserver.go @@ -30,6 +30,7 @@ import ( "syscall" "github.com/container-storage-interface/spec/lib/go/csi" + "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/cloud" "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/cloud/metadata" "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/common" "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/disk/sfdisk" @@ -171,7 +172,7 @@ func parseVolumeCountEnv() (int, error) { } // NewNodeServer creates node server -func NewNodeServer(m metadata.MetadataProvider) csi.NodeServer { +func NewNodeServer(ecs cloud.ECSInterface, m metadata.MetadataProvider) csi.NodeServer { // Create Directory err := os.MkdirAll(VolumeDir, os.FileMode(0755)) if err != nil { @@ -224,6 +225,7 @@ func NewNodeServer(m metadata.MetadataProvider) csi.NodeServer { podCGroup: podCgroup, clientSet: GlobalConfigVar.ClientSet, ad: DiskAttachDetach{ + ecs: ecs, waiter: waiter, batcher: batcher, // if ADController is not enabled, we need serial attach to recognize old disk From de34e14d7f661076606e59e00199a4ea3d2f6890 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=83=A1=E7=8E=AE=E6=96=87?= Date: Sun, 28 Sep 2025 20:55:33 +0800 Subject: [PATCH 4/6] add unit test for attach disk --- pkg/disk/batcher/passthrough.go | 25 ++++++ pkg/disk/cloud_test.go | 148 ++++++++++++++++++++++++++++++++ 2 files changed, 173 insertions(+) create mode 100644 pkg/disk/batcher/passthrough.go diff --git a/pkg/disk/batcher/passthrough.go b/pkg/disk/batcher/passthrough.go new file mode 100644 index 000000000..f527eb24a --- /dev/null +++ b/pkg/disk/batcher/passthrough.go @@ -0,0 +1,25 @@ +package batcher + +import ( + "context" + + "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/disk/desc" +) + +type Passthrough[T any] struct { + client desc.Client[T] +} + +func NewPassthrough[T any](client desc.Client[T]) Passthrough[T] { + return Passthrough[T]{ + client: client, + } +} + +func (w Passthrough[T]) Describe(ctx context.Context, id string) (*T, error) { + resp, err := w.client.Describe([]string{id}) + if err != nil { + return nil, err + } + return &resp.Resources[0], nil +} diff --git a/pkg/disk/cloud_test.go b/pkg/disk/cloud_test.go index b2de76091..2d53ecbf1 100644 --- a/pkg/disk/cloud_test.go +++ b/pkg/disk/cloud_test.go @@ -13,7 +13,9 @@ import ( "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/cloud" "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/disk/batcher" "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/disk/desc" + "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/disk/waitstatus" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/klog/v2/ktesting" "k8s.io/utils/clock" @@ -598,3 +600,149 @@ func Test_getDiskDescribeRequest(t *testing.T) { }) } } + +func testAttachDetach(t *testing.T) (context.Context, *cloud.MockECSInterface, *DiskAttachDetach) { + ctrl := gomock.NewController(t) + ecs := cloud.NewMockECSInterface(ctrl) + _, ctx := ktesting.NewTestContext(t) + + client := desc.Disk(ecs) + b := batcher.NewPassthrough(client) + return ctx, ecs, &DiskAttachDetach{ + slots: NewSlots(0, 0), + ecs: ecs, + waiter: waitstatus.NewSimple(client, clock.RealClock{}), + 
batcher: b, + attachThrottler: defaultThrottler(), + detachThrottler: defaultThrottler(), + dev: DefaultDeviceManager, + } +} + +func disk(status string, node string) ecs.Disk { + disk := ecs.Disk{ + Status: status, + Category: "cloud_regional_disk_auto", // only one support forceAttach + MultiAttach: "Disabled", + DiskId: "d-testdiskid", + InstanceId: node, + SerialNumber: "fake-serial-number", + } + if node != "" { + disk.Attachments.Attachment = []ecs.Attachment{ + {InstanceId: node}, + } + } + return disk +} + +func diskResp(disk ecs.Disk) *ecs.DescribeDisksResponse { + return &ecs.DescribeDisksResponse{ + Disks: ecs.DisksInDescribeDisks{ + Disk: []ecs.Disk{disk}, + }, + } +} + +func TestAttachDisk(t *testing.T) { + GlobalConfigVar.DetachBeforeAttach = true // This is the default + cases := []struct { + name string + before, after ecs.Disk + detaching bool + detach bool + detached ecs.Disk + noAttach bool + forceAttach bool + expectErr bool + }{ + { + name: "already attached", + before: disk("In_use", "i-testinstanceid"), + noAttach: true, + }, + { + name: "attached to other", + before: disk("In_use", "i-anotherinstance"), + detaching: true, + forceAttach: true, + after: disk("In_use", "i-testinstanceid"), + }, + { + name: "attached to other (no force attach)", + before: disk("In_use", "i-anotherinstance"), + detach: true, + detached: disk("Available", ""), + after: disk("In_use", "i-testinstanceid"), + }, + { + name: "normal", + before: disk("Available", ""), + after: disk("In_use", "i-testinstanceid"), + }, + { + name: "attaching", + before: disk("Attaching", ""), + noAttach: true, + expectErr: true, + }, + { + name: "detaching from self", + before: disk("Detaching", "i-testinstanceid"), + after: disk("In_use", "i-testinstanceid"), // FIXME + }, + { + // This not supported by ECS, but desired by us to speed up failover. Hopes they will support it someday. + name: "detaching from other", + before: disk("Detaching", "i-anotherinstance"), + detaching: true, + forceAttach: true, + after: disk("In_use", "i-testinstanceid"), + }, + { + // This is likely to fail in real env. But we try it anyway, in case the detach just finished after we checked + name: "detaching from other (no force attach)", + before: disk("Detaching", "i-anotherinstance"), + forceAttach: false, + after: disk("In_use", "i-testinstanceid"), + }, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + ctx, c, ad := testAttachDetach(t) + + if tc.detaching { + ad.detaching.Store("d-testdiskid", "i-anotherinstance") + } + + c.EXPECT().DescribeDisks(gomock.Any()).Return(diskResp(tc.before), nil) + if tc.detach { + detachCall := c.EXPECT().DetachDisk(gomock.Any()).Return(&ecs.DetachDiskResponse{}, nil) + c.EXPECT().DescribeDisks(gomock.Any()).Return(diskResp(tc.detached), nil).After(detachCall) + } + force := false + if !tc.noAttach { + attachCall := c.EXPECT().AttachDisk(gomock.Any()).Return(&ecs.AttachDiskResponse{}, nil). 
+ Do(func(request *ecs.AttachDiskRequest) { + if request.Force.HasValue() { + var err error + force, err = request.Force.GetValue() + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + } + }) + c.EXPECT().DescribeDisks(gomock.Any()).Return(diskResp(tc.after), nil).After(attachCall) + } + serial, err := ad.attachDisk(ctx, "d-testdiskid", "i-testinstanceid", false) + + assert.Equal(t, tc.forceAttach, force) + if tc.expectErr { + require.Error(t, err) + } else { + require.NoError(t, err) + assert.Equal(t, "fake-serial-number", serial) + } + }) + } +} From 26c417c2bb17ffac2122e8c38a950095be5f6411 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=83=A1=E7=8E=AE=E6=96=87?= Date: Sun, 28 Sep 2025 17:21:10 +0800 Subject: [PATCH 5/6] disk: fix Detaching disks treated as attached We should reject to attach disk detaching from the requested node. --- pkg/disk/cloud.go | 150 ++++++++++++++++++++++++++--------------- pkg/disk/cloud_test.go | 7 +- 2 files changed, 101 insertions(+), 56 deletions(-) diff --git a/pkg/disk/cloud.go b/pkg/disk/cloud.go index 6ceffdb02..403cca596 100644 --- a/pkg/disk/cloud.go +++ b/pkg/disk/cloud.go @@ -162,6 +162,51 @@ func (ad *DiskAttachDetach) findDevice(ctx context.Context, diskID, serial strin return device, nil } +type attachAction int + +const ( + giveUp attachAction = iota + alreadyAttached + detachFirst + forceAttach + attachNormally +) + +func chooseAttachAction(disk *ecs.Disk, instanceID string) (attachAction, error) { + if disk.MultiAttach == "Disabled" { + switch disk.Status { + case DiskStatusInuse: + if disk.InstanceId == instanceID { + return alreadyAttached, nil + } + return detachFirst, nil + case DiskStatusAttaching: + return giveUp, status.Errorf(codes.Aborted, "AttachDisk: Disk %s is attaching to %s", disk.DiskId, instanceID) + case DiskStatusDetaching: + if disk.InstanceId == instanceID { + return giveUp, status.Errorf(codes.Aborted, "AttachDisk: Disk %s is detaching from %s", disk.DiskId, instanceID) + } + // disk is detaching from another instance + return forceAttach, nil + } + } else { + switch disk.Status { + case DiskStatusInuse: + if waitstatus.IsInstanceAttached(disk, instanceID) { + return alreadyAttached, nil + } + case DiskStatusDetaching: + a := disk.Attachments.Attachment + if len(a) == 1 && a[0].InstanceId == instanceID { + return giveUp, status.Errorf(codes.Aborted, "AttachDisk: Disk %s is detaching from %s", disk.DiskId, instanceID) + } + } + // Not sure for status == "Attaching", maybe attaching to another node, but detaching from the requested node? + } + // Most likely status == "Available". But let AttachDisk OpenAPI to decide. + return attachNormally, nil +} + // Attach Alibaba Cloud disk. // Returns device path if fromNode, disk serial number otherwise. 
func (ad *DiskAttachDetach) attachDisk(ctx context.Context, diskID, nodeID string, fromNode bool) (string, error) { @@ -207,35 +252,11 @@ func (ad *DiskAttachDetach) attachDisk(ctx context.Context, diskID, nodeID strin } } - tryForceAttach := false - - // disk is attached, means disk_ad_controller env is true, disk must be created after 2020.06 - switch disk.Status { - case DiskStatusInuse: - if disk.InstanceId == nodeID { - if !fromNode { - klog.Infof("AttachDisk: Disk %s is already attached to Instance %s, skipping", diskID, disk.InstanceId) - return disk.SerialNumber, nil - } - deviceName, err := GetVolumeDeviceName(diskID) - if err == nil && deviceName != "" && IsFileExisting(deviceName) { - klog.Infof("AttachDisk: Disk %s is already attached to self Instance %s, and device is: %s", diskID, disk.InstanceId, deviceName) - return deviceName, nil - } else if disk.SerialNumber != "" { - // wait for pci attach ready - time.Sleep(5 * time.Second) - klog.Infof("AttachDisk: find disk dev after 5 seconds") - deviceName, err := GetVolumeDeviceName(diskID) - if err == nil && deviceName != "" && IsFileExisting(deviceName) { - klog.Infof("AttachDisk: Disk %s is already attached to self Instance %s, and device is: %s", diskID, disk.InstanceId, deviceName) - return deviceName, nil - } - err = fmt.Errorf("AttachDisk: disk device cannot be found in node, diskid: %s, deviceName: %s, err: %+v", diskID, deviceName, err) - return "", err - } - klog.Warningf("AttachDisk: Disk (no serial) %s is already attached to instance %s, but device unknown, will be detached and try again", diskID, disk.InstanceId) - } - + action, err := chooseAttachAction(disk, nodeID) + if err != nil { + return "", err + } + if action == detachFirst || action == forceAttach { if GlobalConfigVar.DiskBdfEnable { if allowed, err := forceDetachAllowed(GlobalConfigVar.EcsClient, disk); err != nil { return "", status.Errorf(codes.Aborted, "forceDetachAllowed failed: %v", err) @@ -247,31 +268,54 @@ func (ad *DiskAttachDetach) attachDisk(ctx context.Context, diskID, nodeID strin if !GlobalConfigVar.DetachBeforeAttach { return "", status.Errorf(codes.Aborted, "AttachDisk: Disk %s is already attached to instance %s, env DISK_FORCE_DETACHED is false reject force detach", diskID, disk.InstanceId) } - if canForceAttach { - tryForceAttach = true - } else { - klog.Infof("AttachDisk: Disk %s is already attached to instance %s, will be detached", diskID, disk.InstanceId) - detachRequest := ecs.CreateDetachDiskRequest() - detachRequest.InstanceId = disk.InstanceId - detachRequest.DiskId = disk.DiskId - for key, value := range GlobalConfigVar.RequestBaseInfo { - detachRequest.AppendUserAgent(key, value) - } - _, err = ad.ecs.DetachDisk(detachRequest) - if err != nil { - klog.Errorf("AttachDisk: Can't Detach disk %s from instance %s: with error: %v", diskID, disk.InstanceId, err) - return "", status.Errorf(codes.Aborted, "AttachDisk: Can't Detach disk %s from instance %s: with error: %v", diskID, disk.InstanceId, err) - } - klog.Infof("AttachDisk: Wait for disk %s to be detached", diskID) - if err := ad.waitForDiskDetached(ctx, diskID, nodeID); err != nil { - return "", err + } + if canForceAttach && action == detachFirst { + action = forceAttach + } + if action == forceAttach && !canForceAttach { + action = attachNormally // should fail, but try it anyway + } + + switch action { + case alreadyAttached: + if !fromNode { + klog.Infof("AttachDisk: Disk %s is already attached to Instance %s, skipping", diskID, disk.InstanceId) + return disk.SerialNumber, 
nil + } + deviceName, err := GetVolumeDeviceName(diskID) + if err == nil && deviceName != "" && IsFileExisting(deviceName) { + klog.Infof("AttachDisk: Disk %s is already attached to self Instance %s, and device is: %s", diskID, disk.InstanceId, deviceName) + return deviceName, nil + } else if disk.SerialNumber != "" { + // wait for pci attach ready + time.Sleep(5 * time.Second) + klog.Infof("AttachDisk: find disk dev after 5 seconds") + deviceName, err := GetVolumeDeviceName(diskID) + if err == nil && deviceName != "" && IsFileExisting(deviceName) { + klog.Infof("AttachDisk: Disk %s is already attached to self Instance %s, and device is: %s", diskID, disk.InstanceId, deviceName) + return deviceName, nil } + err = fmt.Errorf("AttachDisk: disk device cannot be found in node, diskid: %s, deviceName: %s, err: %+v", diskID, deviceName, err) + return "", err + } + klog.Warningf("AttachDisk: Disk (no serial) %s is already attached to instance %s, but device unknown, will be detached and try again", diskID, disk.InstanceId) + fallthrough + case detachFirst: + klog.Infof("AttachDisk: Disk %s is already attached to instance %s, will be detached", diskID, disk.InstanceId) + detachRequest := ecs.CreateDetachDiskRequest() + detachRequest.InstanceId = disk.InstanceId + detachRequest.DiskId = disk.DiskId + for key, value := range GlobalConfigVar.RequestBaseInfo { + detachRequest.AppendUserAgent(key, value) } - case DiskStatusAttaching: - return "", status.Errorf(codes.Aborted, "AttachDisk: Disk %s is attaching %v", diskID, disk) - case DiskStatusDetaching: - if canForceAttach { - tryForceAttach = true + _, err = ad.ecs.DetachDisk(detachRequest) + if err != nil { + klog.Errorf("AttachDisk: Can't Detach disk %s from instance %s: with error: %v", diskID, disk.InstanceId, err) + return "", status.Errorf(codes.Aborted, "AttachDisk: Can't Detach disk %s from instance %s: with error: %v", diskID, disk.InstanceId, err) + } + klog.Infof("AttachDisk: Wait for disk %s to be detached", diskID) + if err := ad.waitForDiskDetached(ctx, diskID, nodeID); err != nil { + return "", err } } @@ -287,7 +331,7 @@ func (ad *DiskAttachDetach) attachDisk(ctx context.Context, diskID, nodeID strin attachRequest := ecs.CreateAttachDiskRequest() attachRequest.InstanceId = nodeID attachRequest.DiskId = diskID - if tryForceAttach { + if action == forceAttach { attachRequest.Force = requests.NewBoolean(true) logger.V(1).Info("try force attach", "from", disk.InstanceId, "to", nodeID) } diff --git a/pkg/disk/cloud_test.go b/pkg/disk/cloud_test.go index 2d53ecbf1..90099080d 100644 --- a/pkg/disk/cloud_test.go +++ b/pkg/disk/cloud_test.go @@ -687,9 +687,10 @@ func TestAttachDisk(t *testing.T) { expectErr: true, }, { - name: "detaching from self", - before: disk("Detaching", "i-testinstanceid"), - after: disk("In_use", "i-testinstanceid"), // FIXME + name: "detaching from self", + before: disk("Detaching", "i-testinstanceid"), + noAttach: true, + expectErr: true, }, { // This not supported by ECS, but desired by us to speed up failover. Hopes they will support it someday. 
From 0f325d8cca3942f82cc871dbce838bfa1ed57b1c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=83=A1=E7=8E=AE=E6=96=87?= Date: Sun, 28 Sep 2025 18:22:47 +0800 Subject: [PATCH 6/6] optimize log for contextual logging --- pkg/disk/cloud.go | 26 ++++++++++++-------------- 1 file changed, 12 insertions(+), 14 deletions(-) diff --git a/pkg/disk/cloud.go b/pkg/disk/cloud.go index 403cca596..d9a9e7d80 100644 --- a/pkg/disk/cloud.go +++ b/pkg/disk/cloud.go @@ -211,13 +211,12 @@ func chooseAttachAction(disk *ecs.Disk, instanceID string) (attachAction, error) // Returns device path if fromNode, disk serial number otherwise. func (ad *DiskAttachDetach) attachDisk(ctx context.Context, diskID, nodeID string, fromNode bool) (string, error) { logger := klog.FromContext(ctx) - logger.V(2).Info("Starting Do AttachDisk", "instanceID", nodeID, "region", GlobalConfigVar.Region) + logger.V(2).Info("Starting Do AttachDisk") // Step 1: check disk status disk, err := ad.findDiskByID(ctx, diskID) if err != nil { - klog.Errorf("AttachDisk: find disk: %s with error: %s", diskID, err.Error()) - return "", status.Errorf(codes.Internal, "AttachDisk: find disk: %s with error: %s", diskID, err.Error()) + return "", status.Errorf(codes.Internal, "AttachDisk: find disk: %s with error: %v", diskID, err) } if disk == nil { return "", status.Errorf(codes.NotFound, "AttachDisk: csi can't find disk: %s in region: %s, Please check if the cloud disk exists, if the region is correct, or if the csi permissions are correct", diskID, GlobalConfigVar.Region) @@ -279,29 +278,29 @@ func (ad *DiskAttachDetach) attachDisk(ctx context.Context, diskID, nodeID strin switch action { case alreadyAttached: if !fromNode { - klog.Infof("AttachDisk: Disk %s is already attached to Instance %s, skipping", diskID, disk.InstanceId) + logger.V(2).Info("already attached, skipping") return disk.SerialNumber, nil } deviceName, err := GetVolumeDeviceName(diskID) if err == nil && deviceName != "" && IsFileExisting(deviceName) { - klog.Infof("AttachDisk: Disk %s is already attached to self Instance %s, and device is: %s", diskID, disk.InstanceId, deviceName) + logger.V(2).Info("already attached", "device", deviceName) return deviceName, nil } else if disk.SerialNumber != "" { // wait for pci attach ready time.Sleep(5 * time.Second) - klog.Infof("AttachDisk: find disk dev after 5 seconds") + logger.V(2).Info("find disk dev after 5 seconds") deviceName, err := GetVolumeDeviceName(diskID) if err == nil && deviceName != "" && IsFileExisting(deviceName) { - klog.Infof("AttachDisk: Disk %s is already attached to self Instance %s, and device is: %s", diskID, disk.InstanceId, deviceName) + logger.V(2).Info("already attached (after 5s)", "device", deviceName) return deviceName, nil } err = fmt.Errorf("AttachDisk: disk device cannot be found in node, diskid: %s, deviceName: %s, err: %+v", diskID, deviceName, err) return "", err } - klog.Warningf("AttachDisk: Disk (no serial) %s is already attached to instance %s, but device unknown, will be detached and try again", diskID, disk.InstanceId) + logger.V(1).Info("disk has no serial, but device unknown, will be detached and try again") fallthrough case detachFirst: - klog.Infof("AttachDisk: Disk %s is already attached to instance %s, will be detached", diskID, disk.InstanceId) + logger.V(1).Info("already attached to another instance, will be detached", "from", disk.InstanceId) detachRequest := ecs.CreateDetachDiskRequest() detachRequest.InstanceId = disk.InstanceId detachRequest.DiskId = disk.DiskId @@ -310,10 +309,9 @@ func 
(ad *DiskAttachDetach) attachDisk(ctx context.Context, diskID, nodeID strin } _, err = ad.ecs.DetachDisk(detachRequest) if err != nil { - klog.Errorf("AttachDisk: Can't Detach disk %s from instance %s: with error: %v", diskID, disk.InstanceId, err) return "", status.Errorf(codes.Aborted, "AttachDisk: Can't Detach disk %s from instance %s: with error: %v", diskID, disk.InstanceId, err) } - klog.Infof("AttachDisk: Wait for disk %s to be detached", diskID) + logger.V(2).Info("Waiting for disk to be detached") if err := ad.waitForDiskDetached(ctx, diskID, nodeID); err != nil { return "", err } @@ -333,7 +331,7 @@ func (ad *DiskAttachDetach) attachDisk(ctx context.Context, diskID, nodeID strin attachRequest.DiskId = diskID if action == forceAttach { attachRequest.Force = requests.NewBoolean(true) - logger.V(1).Info("try force attach", "from", disk.InstanceId, "to", nodeID) + logger.V(1).Info("try force attach", "from", disk.InstanceId) } if cate.SingleInstance { attachRequest.DeleteWithInstance = requests.NewBoolean(true) @@ -360,7 +358,7 @@ func (ad *DiskAttachDetach) attachDisk(ctx context.Context, diskID, nodeID strin } // Step 4: wait for disk attached - klog.Infof("AttachDisk: Waiting for Disk %s is Attached to instance %s with RequestId: %s", diskID, nodeID, response.RequestId) + logger.V(2).Info("waiting for disk to attach", "requestID", response.RequestId) if err := ad.waitForDiskAttached(ctx, diskID, nodeID); err != nil { return "", err } @@ -374,7 +372,7 @@ func (ad *DiskAttachDetach) attachDisk(ctx context.Context, diskID, nodeID strin return device, nil } - klog.Infof("AttachDisk: Successful attach disk %s to node %s", diskID, nodeID) + logger.V(2).Info("Successfully attached disk") return disk.SerialNumber, nil }
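
The contextual logging above assumes the disk and instance IDs are already
attached to the logger carried in ctx by the caller, so the log lines no longer
repeat them. A minimal sketch of that caller-side setup (illustrative names and
call site, not part of this series):

    // Illustrative only: attach request-scoped key/values before calling attachDisk.
    logger := klog.LoggerWithValues(klog.FromContext(ctx), "diskID", diskID, "instanceID", nodeID)
    ctx = klog.NewContext(ctx, logger)
    devicePath, err := ad.attachDisk(ctx, diskID, nodeID, true)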