Skip to content

Commit e302a5f

Browse files
committed
disk: optimize batch create/delete
Integrating batcher and throttle.
1 parent 8217223 commit e302a5f

File tree

7 files changed

+96
-67
lines changed

7 files changed

+96
-67
lines changed

pkg/cloud/throttle/throttle.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,3 +131,14 @@ func (t *Throttler) Throttle(ctx context.Context, f func() error) error {
131131
}
132132
}
133133
}
134+
135+
func Throttled[TReq any, TResp any](t *Throttler, f func(TReq) (TResp, error)) func(context.Context, TReq) (TResp, error) {
136+
return func(ctx context.Context, req TReq) (TResp, error) {
137+
var resp TResp
138+
err := t.Throttle(ctx, func() (err error) {
139+
resp, err = f(req)
140+
return err
141+
})
142+
return resp, err
143+
}
144+
}

pkg/disk/cloud.go

Lines changed: 20 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -65,19 +65,21 @@ const (
6565
DiskMultiAttachEnabled = "Enabled"
6666
)
6767

68-
type DiskAttachDetach struct {
68+
type DiskCloud struct {
6969
slots AttachDetachSlots
7070
waiter waitstatus.StatusWaiter[ecs.Disk]
7171
batcher batcher.Batcher[ecs.Disk]
7272

7373
attachThrottler *throttle.Throttler
7474
detachThrottler *throttle.Throttler
75+
createThrottler *throttle.Throttler
76+
deleteThrottler *throttle.Throttler
7577
detaching sync.Map
7678

7779
dev *DeviceManager
7880
}
7981

80-
func (ad *DiskAttachDetach) possibleDisks(before sets.Set[string]) ([]string, error) {
82+
func (ad *DiskCloud) possibleDisks(before sets.Set[string]) ([]string, error) {
8183
after, err := ad.dev.ListBlocks()
8284
if err != nil {
8385
return nil, fmt.Errorf("cannot list devices after attach: %w", err)
@@ -96,7 +98,7 @@ func (ad *DiskAttachDetach) possibleDisks(before sets.Set[string]) ([]string, er
9698
return disks, nil
9799
}
98100

99-
func (ad *DiskAttachDetach) findDevice(ctx context.Context, diskID, serial string, before sets.Set[string]) (string, error) {
101+
func (ad *DiskCloud) findDevice(ctx context.Context, diskID, serial string, before sets.Set[string]) (string, error) {
100102
logger := klog.FromContext(ctx)
101103
var bdf, device string
102104
var err error
@@ -160,7 +162,7 @@ func (ad *DiskAttachDetach) findDevice(ctx context.Context, diskID, serial strin
160162

161163
// Attach Alibaba Cloud disk.
162164
// Returns device path if fromNode, disk serial number otherwise.
163-
func (ad *DiskAttachDetach) attachDisk(ctx context.Context, diskID, nodeID string, fromNode bool) (string, error) {
165+
func (ad *DiskCloud) attachDisk(ctx context.Context, diskID, nodeID string, fromNode bool) (string, error) {
164166
logger := klog.FromContext(ctx)
165167
logger.V(2).Info("Starting Do AttachDisk", "instanceID", nodeID, "region", GlobalConfigVar.Region)
166168

@@ -294,11 +296,7 @@ func (ad *DiskAttachDetach) attachDisk(ctx context.Context, diskID, nodeID strin
294296
for key, value := range GlobalConfigVar.RequestBaseInfo {
295297
attachRequest.AppendUserAgent(key, value)
296298
}
297-
var response *ecs.AttachDiskResponse
298-
err = ad.attachThrottler.Throttle(ctx, func() error {
299-
response, err = ecsClient.AttachDisk(attachRequest)
300-
return err
301-
})
299+
response, err := throttle.Throttled(ad.attachThrottler, ecsClient.AttachDisk)(ctx, attachRequest)
302300
if err != nil {
303301
var aliErr *alicloudErr.ServerError
304302
if errors.As(err, &aliErr) {
@@ -336,7 +334,7 @@ func (ad *DiskAttachDetach) attachDisk(ctx context.Context, diskID, nodeID strin
336334
}
337335

338336
// Only called by controller
339-
func (ad *DiskAttachDetach) attachMultiAttachDisk(ctx context.Context, diskID, nodeID string) (string, error) {
337+
func (ad *DiskCloud) attachMultiAttachDisk(ctx context.Context, diskID, nodeID string) (string, error) {
340338
klog.Infof("AttachDisk: Starting Do AttachMultiAttachDisk: DiskId: %s, InstanceId: %s, Region: %v", diskID, nodeID, GlobalConfigVar.Region)
341339

342340
ecsClient := GlobalConfigVar.EcsClient
@@ -383,7 +381,7 @@ func (ad *DiskAttachDetach) attachMultiAttachDisk(ctx context.Context, diskID, n
383381
return "", nil
384382
}
385383

386-
func (ad *DiskAttachDetach) detachMultiAttachDisk(ctx context.Context, ecsClient cloud.ECSInterface, diskID, nodeID string) (isMultiAttach bool, err error) {
384+
func (ad *DiskCloud) detachMultiAttachDisk(ctx context.Context, ecsClient cloud.ECSInterface, diskID, nodeID string) (isMultiAttach bool, err error) {
387385
disk, err := ad.findDiskByID(ctx, diskID)
388386
if err != nil {
389387
klog.Errorf("DetachMultiAttachDisk: Describe volume: %s from node: %s, with error: %s", diskID, nodeID, err.Error())
@@ -420,7 +418,7 @@ func (ad *DiskAttachDetach) detachMultiAttachDisk(ctx context.Context, ecsClient
420418
return true, nil
421419
}
422420

423-
func (ad *DiskAttachDetach) detachDisk(ctx context.Context, ecsClient cloud.ECSInterface, diskID, nodeID string, fromNode bool) (err error) {
421+
func (ad *DiskCloud) detachDisk(ctx context.Context, ecsClient cloud.ECSInterface, diskID, nodeID string, fromNode bool) (err error) {
424422
disk, err := ad.findDiskByID(ctx, diskID)
425423
if err != nil {
426424
klog.Errorf("DetachDisk: Describe volume: %s from node: %s, with error: %s", diskID, nodeID, err.Error())
@@ -469,11 +467,7 @@ func (ad *DiskAttachDetach) detachDisk(ctx context.Context, ecsClient cloud.ECSI
469467
for key, value := range GlobalConfigVar.RequestBaseInfo {
470468
detachDiskRequest.AppendUserAgent(key, value)
471469
}
472-
var response *ecs.DetachDiskResponse
473-
err = ad.detachThrottler.Throttle(ctx, func() error {
474-
response, err = ecsClient.DetachDisk(detachDiskRequest)
475-
return err
476-
})
470+
response, err := throttle.Throttled(ad.detachThrottler, ecsClient.DetachDisk)(ctx, detachDiskRequest)
477471
if err != nil {
478472
return status.Errorf(codes.Aborted, "DetachDisk: Fail to detach %s: from Instance: %s with error: %v", disk.DiskId, disk.InstanceId, err)
479473
}
@@ -576,7 +570,7 @@ func tagDiskAsK8sAttached(diskID string, ecsClient *ecs.Client) {
576570
klog.Infof("tagDiskAsK8sAttached:: add tag to disk: %s", diskID)
577571
}
578572

579-
func (ad *DiskAttachDetach) waitForDiskAttached(ctx context.Context, diskID, nodeID string) error {
573+
func (ad *DiskCloud) waitForDiskAttached(ctx context.Context, diskID, nodeID string) error {
580574
disk, err := ad.waiter.WaitFor(ctx, diskID, func(disk *ecs.Disk) bool {
581575
return waitstatus.IsInstanceAttached(disk, nodeID)
582576
})
@@ -589,7 +583,7 @@ func (ad *DiskAttachDetach) waitForDiskAttached(ctx context.Context, diskID, nod
589583
return nil
590584
}
591585

592-
func (ad *DiskAttachDetach) waitForDiskDetached(ctx context.Context, diskID, nodeID string) error {
586+
func (ad *DiskCloud) waitForDiskDetached(ctx context.Context, diskID, nodeID string) error {
593587
disk, err := ad.waiter.WaitFor(ctx, diskID, func(disk *ecs.Disk) bool {
594588
return !waitstatus.IsInstanceAttached(disk, nodeID)
595589
})
@@ -603,7 +597,7 @@ func (ad *DiskAttachDetach) waitForDiskDetached(ctx context.Context, diskID, nod
603597
return nil
604598
}
605599

606-
func (ad *DiskAttachDetach) findDiskByID(ctx context.Context, diskID string) (*ecs.Disk, error) {
600+
func (ad *DiskCloud) findDiskByID(ctx context.Context, diskID string) (*ecs.Disk, error) {
607601
return ad.batcher.Describe(ctx, diskID)
608602
}
609603

@@ -931,7 +925,7 @@ func isValidSnapshotName(name string) bool {
931925
return validDiskNameRegexp.MatchString(name)
932926
}
933927

934-
func createDisk(ecsClient cloud.ECSInterface, diskName, snapshotID string, diskVol *diskVolumeArgs, supportedTypes sets.Set[Category], selectedInstance string, isVirtualNode bool) (string, createAttempt, error) {
928+
func (c *DiskCloud) createDisk(ctx context.Context, ecsClient cloud.ECSInterface, diskName, snapshotID string, diskVol *diskVolumeArgs, supportedTypes sets.Set[Category], selectedInstance string, isVirtualNode bool) (string, createAttempt, error) {
935929
// 需要配置external-provisioner启动参数--extra-create-metadata=true,然后ACK的external-provisioner才会将PVC的Annotations传过来
936930
createDiskRequest := buildCreateDiskRequest(diskVol)
937931
if isValidDiskName(diskName) {
@@ -966,7 +960,7 @@ func createDisk(ecsClient cloud.ECSInterface, diskName, snapshotID string, diskV
966960
attempt.Instance = selectedInstance
967961
}
968962
retry:
969-
diskID, final, err := createDiskAttempt(createDiskRequest, attempt, ecsClient)
963+
diskID, final, err := c.createDiskAttempt(ctx, createDiskRequest, attempt, ecsClient)
970964
if err != nil {
971965
if final {
972966
return "", attempt, err
@@ -1055,11 +1049,11 @@ func finalizeCreateDiskRequest(template *ecs.CreateDiskRequest, attempt createAt
10551049

10561050
var ErrParameterMismatch = errors.New("parameter mismatch")
10571051

1058-
func createDiskAttempt(req *ecs.CreateDiskRequest, attempt createAttempt, ecsClient cloud.ECSInterface) (diskId string, final bool, err error) {
1052+
func (c *DiskCloud) createDiskAttempt(ctx context.Context, req *ecs.CreateDiskRequest, attempt createAttempt, ecsClient cloud.ECSInterface) (diskId string, final bool, err error) {
10591053
req = finalizeCreateDiskRequest(req, attempt)
10601054
klog.Infof("request: request content: %++v", req)
10611055

1062-
volumeRes, err := ecsClient.CreateDisk(req)
1056+
volumeRes, err := throttle.Throttled(c.createThrottler, ecsClient.CreateDisk)(ctx, req)
10631057
if err == nil {
10641058
klog.Infof("request: diskId: %s, reqId: %s", volumeRes.DiskId, volumeRes.RequestId)
10651059
return volumeRes.DiskId, true, nil
@@ -1161,14 +1155,14 @@ func getDefaultDiskTags(diskVol *diskVolumeArgs) []ecs.CreateDiskTag {
11611155
return diskTags
11621156
}
11631157

1164-
func deleteDisk(ctx context.Context, ecsClient cloud.ECSInterface, diskId string) (*ecs.DeleteDiskResponse, error) {
1158+
func (c *DiskCloud) deleteDisk(ctx context.Context, ecsClient cloud.ECSInterface, diskId string) (*ecs.DeleteDiskResponse, error) {
11651159
deleteDiskRequest := ecs.CreateDeleteDiskRequest()
11661160
deleteDiskRequest.DiskId = diskId
11671161

11681162
var resp *ecs.DeleteDiskResponse
11691163
err := wait.PollUntilContextTimeout(ctx, time.Second*5, DISK_DELETE_INIT_TIMEOUT, true, func(ctx context.Context) (bool, error) {
11701164
var err error
1171-
resp, err = ecsClient.DeleteDisk(deleteDiskRequest)
1165+
resp, err = throttle.Throttled(c.deleteThrottler, ecsClient.DeleteDisk)(ctx, deleteDiskRequest)
11721166
if err == nil {
11731167
klog.Infof("DeleteVolume: Successfully deleted volume: %s, with RequestId: %s", diskId, resp.RequestId)
11741168
return true, nil

pkg/disk/cloud_test.go

Lines changed: 37 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,19 @@ import (
44
"context"
55
"fmt"
66
"testing"
7+
"time"
78

89
alicloudErr "github.com/aliyun/alibaba-cloud-sdk-go/sdk/errors"
910
"github.com/aliyun/alibaba-cloud-sdk-go/sdk/requests"
1011
"github.com/aliyun/alibaba-cloud-sdk-go/services/ecs"
1112
gomock "github.com/golang/mock/gomock"
1213
"github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/cloud"
14+
"github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/disk/batcher"
15+
"github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/disk/desc"
1316
"github.com/stretchr/testify/assert"
1417
"k8s.io/apimachinery/pkg/util/sets"
18+
"k8s.io/klog/v2/ktesting"
19+
"k8s.io/utils/clock"
1520
)
1621

1722
var (
@@ -31,37 +36,49 @@ func init() {
3136
}`), resizeDiskResponse)
3237
}
3338

34-
func TestDeleteDisk(t *testing.T) {
39+
func testCloud(t *testing.T) (*cloud.MockECSInterface, *DiskCloud) {
3540
ctrl := gomock.NewController(t)
3641
c := cloud.NewMockECSInterface(ctrl)
3742

43+
return c, &DiskCloud{
44+
dev: &DeviceManager{},
45+
createThrottler: defaultThrottler(),
46+
deleteThrottler: defaultThrottler(),
47+
batcher: batcher.NewLowLatency(desc.Disk(c), clock.RealClock{}, 1*time.Second, 8),
48+
}
49+
}
50+
51+
func TestDeleteDisk(t *testing.T) {
52+
c, cloud := testCloud(t)
53+
_, ctx := ktesting.NewTestContext(t)
54+
3855
c.EXPECT().DeleteDisk(gomock.Any()).Return(deleteDiskResponse, nil)
3956

40-
_, err := deleteDisk(context.Background(), c, "test-disk")
57+
_, err := cloud.deleteDisk(ctx, c, "test-disk")
4158
assert.Nil(t, err)
4259
}
4360

4461
func TestDeleteDiskRetryOnInitError(t *testing.T) {
4562
t.Parallel()
46-
ctrl := gomock.NewController(t)
47-
c := cloud.NewMockECSInterface(ctrl)
63+
c, cloud := testCloud(t)
64+
_, ctx := ktesting.NewTestContext(t)
4865

4966
initErr := alicloudErr.NewServerError(400, `{"Code": "IncorrectDiskStatus.Initializing"}`, "")
5067
c.EXPECT().DeleteDisk(gomock.Any()).Return(nil, initErr)
5168
c.EXPECT().DeleteDisk(gomock.Any()).Return(deleteDiskResponse, nil)
5269

53-
_, err := deleteDisk(context.Background(), c, "test-disk")
70+
_, err := cloud.deleteDisk(ctx, c, "test-disk")
5471
assert.Nil(t, err)
5572
}
5673

5774
func TestDeleteDiskPassthroughError(t *testing.T) {
58-
ctrl := gomock.NewController(t)
59-
c := cloud.NewMockECSInterface(ctrl)
75+
c, cloud := testCloud(t)
76+
_, ctx := ktesting.NewTestContext(t)
6077

6178
serverErr := alicloudErr.NewServerError(400, `{"Code": "AnyOtherErrors"}`, "")
6279
c.EXPECT().DeleteDisk(gomock.Any()).Return(nil, serverErr)
6380

64-
_, err := deleteDisk(context.Background(), c, "test-disk")
81+
_, err := cloud.deleteDisk(ctx, c, "test-disk")
6582
assert.Equal(t, serverErr, err)
6683
}
6784

@@ -384,8 +401,8 @@ func TestCreateDisk_Basic(t *testing.T) {
384401
}
385402
for _, c := range cases {
386403
t.Run(c.name, func(t *testing.T) {
387-
ctrl := gomock.NewController(t)
388-
client := cloud.NewMockECSInterface(ctrl)
404+
client, cloud := testCloud(t)
405+
_, ctx := ktesting.NewTestContext(t)
389406

390407
if !c.err {
391408
client.EXPECT().CreateDisk(gomock.Any()).Return(&ecs.CreateDiskResponse{
@@ -396,7 +413,7 @@ func TestCreateDisk_Basic(t *testing.T) {
396413
client.EXPECT().CreateDisk(gomock.Any()).Return(nil, alicloudErr.NewServerError(400, `{"Code": "AnyOtherErrors"}`, ""))
397414
}
398415

399-
diskID, attempt, err := createDisk(client, "disk-name", "", c.args, c.supports, c.instance, c.isVirtualNode)
416+
diskID, attempt, err := cloud.createDisk(ctx, client, "disk-name", "", c.args, c.supports, c.instance, c.isVirtualNode)
400417
if c.err {
401418
assert.Error(t, err)
402419
} else {
@@ -409,16 +426,16 @@ func TestCreateDisk_Basic(t *testing.T) {
409426
}
410427

411428
func TestCreateDisk_ServerFailFallback(t *testing.T) {
412-
ctrl := gomock.NewController(t)
413-
client := cloud.NewMockECSInterface(ctrl)
429+
client, cloud := testCloud(t)
430+
_, ctx := ktesting.NewTestContext(t)
414431

415432
client.EXPECT().CreateDisk(gomock.Any()).Return(nil, alicloudErr.NewServerError(400, `{"Code": "InvalidDataDiskSize.ValueNotSupported"}`, ""))
416433
client.EXPECT().CreateDisk(gomock.Any()).Return(&ecs.CreateDiskResponse{
417434
DiskId: "d-123",
418435
}, nil)
419436

420437
args := &diskVolumeArgs{Type: []Category{DiskESSD, DiskESSDAuto}, RequestGB: 20}
421-
diskID, attempt, err := createDisk(client, "disk-name", "", args, nil, "", false)
438+
diskID, attempt, err := cloud.createDisk(ctx, client, "disk-name", "", args, nil, "", false)
422439
assert.NoError(t, err)
423440
assert.Equal(t, "d-123", diskID)
424441
assert.Equal(t, DiskESSDAuto, attempt.Category)
@@ -456,8 +473,8 @@ func TestCreateDisk_ParameterMismatch(t *testing.T) {
456473
}
457474
for _, c := range cases {
458475
t.Run(c.name, func(t *testing.T) {
459-
ctrl := gomock.NewController(t)
460-
client := cloud.NewMockECSInterface(ctrl)
476+
client, cloud := testCloud(t)
477+
_, ctx := ktesting.NewTestContext(t)
461478

462479
r1 := client.EXPECT().CreateDisk(gomock.Any()).Return(nil, alicloudErr.NewServerError(400, `{"Code": "IdempotentParameterMismatch"}`, ""))
463480
r2 := client.EXPECT().DescribeDisks(gomock.Any()).Return(&ecs.DescribeDisksResponse{
@@ -472,7 +489,7 @@ func TestCreateDisk_ParameterMismatch(t *testing.T) {
472489
}
473490

474491
args := &diskVolumeArgs{Type: []Category{DiskESSD, DiskESSDAuto}, RequestGB: 20}
475-
diskID, attempt, err := createDisk(client, "disk-name", "", args, nil, "", false)
492+
diskID, attempt, err := cloud.createDisk(ctx, client, "disk-name", "", args, nil, "", false)
476493
if c.err {
477494
assert.Error(t, err)
478495
} else {
@@ -492,8 +509,8 @@ func TestCreateDisk_ParameterMismatch(t *testing.T) {
492509
}
493510

494511
func TestCreateDisk_NoInfiniteLoop(t *testing.T) {
495-
ctrl := gomock.NewController(t)
496-
client := cloud.NewMockECSInterface(ctrl)
512+
client, cloud := testCloud(t)
513+
_, ctx := ktesting.NewTestContext(t)
497514

498515
client.EXPECT().CreateDisk(gomock.Any()).Return(nil, alicloudErr.NewServerError(400, `{"Code": "IdempotentParameterMismatch"}`, "")).Times(2)
499516
client.EXPECT().DescribeDisks(gomock.Any()).Return(&ecs.DescribeDisksResponse{
@@ -503,7 +520,7 @@ func TestCreateDisk_NoInfiniteLoop(t *testing.T) {
503520
}, nil)
504521

505522
args := &diskVolumeArgs{Type: []Category{DiskESSD}, RequestGB: 20}
506-
_, _, err := createDisk(client, "disk-name", "", args, nil, "", false)
523+
_, _, err := cloud.createDisk(ctx, client, "disk-name", "", args, nil, "", false)
507524
assert.Error(t, err)
508525
}
509526

0 commit comments

Comments
 (0)