Skip to content

Commit 876d684

Browse files
committed
disk: optimize batch create/delete
Integrating batcher and throttle.
1 parent 294875b commit 876d684

File tree

7 files changed

+97
-68
lines changed

7 files changed

+97
-68
lines changed

pkg/cloud/throttle/throttle.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,3 +131,14 @@ func (t *Throttler) Throttle(ctx context.Context, f func() error) error {
131131
}
132132
}
133133
}
134+
135+
func Throttled[TReq any, TResp any](t *Throttler, f func(TReq) (TResp, error)) func(context.Context, TReq) (TResp, error) {
136+
return func(ctx context.Context, req TReq) (TResp, error) {
137+
var resp TResp
138+
err := t.Throttle(ctx, func() (err error) {
139+
resp, err = f(req)
140+
return err
141+
})
142+
return resp, err
143+
}
144+
}

pkg/disk/cloud.go

Lines changed: 20 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -65,19 +65,21 @@ const (
6565
DiskMultiAttachEnabled = "Enabled"
6666
)
6767

68-
type DiskAttachDetach struct {
68+
type DiskCloud struct {
6969
slots AttachDetachSlots
7070
waiter waitstatus.StatusWaiter[ecs.Disk]
7171
batcher batcher.Batcher[ecs.Disk]
7272

7373
attachThrottler *throttle.Throttler
7474
detachThrottler *throttle.Throttler
75+
createThrottler *throttle.Throttler
76+
deleteThrottler *throttle.Throttler
7577
detaching sync.Map
7678

7779
dev *DeviceManager
7880
}
7981

80-
func (ad *DiskAttachDetach) possibleDisks(before sets.Set[string]) ([]string, error) {
82+
func (ad *DiskCloud) possibleDisks(before sets.Set[string]) ([]string, error) {
8183
after, err := ad.dev.ListBlocks()
8284
if err != nil {
8385
return nil, fmt.Errorf("cannot list devices after attach: %w", err)
@@ -96,7 +98,7 @@ func (ad *DiskAttachDetach) possibleDisks(before sets.Set[string]) ([]string, er
9698
return disks, nil
9799
}
98100

99-
func (ad *DiskAttachDetach) findDevice(ctx context.Context, diskID, serial string, before sets.Set[string]) (string, error) {
101+
func (ad *DiskCloud) findDevice(ctx context.Context, diskID, serial string, before sets.Set[string]) (string, error) {
100102
logger := klog.FromContext(ctx)
101103
var bdf, device string
102104
var err error
@@ -160,7 +162,7 @@ func (ad *DiskAttachDetach) findDevice(ctx context.Context, diskID, serial strin
160162

161163
// Attach Alibaba Cloud disk.
162164
// Returns device path if fromNode, disk serial number otherwise.
163-
func (ad *DiskAttachDetach) attachDisk(ctx context.Context, diskID, nodeID string, fromNode bool) (string, error) {
165+
func (ad *DiskCloud) attachDisk(ctx context.Context, diskID, nodeID string, fromNode bool) (string, error) {
164166
logger := klog.FromContext(ctx)
165167
logger.V(2).Info("Starting Do AttachDisk", "instanceID", nodeID, "region", GlobalConfigVar.Region)
166168

@@ -283,11 +285,7 @@ func (ad *DiskAttachDetach) attachDisk(ctx context.Context, diskID, nodeID strin
283285
for key, value := range GlobalConfigVar.RequestBaseInfo {
284286
attachRequest.AppendUserAgent(key, value)
285287
}
286-
var response *ecs.AttachDiskResponse
287-
err = ad.attachThrottler.Throttle(ctx, func() error {
288-
response, err = ecsClient.AttachDisk(attachRequest)
289-
return err
290-
})
288+
response, err := throttle.Throttled(ad.attachThrottler, ecsClient.AttachDisk)(ctx, attachRequest)
291289
if err != nil {
292290
var aliErr *alicloudErr.ServerError
293291
if errors.As(err, &aliErr) {
@@ -325,7 +323,7 @@ func (ad *DiskAttachDetach) attachDisk(ctx context.Context, diskID, nodeID strin
325323
}
326324

327325
// Only called by controller
328-
func (ad *DiskAttachDetach) attachMultiAttachDisk(ctx context.Context, diskID, nodeID string) (string, error) {
326+
func (ad *DiskCloud) attachMultiAttachDisk(ctx context.Context, diskID, nodeID string) (string, error) {
329327
klog.Infof("AttachDisk: Starting Do AttachMultiAttachDisk: DiskId: %s, InstanceId: %s, Region: %v", diskID, nodeID, GlobalConfigVar.Region)
330328

331329
ecsClient := updateEcsClient(GlobalConfigVar.EcsClient)
@@ -372,7 +370,7 @@ func (ad *DiskAttachDetach) attachMultiAttachDisk(ctx context.Context, diskID, n
372370
return "", nil
373371
}
374372

375-
func (ad *DiskAttachDetach) detachMultiAttachDisk(ctx context.Context, ecsClient *ecs.Client, diskID, nodeID string) (isMultiAttach bool, err error) {
373+
func (ad *DiskCloud) detachMultiAttachDisk(ctx context.Context, ecsClient *ecs.Client, diskID, nodeID string) (isMultiAttach bool, err error) {
376374
disk, err := ad.findDiskByID(ctx, diskID)
377375
if err != nil {
378376
klog.Errorf("DetachMultiAttachDisk: Describe volume: %s from node: %s, with error: %s", diskID, nodeID, err.Error())
@@ -409,7 +407,7 @@ func (ad *DiskAttachDetach) detachMultiAttachDisk(ctx context.Context, ecsClient
409407
return true, nil
410408
}
411409

412-
func (ad *DiskAttachDetach) detachDisk(ctx context.Context, ecsClient *ecs.Client, diskID, nodeID string) (err error) {
410+
func (ad *DiskCloud) detachDisk(ctx context.Context, ecsClient *ecs.Client, diskID, nodeID string) (err error) {
413411
disk, err := ad.findDiskByID(ctx, diskID)
414412
if err != nil {
415413
klog.Errorf("DetachDisk: Describe volume: %s from node: %s, with error: %s", diskID, nodeID, err.Error())
@@ -454,11 +452,7 @@ func (ad *DiskAttachDetach) detachDisk(ctx context.Context, ecsClient *ecs.Clien
454452
for key, value := range GlobalConfigVar.RequestBaseInfo {
455453
detachDiskRequest.AppendUserAgent(key, value)
456454
}
457-
var response *ecs.DetachDiskResponse
458-
err = ad.detachThrottler.Throttle(ctx, func() error {
459-
response, err = ecsClient.DetachDisk(detachDiskRequest)
460-
return err
461-
})
455+
response, err := throttle.Throttled(ad.detachThrottler, ecsClient.DetachDisk)(ctx, detachDiskRequest)
462456
if err != nil {
463457
return status.Errorf(codes.Aborted, "DetachDisk: Fail to detach %s: from Instance: %s with error: %v", disk.DiskId, disk.InstanceId, err)
464458
}
@@ -561,7 +555,7 @@ func tagDiskAsK8sAttached(diskID string, ecsClient *ecs.Client) {
561555
klog.Infof("tagDiskAsK8sAttached:: add tag to disk: %s", diskID)
562556
}
563557

564-
func (ad *DiskAttachDetach) waitForDiskAttached(ctx context.Context, diskID, nodeID string) error {
558+
func (ad *DiskCloud) waitForDiskAttached(ctx context.Context, diskID, nodeID string) error {
565559
disk, err := ad.waiter.WaitFor(ctx, diskID, func(disk *ecs.Disk) bool {
566560
return waitstatus.IsInstanceAttached(disk, nodeID)
567561
})
@@ -574,7 +568,7 @@ func (ad *DiskAttachDetach) waitForDiskAttached(ctx context.Context, diskID, nod
574568
return nil
575569
}
576570

577-
func (ad *DiskAttachDetach) waitForDiskDetached(ctx context.Context, diskID, nodeID string) error {
571+
func (ad *DiskCloud) waitForDiskDetached(ctx context.Context, diskID, nodeID string) error {
578572
disk, err := ad.waiter.WaitFor(ctx, diskID, func(disk *ecs.Disk) bool {
579573
return !waitstatus.IsInstanceAttached(disk, nodeID)
580574
})
@@ -588,7 +582,7 @@ func (ad *DiskAttachDetach) waitForDiskDetached(ctx context.Context, diskID, nod
588582
return nil
589583
}
590584

591-
func (ad *DiskAttachDetach) findDiskByID(ctx context.Context, diskID string) (*ecs.Disk, error) {
585+
func (ad *DiskCloud) findDiskByID(ctx context.Context, diskID string) (*ecs.Disk, error) {
592586
return ad.batcher.Describe(ctx, diskID)
593587
}
594588

@@ -916,7 +910,7 @@ func isValidSnapshotName(name string) bool {
916910
return validDiskNameRegexp.MatchString(name)
917911
}
918912

919-
func createDisk(ecsClient cloud.ECSInterface, diskName, snapshotID string, diskVol *diskVolumeArgs, supportedTypes sets.Set[Category], selectedInstance string, isVirtualNode bool) (string, createAttempt, error) {
913+
func (c *DiskCloud) createDisk(ctx context.Context, ecsClient cloud.ECSInterface, diskName, snapshotID string, diskVol *diskVolumeArgs, supportedTypes sets.Set[Category], selectedInstance string, isVirtualNode bool) (string, createAttempt, error) {
920914
// The external-provisioner must be started with --extra-create-metadata=true; only then does ACK's external-provisioner pass the PVC annotations through.
921915
createDiskRequest := buildCreateDiskRequest(diskVol)
922916
if isValidDiskName(diskName) {
@@ -951,7 +945,7 @@ func createDisk(ecsClient cloud.ECSInterface, diskName, snapshotID string, diskV
951945
attempt.Instance = selectedInstance
952946
}
953947
retry:
954-
diskID, final, err := createDiskAttempt(createDiskRequest, attempt, ecsClient)
948+
diskID, final, err := c.createDiskAttempt(ctx, createDiskRequest, attempt, ecsClient)
955949
if err != nil {
956950
if final {
957951
return "", attempt, err
@@ -1040,11 +1034,11 @@ func finalizeCreateDiskRequest(template *ecs.CreateDiskRequest, attempt createAt
10401034

10411035
var ErrParameterMismatch = errors.New("parameter mismatch")
10421036

1043-
func createDiskAttempt(req *ecs.CreateDiskRequest, attempt createAttempt, ecsClient cloud.ECSInterface) (diskId string, final bool, err error) {
1037+
func (c *DiskCloud) createDiskAttempt(ctx context.Context, req *ecs.CreateDiskRequest, attempt createAttempt, ecsClient cloud.ECSInterface) (diskId string, final bool, err error) {
10441038
req = finalizeCreateDiskRequest(req, attempt)
10451039
klog.Infof("request: request content: %++v", req)
10461040

1047-
volumeRes, err := ecsClient.CreateDisk(req)
1041+
volumeRes, err := throttle.Throttled(c.createThrottler, ecsClient.CreateDisk)(ctx, req)
10481042
if err == nil {
10491043
klog.Infof("request: diskId: %s, reqId: %s", volumeRes.DiskId, volumeRes.RequestId)
10501044
return volumeRes.DiskId, true, nil
@@ -1146,14 +1140,14 @@ func getDefaultDiskTags(diskVol *diskVolumeArgs) []ecs.CreateDiskTag {
11461140
return diskTags
11471141
}
11481142

1149-
func deleteDisk(ctx context.Context, ecsClient cloud.ECSInterface, diskId string) (*ecs.DeleteDiskResponse, error) {
1143+
func (c *DiskCloud) deleteDisk(ctx context.Context, ecsClient cloud.ECSInterface, diskId string) (*ecs.DeleteDiskResponse, error) {
11501144
deleteDiskRequest := ecs.CreateDeleteDiskRequest()
11511145
deleteDiskRequest.DiskId = diskId
11521146

11531147
var resp *ecs.DeleteDiskResponse
11541148
err := wait.PollUntilContextTimeout(ctx, time.Second*5, DISK_DELETE_INIT_TIMEOUT, true, func(ctx context.Context) (bool, error) {
11551149
var err error
1156-
resp, err = ecsClient.DeleteDisk(deleteDiskRequest)
1150+
resp, err = throttle.Throttled(c.deleteThrottler, ecsClient.DeleteDisk)(ctx, deleteDiskRequest)
11571151
if err == nil {
11581152
klog.Infof("DeleteVolume: Successfully deleted volume: %s, with RequestId: %s", diskId, resp.RequestId)
11591153
return true, nil

pkg/disk/cloud_test.go

Lines changed: 37 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,19 @@ import (
44
"context"
55
"fmt"
66
"testing"
7+
"time"
78

89
alicloudErr "github.com/aliyun/alibaba-cloud-sdk-go/sdk/errors"
910
"github.com/aliyun/alibaba-cloud-sdk-go/sdk/requests"
1011
"github.com/aliyun/alibaba-cloud-sdk-go/services/ecs"
1112
gomock "github.com/golang/mock/gomock"
1213
"github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/cloud"
14+
"github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/disk/batcher"
15+
"github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/disk/desc"
1316
"github.com/stretchr/testify/assert"
1417
"k8s.io/apimachinery/pkg/util/sets"
18+
"k8s.io/klog/v2/ktesting"
19+
"k8s.io/utils/clock"
1520
)
1621

1722
var (
@@ -31,37 +36,49 @@ func init() {
3136
}`), resizeDiskResponse)
3237
}
3338

34-
func TestDeleteDisk(t *testing.T) {
39+
func testCloud(t *testing.T) (*cloud.MockECSInterface, *DiskCloud) {
3540
ctrl := gomock.NewController(t)
3641
c := cloud.NewMockECSInterface(ctrl)
3742

43+
return c, &DiskCloud{
44+
dev: &DeviceManager{},
45+
createThrottler: defaultThrottler(),
46+
deleteThrottler: defaultThrottler(),
47+
batcher: batcher.NewLowLatency(desc.Disk(c), clock.RealClock{}, 1*time.Second, 8),
48+
}
49+
}
50+
51+
func TestDeleteDisk(t *testing.T) {
52+
c, cloud := testCloud(t)
53+
_, ctx := ktesting.NewTestContext(t)
54+
3855
c.EXPECT().DeleteDisk(gomock.Any()).Return(deleteDiskResponse, nil)
3956

40-
_, err := deleteDisk(context.Background(), c, "test-disk")
57+
_, err := cloud.deleteDisk(ctx, c, "test-disk")
4158
assert.Nil(t, err)
4259
}
4360

4461
func TestDeleteDiskRetryOnInitError(t *testing.T) {
4562
t.Parallel()
46-
ctrl := gomock.NewController(t)
47-
c := cloud.NewMockECSInterface(ctrl)
63+
c, cloud := testCloud(t)
64+
_, ctx := ktesting.NewTestContext(t)
4865

4966
initErr := alicloudErr.NewServerError(400, `{"Code": "IncorrectDiskStatus.Initializing"}`, "")
5067
c.EXPECT().DeleteDisk(gomock.Any()).Return(nil, initErr)
5168
c.EXPECT().DeleteDisk(gomock.Any()).Return(deleteDiskResponse, nil)
5269

53-
_, err := deleteDisk(context.Background(), c, "test-disk")
70+
_, err := cloud.deleteDisk(ctx, c, "test-disk")
5471
assert.Nil(t, err)
5572
}
5673

5774
func TestDeleteDiskPassthroughError(t *testing.T) {
58-
ctrl := gomock.NewController(t)
59-
c := cloud.NewMockECSInterface(ctrl)
75+
c, cloud := testCloud(t)
76+
_, ctx := ktesting.NewTestContext(t)
6077

6178
serverErr := alicloudErr.NewServerError(400, `{"Code": "AnyOtherErrors"}`, "")
6279
c.EXPECT().DeleteDisk(gomock.Any()).Return(nil, serverErr)
6380

64-
_, err := deleteDisk(context.Background(), c, "test-disk")
81+
_, err := cloud.deleteDisk(ctx, c, "test-disk")
6582
assert.Equal(t, serverErr, err)
6683
}
6784

@@ -384,8 +401,8 @@ func TestCreateDisk_Basic(t *testing.T) {
384401
}
385402
for _, c := range cases {
386403
t.Run(c.name, func(t *testing.T) {
387-
ctrl := gomock.NewController(t)
388-
client := cloud.NewMockECSInterface(ctrl)
404+
client, cloud := testCloud(t)
405+
_, ctx := ktesting.NewTestContext(t)
389406

390407
if !c.err {
391408
client.EXPECT().CreateDisk(gomock.Any()).Return(&ecs.CreateDiskResponse{
@@ -396,7 +413,7 @@ func TestCreateDisk_Basic(t *testing.T) {
396413
client.EXPECT().CreateDisk(gomock.Any()).Return(nil, alicloudErr.NewServerError(400, `{"Code": "AnyOtherErrors"}`, ""))
397414
}
398415

399-
diskID, attempt, err := createDisk(client, "disk-name", "", c.args, c.supports, c.instance, c.isVirtualNode)
416+
diskID, attempt, err := cloud.createDisk(ctx, client, "disk-name", "", c.args, c.supports, c.instance, c.isVirtualNode)
400417
if c.err {
401418
assert.Error(t, err)
402419
} else {
@@ -409,16 +426,16 @@ func TestCreateDisk_Basic(t *testing.T) {
409426
}
410427

411428
func TestCreateDisk_ServerFailFallback(t *testing.T) {
412-
ctrl := gomock.NewController(t)
413-
client := cloud.NewMockECSInterface(ctrl)
429+
client, cloud := testCloud(t)
430+
_, ctx := ktesting.NewTestContext(t)
414431

415432
client.EXPECT().CreateDisk(gomock.Any()).Return(nil, alicloudErr.NewServerError(400, `{"Code": "InvalidDataDiskSize.ValueNotSupported"}`, ""))
416433
client.EXPECT().CreateDisk(gomock.Any()).Return(&ecs.CreateDiskResponse{
417434
DiskId: "d-123",
418435
}, nil)
419436

420437
args := &diskVolumeArgs{Type: []Category{DiskESSD, DiskESSDAuto}, RequestGB: 20}
421-
diskID, attempt, err := createDisk(client, "disk-name", "", args, nil, "", false)
438+
diskID, attempt, err := cloud.createDisk(ctx, client, "disk-name", "", args, nil, "", false)
422439
assert.NoError(t, err)
423440
assert.Equal(t, "d-123", diskID)
424441
assert.Equal(t, DiskESSDAuto, attempt.Category)
@@ -456,8 +473,8 @@ func TestCreateDisk_ParameterMismatch(t *testing.T) {
456473
}
457474
for _, c := range cases {
458475
t.Run(c.name, func(t *testing.T) {
459-
ctrl := gomock.NewController(t)
460-
client := cloud.NewMockECSInterface(ctrl)
476+
client, cloud := testCloud(t)
477+
_, ctx := ktesting.NewTestContext(t)
461478

462479
r1 := client.EXPECT().CreateDisk(gomock.Any()).Return(nil, alicloudErr.NewServerError(400, `{"Code": "IdempotentParameterMismatch"}`, ""))
463480
r2 := client.EXPECT().DescribeDisks(gomock.Any()).Return(&ecs.DescribeDisksResponse{
@@ -472,7 +489,7 @@ func TestCreateDisk_ParameterMismatch(t *testing.T) {
472489
}
473490

474491
args := &diskVolumeArgs{Type: []Category{DiskESSD, DiskESSDAuto}, RequestGB: 20}
475-
diskID, attempt, err := createDisk(client, "disk-name", "", args, nil, "", false)
492+
diskID, attempt, err := cloud.createDisk(ctx, client, "disk-name", "", args, nil, "", false)
476493
if c.err {
477494
assert.Error(t, err)
478495
} else {
@@ -492,8 +509,8 @@ func TestCreateDisk_ParameterMismatch(t *testing.T) {
492509
}
493510

494511
func TestCreateDisk_NoInfiniteLoop(t *testing.T) {
495-
ctrl := gomock.NewController(t)
496-
client := cloud.NewMockECSInterface(ctrl)
512+
client, cloud := testCloud(t)
513+
_, ctx := ktesting.NewTestContext(t)
497514

498515
client.EXPECT().CreateDisk(gomock.Any()).Return(nil, alicloudErr.NewServerError(400, `{"Code": "IdempotentParameterMismatch"}`, "")).Times(2)
499516
client.EXPECT().DescribeDisks(gomock.Any()).Return(&ecs.DescribeDisksResponse{
@@ -503,7 +520,7 @@ func TestCreateDisk_NoInfiniteLoop(t *testing.T) {
503520
}, nil)
504521

505522
args := &diskVolumeArgs{Type: []Category{DiskESSD}, RequestGB: 20}
506-
_, _, err := createDisk(client, "disk-name", "", args, nil, "", false)
523+
_, _, err := cloud.createDisk(ctx, client, "disk-name", "", args, nil, "", false)
507524
assert.Error(t, err)
508525
}
509526

0 commit comments

Comments
 (0)