Skip to content

Commit ca78c20

Browse files
committed
disk: optimize batch create/delete
Integrating batcher and throttle.
1 parent 7e9d238 commit ca78c20

File tree

7 files changed

+97
-68
lines changed

7 files changed

+97
-68
lines changed

pkg/cloud/throttle/throttle.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,3 +131,14 @@ func (t *Throttler) Throttle(ctx context.Context, f func() error) error {
131131
}
132132
}
133133
}
134+
135+
func Throttled[TReq any, TResp any](t *Throttler, f func(TReq) (TResp, error)) func(context.Context, TReq) (TResp, error) {
136+
return func(ctx context.Context, req TReq) (TResp, error) {
137+
var resp TResp
138+
err := t.Throttle(ctx, func() (err error) {
139+
resp, err = f(req)
140+
return err
141+
})
142+
return resp, err
143+
}
144+
}

pkg/disk/cloud.go

Lines changed: 20 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -65,19 +65,21 @@ const (
6565
DiskMultiAttachEnabled = "Enabled"
6666
)
6767

68-
type DiskAttachDetach struct {
68+
type DiskCloud struct {
6969
slots AttachDetachSlots
7070
waiter waitstatus.StatusWaiter[ecs.Disk]
7171
batcher batcher.Batcher[ecs.Disk]
7272

7373
attachThrottler *throttle.Throttler
7474
detachThrottler *throttle.Throttler
75+
createThrottler *throttle.Throttler
76+
deleteThrottler *throttle.Throttler
7577
detaching sync.Map
7678

7779
dev *DeviceManager
7880
}
7981

80-
func (ad *DiskAttachDetach) possibleDisks(before sets.Set[string]) ([]string, error) {
82+
func (ad *DiskCloud) possibleDisks(before sets.Set[string]) ([]string, error) {
8183
after, err := ad.dev.ListBlocks()
8284
if err != nil {
8385
return nil, fmt.Errorf("cannot list devices after attach: %w", err)
@@ -96,7 +98,7 @@ func (ad *DiskAttachDetach) possibleDisks(before sets.Set[string]) ([]string, er
9698
return disks, nil
9799
}
98100

99-
func (ad *DiskAttachDetach) findDevice(ctx context.Context, diskID, serial string, before sets.Set[string]) (string, error) {
101+
func (ad *DiskCloud) findDevice(ctx context.Context, diskID, serial string, before sets.Set[string]) (string, error) {
100102
logger := klog.FromContext(ctx)
101103
var bdf, device string
102104
var err error
@@ -160,7 +162,7 @@ func (ad *DiskAttachDetach) findDevice(ctx context.Context, diskID, serial strin
160162

161163
// Attach Alibaba Cloud disk.
162164
// Returns device path if fromNode, disk serial number otherwise.
163-
func (ad *DiskAttachDetach) attachDisk(ctx context.Context, diskID, nodeID string, fromNode bool) (string, error) {
165+
func (ad *DiskCloud) attachDisk(ctx context.Context, diskID, nodeID string, fromNode bool) (string, error) {
164166
logger := klog.FromContext(ctx)
165167
logger.V(2).Info("Starting Do AttachDisk", "instanceID", nodeID, "region", GlobalConfigVar.Region)
166168

@@ -283,11 +285,7 @@ func (ad *DiskAttachDetach) attachDisk(ctx context.Context, diskID, nodeID strin
283285
for key, value := range GlobalConfigVar.RequestBaseInfo {
284286
attachRequest.AppendUserAgent(key, value)
285287
}
286-
var response *ecs.AttachDiskResponse
287-
err = ad.attachThrottler.Throttle(ctx, func() error {
288-
response, err = ecsClient.AttachDisk(attachRequest)
289-
return err
290-
})
288+
response, err := throttle.Throttled(ad.attachThrottler, ecsClient.AttachDisk)(ctx, attachRequest)
291289
if err != nil {
292290
var aliErr *alicloudErr.ServerError
293291
if errors.As(err, &aliErr) {
@@ -325,7 +323,7 @@ func (ad *DiskAttachDetach) attachDisk(ctx context.Context, diskID, nodeID strin
325323
}
326324

327325
// Only called by controller
328-
func (ad *DiskAttachDetach) attachMultiAttachDisk(ctx context.Context, diskID, nodeID string) (string, error) {
326+
func (ad *DiskCloud) attachMultiAttachDisk(ctx context.Context, diskID, nodeID string) (string, error) {
329327
klog.Infof("AttachDisk: Starting Do AttachMultiAttachDisk: DiskId: %s, InstanceId: %s, Region: %v", diskID, nodeID, GlobalConfigVar.Region)
330328

331329
ecsClient := updateEcsClient(GlobalConfigVar.EcsClient)
@@ -372,7 +370,7 @@ func (ad *DiskAttachDetach) attachMultiAttachDisk(ctx context.Context, diskID, n
372370
return "", nil
373371
}
374372

375-
func (ad *DiskAttachDetach) detachMultiAttachDisk(ctx context.Context, ecsClient *ecs.Client, diskID, nodeID string) (isMultiAttach bool, err error) {
373+
func (ad *DiskCloud) detachMultiAttachDisk(ctx context.Context, ecsClient *ecs.Client, diskID, nodeID string) (isMultiAttach bool, err error) {
376374
disk, err := ad.findDiskByID(ctx, diskID)
377375
if err != nil {
378376
klog.Errorf("DetachMultiAttachDisk: Describe volume: %s from node: %s, with error: %s", diskID, nodeID, err.Error())
@@ -409,7 +407,7 @@ func (ad *DiskAttachDetach) detachMultiAttachDisk(ctx context.Context, ecsClient
409407
return true, nil
410408
}
411409

412-
func (ad *DiskAttachDetach) detachDisk(ctx context.Context, ecsClient *ecs.Client, diskID, nodeID string, fromNode bool) (err error) {
410+
func (ad *DiskCloud) detachDisk(ctx context.Context, ecsClient *ecs.Client, diskID, nodeID string, fromNode bool) (err error) {
413411
disk, err := ad.findDiskByID(ctx, diskID)
414412
if err != nil {
415413
klog.Errorf("DetachDisk: Describe volume: %s from node: %s, with error: %s", diskID, nodeID, err.Error())
@@ -458,11 +456,7 @@ func (ad *DiskAttachDetach) detachDisk(ctx context.Context, ecsClient *ecs.Clien
458456
for key, value := range GlobalConfigVar.RequestBaseInfo {
459457
detachDiskRequest.AppendUserAgent(key, value)
460458
}
461-
var response *ecs.DetachDiskResponse
462-
err = ad.detachThrottler.Throttle(ctx, func() error {
463-
response, err = ecsClient.DetachDisk(detachDiskRequest)
464-
return err
465-
})
459+
response, err := throttle.Throttled(ad.detachThrottler, ecsClient.DetachDisk)(ctx, detachDiskRequest)
466460
if err != nil {
467461
return status.Errorf(codes.Aborted, "DetachDisk: Fail to detach %s: from Instance: %s with error: %v", disk.DiskId, disk.InstanceId, err)
468462
}
@@ -565,7 +559,7 @@ func tagDiskAsK8sAttached(diskID string, ecsClient *ecs.Client) {
565559
klog.Infof("tagDiskAsK8sAttached:: add tag to disk: %s", diskID)
566560
}
567561

568-
func (ad *DiskAttachDetach) waitForDiskAttached(ctx context.Context, diskID, nodeID string) error {
562+
func (ad *DiskCloud) waitForDiskAttached(ctx context.Context, diskID, nodeID string) error {
569563
disk, err := ad.waiter.WaitFor(ctx, diskID, func(disk *ecs.Disk) bool {
570564
return waitstatus.IsInstanceAttached(disk, nodeID)
571565
})
@@ -578,7 +572,7 @@ func (ad *DiskAttachDetach) waitForDiskAttached(ctx context.Context, diskID, nod
578572
return nil
579573
}
580574

581-
func (ad *DiskAttachDetach) waitForDiskDetached(ctx context.Context, diskID, nodeID string) error {
575+
func (ad *DiskCloud) waitForDiskDetached(ctx context.Context, diskID, nodeID string) error {
582576
disk, err := ad.waiter.WaitFor(ctx, diskID, func(disk *ecs.Disk) bool {
583577
return !waitstatus.IsInstanceAttached(disk, nodeID)
584578
})
@@ -592,7 +586,7 @@ func (ad *DiskAttachDetach) waitForDiskDetached(ctx context.Context, diskID, nod
592586
return nil
593587
}
594588

595-
func (ad *DiskAttachDetach) findDiskByID(ctx context.Context, diskID string) (*ecs.Disk, error) {
589+
func (ad *DiskCloud) findDiskByID(ctx context.Context, diskID string) (*ecs.Disk, error) {
596590
return ad.batcher.Describe(ctx, diskID)
597591
}
598592

@@ -920,7 +914,7 @@ func isValidSnapshotName(name string) bool {
920914
return validDiskNameRegexp.MatchString(name)
921915
}
922916

923-
func createDisk(ecsClient cloud.ECSInterface, diskName, snapshotID string, diskVol *diskVolumeArgs, supportedTypes sets.Set[Category], selectedInstance string, isVirtualNode bool) (string, createAttempt, error) {
917+
func (c *DiskCloud) createDisk(ctx context.Context, ecsClient cloud.ECSInterface, diskName, snapshotID string, diskVol *diskVolumeArgs, supportedTypes sets.Set[Category], selectedInstance string, isVirtualNode bool) (string, createAttempt, error) {
924918
// The external-provisioner must be started with --extra-create-metadata=true; only then does ACK's external-provisioner pass the PVC's annotations through.
925919
createDiskRequest := buildCreateDiskRequest(diskVol)
926920
if isValidDiskName(diskName) {
@@ -955,7 +949,7 @@ func createDisk(ecsClient cloud.ECSInterface, diskName, snapshotID string, diskV
955949
attempt.Instance = selectedInstance
956950
}
957951
retry:
958-
diskID, final, err := createDiskAttempt(createDiskRequest, attempt, ecsClient)
952+
diskID, final, err := c.createDiskAttempt(ctx, createDiskRequest, attempt, ecsClient)
959953
if err != nil {
960954
if final {
961955
return "", attempt, err
@@ -1044,11 +1038,11 @@ func finalizeCreateDiskRequest(template *ecs.CreateDiskRequest, attempt createAt
10441038

10451039
var ErrParameterMismatch = errors.New("parameter mismatch")
10461040

1047-
func createDiskAttempt(req *ecs.CreateDiskRequest, attempt createAttempt, ecsClient cloud.ECSInterface) (diskId string, final bool, err error) {
1041+
func (c *DiskCloud) createDiskAttempt(ctx context.Context, req *ecs.CreateDiskRequest, attempt createAttempt, ecsClient cloud.ECSInterface) (diskId string, final bool, err error) {
10481042
req = finalizeCreateDiskRequest(req, attempt)
10491043
klog.Infof("request: request content: %++v", req)
10501044

1051-
volumeRes, err := ecsClient.CreateDisk(req)
1045+
volumeRes, err := throttle.Throttled(c.createThrottler, ecsClient.CreateDisk)(ctx, req)
10521046
if err == nil {
10531047
klog.Infof("request: diskId: %s, reqId: %s", volumeRes.DiskId, volumeRes.RequestId)
10541048
return volumeRes.DiskId, true, nil
@@ -1150,14 +1144,14 @@ func getDefaultDiskTags(diskVol *diskVolumeArgs) []ecs.CreateDiskTag {
11501144
return diskTags
11511145
}
11521146

1153-
func deleteDisk(ctx context.Context, ecsClient cloud.ECSInterface, diskId string) (*ecs.DeleteDiskResponse, error) {
1147+
func (c *DiskCloud) deleteDisk(ctx context.Context, ecsClient cloud.ECSInterface, diskId string) (*ecs.DeleteDiskResponse, error) {
11541148
deleteDiskRequest := ecs.CreateDeleteDiskRequest()
11551149
deleteDiskRequest.DiskId = diskId
11561150

11571151
var resp *ecs.DeleteDiskResponse
11581152
err := wait.PollUntilContextTimeout(ctx, time.Second*5, DISK_DELETE_INIT_TIMEOUT, true, func(ctx context.Context) (bool, error) {
11591153
var err error
1160-
resp, err = ecsClient.DeleteDisk(deleteDiskRequest)
1154+
resp, err = throttle.Throttled(c.deleteThrottler, ecsClient.DeleteDisk)(ctx, deleteDiskRequest)
11611155
if err == nil {
11621156
klog.Infof("DeleteVolume: Successfully deleted volume: %s, with RequestId: %s", diskId, resp.RequestId)
11631157
return true, nil

pkg/disk/cloud_test.go

Lines changed: 37 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,19 @@ import (
44
"context"
55
"fmt"
66
"testing"
7+
"time"
78

89
alicloudErr "github.com/aliyun/alibaba-cloud-sdk-go/sdk/errors"
910
"github.com/aliyun/alibaba-cloud-sdk-go/sdk/requests"
1011
"github.com/aliyun/alibaba-cloud-sdk-go/services/ecs"
1112
gomock "github.com/golang/mock/gomock"
1213
"github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/cloud"
14+
"github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/disk/batcher"
15+
"github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/disk/desc"
1316
"github.com/stretchr/testify/assert"
1417
"k8s.io/apimachinery/pkg/util/sets"
18+
"k8s.io/klog/v2/ktesting"
19+
"k8s.io/utils/clock"
1520
)
1621

1722
var (
@@ -31,37 +36,49 @@ func init() {
3136
}`), resizeDiskResponse)
3237
}
3338

34-
func TestDeleteDisk(t *testing.T) {
39+
func testCloud(t *testing.T) (*cloud.MockECSInterface, *DiskCloud) {
3540
ctrl := gomock.NewController(t)
3641
c := cloud.NewMockECSInterface(ctrl)
3742

43+
return c, &DiskCloud{
44+
dev: &DeviceManager{},
45+
createThrottler: defaultThrottler(),
46+
deleteThrottler: defaultThrottler(),
47+
batcher: batcher.NewLowLatency(desc.Disk(c), clock.RealClock{}, 1*time.Second, 8),
48+
}
49+
}
50+
51+
func TestDeleteDisk(t *testing.T) {
52+
c, cloud := testCloud(t)
53+
_, ctx := ktesting.NewTestContext(t)
54+
3855
c.EXPECT().DeleteDisk(gomock.Any()).Return(deleteDiskResponse, nil)
3956

40-
_, err := deleteDisk(context.Background(), c, "test-disk")
57+
_, err := cloud.deleteDisk(ctx, c, "test-disk")
4158
assert.Nil(t, err)
4259
}
4360

4461
func TestDeleteDiskRetryOnInitError(t *testing.T) {
4562
t.Parallel()
46-
ctrl := gomock.NewController(t)
47-
c := cloud.NewMockECSInterface(ctrl)
63+
c, cloud := testCloud(t)
64+
_, ctx := ktesting.NewTestContext(t)
4865

4966
initErr := alicloudErr.NewServerError(400, `{"Code": "IncorrectDiskStatus.Initializing"}`, "")
5067
c.EXPECT().DeleteDisk(gomock.Any()).Return(nil, initErr)
5168
c.EXPECT().DeleteDisk(gomock.Any()).Return(deleteDiskResponse, nil)
5269

53-
_, err := deleteDisk(context.Background(), c, "test-disk")
70+
_, err := cloud.deleteDisk(ctx, c, "test-disk")
5471
assert.Nil(t, err)
5572
}
5673

5774
func TestDeleteDiskPassthroughError(t *testing.T) {
58-
ctrl := gomock.NewController(t)
59-
c := cloud.NewMockECSInterface(ctrl)
75+
c, cloud := testCloud(t)
76+
_, ctx := ktesting.NewTestContext(t)
6077

6178
serverErr := alicloudErr.NewServerError(400, `{"Code": "AnyOtherErrors"}`, "")
6279
c.EXPECT().DeleteDisk(gomock.Any()).Return(nil, serverErr)
6380

64-
_, err := deleteDisk(context.Background(), c, "test-disk")
81+
_, err := cloud.deleteDisk(ctx, c, "test-disk")
6582
assert.Equal(t, serverErr, err)
6683
}
6784

@@ -384,8 +401,8 @@ func TestCreateDisk_Basic(t *testing.T) {
384401
}
385402
for _, c := range cases {
386403
t.Run(c.name, func(t *testing.T) {
387-
ctrl := gomock.NewController(t)
388-
client := cloud.NewMockECSInterface(ctrl)
404+
client, cloud := testCloud(t)
405+
_, ctx := ktesting.NewTestContext(t)
389406

390407
if !c.err {
391408
client.EXPECT().CreateDisk(gomock.Any()).Return(&ecs.CreateDiskResponse{
@@ -396,7 +413,7 @@ func TestCreateDisk_Basic(t *testing.T) {
396413
client.EXPECT().CreateDisk(gomock.Any()).Return(nil, alicloudErr.NewServerError(400, `{"Code": "AnyOtherErrors"}`, ""))
397414
}
398415

399-
diskID, attempt, err := createDisk(client, "disk-name", "", c.args, c.supports, c.instance, c.isVirtualNode)
416+
diskID, attempt, err := cloud.createDisk(ctx, client, "disk-name", "", c.args, c.supports, c.instance, c.isVirtualNode)
400417
if c.err {
401418
assert.Error(t, err)
402419
} else {
@@ -409,16 +426,16 @@ func TestCreateDisk_Basic(t *testing.T) {
409426
}
410427

411428
func TestCreateDisk_ServerFailFallback(t *testing.T) {
412-
ctrl := gomock.NewController(t)
413-
client := cloud.NewMockECSInterface(ctrl)
429+
client, cloud := testCloud(t)
430+
_, ctx := ktesting.NewTestContext(t)
414431

415432
client.EXPECT().CreateDisk(gomock.Any()).Return(nil, alicloudErr.NewServerError(400, `{"Code": "InvalidDataDiskSize.ValueNotSupported"}`, ""))
416433
client.EXPECT().CreateDisk(gomock.Any()).Return(&ecs.CreateDiskResponse{
417434
DiskId: "d-123",
418435
}, nil)
419436

420437
args := &diskVolumeArgs{Type: []Category{DiskESSD, DiskESSDAuto}, RequestGB: 20}
421-
diskID, attempt, err := createDisk(client, "disk-name", "", args, nil, "", false)
438+
diskID, attempt, err := cloud.createDisk(ctx, client, "disk-name", "", args, nil, "", false)
422439
assert.NoError(t, err)
423440
assert.Equal(t, "d-123", diskID)
424441
assert.Equal(t, DiskESSDAuto, attempt.Category)
@@ -456,8 +473,8 @@ func TestCreateDisk_ParameterMismatch(t *testing.T) {
456473
}
457474
for _, c := range cases {
458475
t.Run(c.name, func(t *testing.T) {
459-
ctrl := gomock.NewController(t)
460-
client := cloud.NewMockECSInterface(ctrl)
476+
client, cloud := testCloud(t)
477+
_, ctx := ktesting.NewTestContext(t)
461478

462479
r1 := client.EXPECT().CreateDisk(gomock.Any()).Return(nil, alicloudErr.NewServerError(400, `{"Code": "IdempotentParameterMismatch"}`, ""))
463480
r2 := client.EXPECT().DescribeDisks(gomock.Any()).Return(&ecs.DescribeDisksResponse{
@@ -472,7 +489,7 @@ func TestCreateDisk_ParameterMismatch(t *testing.T) {
472489
}
473490

474491
args := &diskVolumeArgs{Type: []Category{DiskESSD, DiskESSDAuto}, RequestGB: 20}
475-
diskID, attempt, err := createDisk(client, "disk-name", "", args, nil, "", false)
492+
diskID, attempt, err := cloud.createDisk(ctx, client, "disk-name", "", args, nil, "", false)
476493
if c.err {
477494
assert.Error(t, err)
478495
} else {
@@ -492,8 +509,8 @@ func TestCreateDisk_ParameterMismatch(t *testing.T) {
492509
}
493510

494511
func TestCreateDisk_NoInfiniteLoop(t *testing.T) {
495-
ctrl := gomock.NewController(t)
496-
client := cloud.NewMockECSInterface(ctrl)
512+
client, cloud := testCloud(t)
513+
_, ctx := ktesting.NewTestContext(t)
497514

498515
client.EXPECT().CreateDisk(gomock.Any()).Return(nil, alicloudErr.NewServerError(400, `{"Code": "IdempotentParameterMismatch"}`, "")).Times(2)
499516
client.EXPECT().DescribeDisks(gomock.Any()).Return(&ecs.DescribeDisksResponse{
@@ -503,7 +520,7 @@ func TestCreateDisk_NoInfiniteLoop(t *testing.T) {
503520
}, nil)
504521

505522
args := &diskVolumeArgs{Type: []Category{DiskESSD}, RequestGB: 20}
506-
_, _, err := createDisk(client, "disk-name", "", args, nil, "", false)
523+
_, _, err := cloud.createDisk(ctx, client, "disk-name", "", args, nil, "", false)
507524
assert.Error(t, err)
508525
}
509526

0 commit comments

Comments
 (0)