Skip to content

Commit 6c21dcf

Browse files
committed
disk: optimize batch create/delete
Integrating batcher and throttle.
1 parent 5be8fbb commit 6c21dcf

File tree

6 files changed

+87
-48
lines changed

6 files changed

+87
-48
lines changed

pkg/cloud/throttle/throttle.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,3 +131,14 @@ func (t *Throttler) Throttle(ctx context.Context, f func() error) error {
131131
}
132132
}
133133
}
134+
135+
func Throttled[TReq any, TResp any](t *Throttler, f func(TReq) (TResp, error)) func(context.Context, TReq) (TResp, error) {
136+
return func(ctx context.Context, req TReq) (TResp, error) {
137+
var resp TResp
138+
err := t.Throttle(ctx, func() (err error) {
139+
resp, err = f(req)
140+
return err
141+
})
142+
return resp, err
143+
}
144+
}

pkg/disk/cloud.go

Lines changed: 16 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,13 @@ type DiskAttachDetach struct {
7777
dev *DeviceManager
7878
}
7979

80+
type DiskCreateDelete struct {
81+
ecs cloud.ECSInterface
82+
batcher batcher.Batcher[ecs.Disk]
83+
createThrottler *throttle.Throttler
84+
deleteThrottler *throttle.Throttler
85+
}
86+
8087
func (ad *DiskAttachDetach) possibleDisks(before sets.Set[string]) ([]string, error) {
8188
after, err := ad.dev.ListBlocks()
8289
if err != nil {
@@ -290,11 +297,7 @@ func (ad *DiskAttachDetach) attachDisk(ctx context.Context, diskID, nodeID strin
290297
for key, value := range GlobalConfigVar.RequestBaseInfo {
291298
attachRequest.AppendUserAgent(key, value)
292299
}
293-
var response *ecs.AttachDiskResponse
294-
err = ad.attachThrottler.Throttle(ctx, func() error {
295-
response, err = ecsClient.AttachDisk(attachRequest)
296-
return err
297-
})
300+
response, err := throttle.Throttled(ad.attachThrottler, ecsClient.AttachDisk)(ctx, attachRequest)
298301
if err != nil {
299302
var aliErr *alicloudErr.ServerError
300303
if errors.As(err, &aliErr) {
@@ -465,11 +468,7 @@ func (ad *DiskAttachDetach) detachDisk(ctx context.Context, ecsClient cloud.ECSI
465468
for key, value := range GlobalConfigVar.RequestBaseInfo {
466469
detachDiskRequest.AppendUserAgent(key, value)
467470
}
468-
var response *ecs.DetachDiskResponse
469-
err = ad.detachThrottler.Throttle(ctx, func() error {
470-
response, err = ecsClient.DetachDisk(detachDiskRequest)
471-
return err
472-
})
471+
response, err := throttle.Throttled(ad.detachThrottler, ecsClient.DetachDisk)(ctx, detachDiskRequest)
473472
if err != nil {
474473
return status.Errorf(codes.Aborted, "DetachDisk: Fail to detach %s: from Instance: %s with error: %v", disk.DiskId, disk.InstanceId, err)
475474
}
@@ -927,7 +926,7 @@ func isValidSnapshotName(name string) bool {
927926
return validDiskNameRegexp.MatchString(name)
928927
}
929928

930-
func createDisk(ecsClient cloud.ECSInterface, diskName, snapshotID string, diskVol *diskVolumeArgs, supportedTypes sets.Set[Category], selectedInstance string, isVirtualNode bool) (string, createAttempt, error) {
929+
func (c *DiskCreateDelete) createDisk(ctx context.Context, diskName, snapshotID string, diskVol *diskVolumeArgs, supportedTypes sets.Set[Category], selectedInstance string, isVirtualNode bool) (string, createAttempt, error) {
931930
// 需要配置external-provisioner启动参数--extra-create-metadata=true,然后ACK的external-provisioner才会将PVC的Annotations传过来
932931
createDiskRequest := buildCreateDiskRequest(diskVol)
933932
if isValidDiskName(diskName) {
@@ -962,7 +961,7 @@ func createDisk(ecsClient cloud.ECSInterface, diskName, snapshotID string, diskV
962961
attempt.Instance = selectedInstance
963962
}
964963
retry:
965-
diskID, final, err := createDiskAttempt(createDiskRequest, attempt, ecsClient)
964+
diskID, final, err := c.createDiskAttempt(ctx, createDiskRequest, attempt)
966965
if err != nil {
967966
if final {
968967
return "", attempt, err
@@ -972,7 +971,7 @@ func createDisk(ecsClient cloud.ECSInterface, diskName, snapshotID string, diskV
972971
// protect us from infinite loop
973972
return "", attempt, status.Error(codes.Internal, "unexpected parameter mismatch")
974973
}
975-
existingDisk, err := findDiskByName(diskName, ecsClient)
974+
existingDisk, err := findDiskByName(diskName, c.ecs)
976975
if err != nil {
977976
return "", attempt, status.Errorf(codes.Internal, "parameter mismatch detected, but fetch existing disk failed: %v", err)
978977
}
@@ -1051,11 +1050,11 @@ func finalizeCreateDiskRequest(template *ecs.CreateDiskRequest, attempt createAt
10511050

10521051
var ErrParameterMismatch = errors.New("parameter mismatch")
10531052

1054-
func createDiskAttempt(req *ecs.CreateDiskRequest, attempt createAttempt, ecsClient cloud.ECSInterface) (diskId string, final bool, err error) {
1053+
func (c *DiskCreateDelete) createDiskAttempt(ctx context.Context, req *ecs.CreateDiskRequest, attempt createAttempt) (diskId string, final bool, err error) {
10551054
req = finalizeCreateDiskRequest(req, attempt)
10561055
klog.Infof("request: request content: %++v", req)
10571056

1058-
volumeRes, err := ecsClient.CreateDisk(req)
1057+
volumeRes, err := throttle.Throttled(c.createThrottler, c.ecs.CreateDisk)(ctx, req)
10591058
if err == nil {
10601059
klog.Infof("request: diskId: %s, reqId: %s", volumeRes.DiskId, volumeRes.RequestId)
10611060
return volumeRes.DiskId, true, nil
@@ -1157,14 +1156,14 @@ func getDefaultDiskTags(diskVol *diskVolumeArgs) []ecs.CreateDiskTag {
11571156
return diskTags
11581157
}
11591158

1160-
func deleteDisk(ctx context.Context, ecsClient cloud.ECSInterface, diskId string) (*ecs.DeleteDiskResponse, error) {
1159+
func (c *DiskCreateDelete) deleteDisk(ctx context.Context, ecsClient cloud.ECSInterface, diskId string) (*ecs.DeleteDiskResponse, error) {
11611160
deleteDiskRequest := ecs.CreateDeleteDiskRequest()
11621161
deleteDiskRequest.DiskId = diskId
11631162

11641163
var resp *ecs.DeleteDiskResponse
11651164
err := wait.PollUntilContextTimeout(ctx, time.Second*5, DISK_DELETE_INIT_TIMEOUT, true, func(ctx context.Context) (bool, error) {
11661165
var err error
1167-
resp, err = ecsClient.DeleteDisk(deleteDiskRequest)
1166+
resp, err = throttle.Throttled(c.deleteThrottler, c.ecs.DeleteDisk)(ctx, deleteDiskRequest)
11681167
if err == nil {
11691168
klog.Infof("DeleteVolume: Successfully deleted volume: %s, with RequestId: %s", diskId, resp.RequestId)
11701169
return true, nil

pkg/disk/cloud_test.go

Lines changed: 37 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,19 @@ import (
44
"context"
55
"fmt"
66
"testing"
7+
"time"
78

89
alicloudErr "github.com/aliyun/alibaba-cloud-sdk-go/sdk/errors"
910
"github.com/aliyun/alibaba-cloud-sdk-go/sdk/requests"
1011
"github.com/aliyun/alibaba-cloud-sdk-go/services/ecs"
1112
gomock "github.com/golang/mock/gomock"
1213
"github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/cloud"
14+
"github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/disk/batcher"
15+
"github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/disk/desc"
1316
"github.com/stretchr/testify/assert"
1417
"k8s.io/apimachinery/pkg/util/sets"
18+
"k8s.io/klog/v2/ktesting"
19+
"k8s.io/utils/clock"
1520
)
1621

1722
var (
@@ -31,37 +36,49 @@ func init() {
3136
}`), resizeDiskResponse)
3237
}
3338

34-
func TestDeleteDisk(t *testing.T) {
39+
func testCreateDelete(t *testing.T) (*cloud.MockECSInterface, *DiskCreateDelete) {
3540
ctrl := gomock.NewController(t)
3641
c := cloud.NewMockECSInterface(ctrl)
3742

43+
return c, &DiskCreateDelete{
44+
ecs: c,
45+
createThrottler: defaultThrottler(),
46+
deleteThrottler: defaultThrottler(),
47+
batcher: batcher.NewLowLatency(desc.Disk(c), clock.RealClock{}, 1*time.Second, 8),
48+
}
49+
}
50+
51+
func TestDeleteDisk(t *testing.T) {
52+
c, cd := testCreateDelete(t)
53+
_, ctx := ktesting.NewTestContext(t)
54+
3855
c.EXPECT().DeleteDisk(gomock.Any()).Return(deleteDiskResponse, nil)
3956

40-
_, err := deleteDisk(context.Background(), c, "test-disk")
57+
_, err := cd.deleteDisk(ctx, c, "test-disk")
4158
assert.Nil(t, err)
4259
}
4360

4461
func TestDeleteDiskRetryOnInitError(t *testing.T) {
4562
t.Parallel()
46-
ctrl := gomock.NewController(t)
47-
c := cloud.NewMockECSInterface(ctrl)
63+
c, cd := testCreateDelete(t)
64+
_, ctx := ktesting.NewTestContext(t)
4865

4966
initErr := alicloudErr.NewServerError(400, `{"Code": "IncorrectDiskStatus.Initializing"}`, "")
5067
c.EXPECT().DeleteDisk(gomock.Any()).Return(nil, initErr)
5168
c.EXPECT().DeleteDisk(gomock.Any()).Return(deleteDiskResponse, nil)
5269

53-
_, err := deleteDisk(context.Background(), c, "test-disk")
70+
_, err := cd.deleteDisk(ctx, c, "test-disk")
5471
assert.Nil(t, err)
5572
}
5673

5774
func TestDeleteDiskPassthroughError(t *testing.T) {
58-
ctrl := gomock.NewController(t)
59-
c := cloud.NewMockECSInterface(ctrl)
75+
c, cd := testCreateDelete(t)
76+
_, ctx := ktesting.NewTestContext(t)
6077

6178
serverErr := alicloudErr.NewServerError(400, `{"Code": "AnyOtherErrors"}`, "")
6279
c.EXPECT().DeleteDisk(gomock.Any()).Return(nil, serverErr)
6380

64-
_, err := deleteDisk(context.Background(), c, "test-disk")
81+
_, err := cd.deleteDisk(ctx, c, "test-disk")
6582
assert.Equal(t, serverErr, err)
6683
}
6784

@@ -384,8 +401,8 @@ func TestCreateDisk_Basic(t *testing.T) {
384401
}
385402
for _, c := range cases {
386403
t.Run(c.name, func(t *testing.T) {
387-
ctrl := gomock.NewController(t)
388-
client := cloud.NewMockECSInterface(ctrl)
404+
client, cd := testCreateDelete(t)
405+
_, ctx := ktesting.NewTestContext(t)
389406

390407
if !c.err {
391408
client.EXPECT().CreateDisk(gomock.Any()).Return(&ecs.CreateDiskResponse{
@@ -396,7 +413,7 @@ func TestCreateDisk_Basic(t *testing.T) {
396413
client.EXPECT().CreateDisk(gomock.Any()).Return(nil, alicloudErr.NewServerError(400, `{"Code": "AnyOtherErrors"}`, ""))
397414
}
398415

399-
diskID, attempt, err := createDisk(client, "disk-name", "", c.args, c.supports, c.instance, c.isVirtualNode)
416+
diskID, attempt, err := cd.createDisk(ctx, "disk-name", "", c.args, c.supports, c.instance, c.isVirtualNode)
400417
if c.err {
401418
assert.Error(t, err)
402419
} else {
@@ -409,16 +426,16 @@ func TestCreateDisk_Basic(t *testing.T) {
409426
}
410427

411428
func TestCreateDisk_ServerFailFallback(t *testing.T) {
412-
ctrl := gomock.NewController(t)
413-
client := cloud.NewMockECSInterface(ctrl)
429+
client, cd := testCreateDelete(t)
430+
_, ctx := ktesting.NewTestContext(t)
414431

415432
client.EXPECT().CreateDisk(gomock.Any()).Return(nil, alicloudErr.NewServerError(400, `{"Code": "InvalidDataDiskSize.ValueNotSupported"}`, ""))
416433
client.EXPECT().CreateDisk(gomock.Any()).Return(&ecs.CreateDiskResponse{
417434
DiskId: "d-123",
418435
}, nil)
419436

420437
args := &diskVolumeArgs{Type: []Category{DiskESSD, DiskESSDAuto}, RequestGB: 20}
421-
diskID, attempt, err := createDisk(client, "disk-name", "", args, nil, "", false)
438+
diskID, attempt, err := cd.createDisk(ctx, "disk-name", "", args, nil, "", false)
422439
assert.NoError(t, err)
423440
assert.Equal(t, "d-123", diskID)
424441
assert.Equal(t, DiskESSDAuto, attempt.Category)
@@ -456,8 +473,8 @@ func TestCreateDisk_ParameterMismatch(t *testing.T) {
456473
}
457474
for _, c := range cases {
458475
t.Run(c.name, func(t *testing.T) {
459-
ctrl := gomock.NewController(t)
460-
client := cloud.NewMockECSInterface(ctrl)
476+
client, cd := testCreateDelete(t)
477+
_, ctx := ktesting.NewTestContext(t)
461478

462479
r1 := client.EXPECT().CreateDisk(gomock.Any()).Return(nil, alicloudErr.NewServerError(400, `{"Code": "IdempotentParameterMismatch"}`, ""))
463480
r2 := client.EXPECT().DescribeDisks(gomock.Any()).Return(&ecs.DescribeDisksResponse{
@@ -472,7 +489,7 @@ func TestCreateDisk_ParameterMismatch(t *testing.T) {
472489
}
473490

474491
args := &diskVolumeArgs{Type: []Category{DiskESSD, DiskESSDAuto}, RequestGB: 20}
475-
diskID, attempt, err := createDisk(client, "disk-name", "", args, nil, "", false)
492+
diskID, attempt, err := cd.createDisk(ctx, "disk-name", "", args, nil, "", false)
476493
if c.err {
477494
assert.Error(t, err)
478495
} else {
@@ -492,8 +509,8 @@ func TestCreateDisk_ParameterMismatch(t *testing.T) {
492509
}
493510

494511
func TestCreateDisk_NoInfiniteLoop(t *testing.T) {
495-
ctrl := gomock.NewController(t)
496-
client := cloud.NewMockECSInterface(ctrl)
512+
client, cd := testCreateDelete(t)
513+
_, ctx := ktesting.NewTestContext(t)
497514

498515
client.EXPECT().CreateDisk(gomock.Any()).Return(nil, alicloudErr.NewServerError(400, `{"Code": "IdempotentParameterMismatch"}`, "")).Times(2)
499516
client.EXPECT().DescribeDisks(gomock.Any()).Return(&ecs.DescribeDisksResponse{
@@ -503,7 +520,7 @@ func TestCreateDisk_NoInfiniteLoop(t *testing.T) {
503520
}, nil)
504521

505522
args := &diskVolumeArgs{Type: []Category{DiskESSD}, RequestGB: 20}
506-
_, _, err := createDisk(client, "disk-name", "", args, nil, "", false)
523+
_, _, err := cd.createDisk(ctx, "disk-name", "", args, nil, "", false)
507524
assert.Error(t, err)
508525
}
509526

pkg/disk/controllerserver.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ import (
5252
type controllerServer struct {
5353
recorder record.EventRecorder
5454
ad DiskAttachDetach
55+
cd DiskCreateDelete
5556
meta metadata.MetadataProvider
5657
ecs cloud.ECSInterface
5758
snapshotWaiter waitstatus.StatusWaiter[ecs.Snapshot]
@@ -104,6 +105,12 @@ func NewControllerServer(csiCfg utils.Config, ecs cloud.ECSInterface, m metadata
104105
attachThrottler: defaultThrottler(),
105106
detachThrottler: defaultThrottler(),
106107
},
108+
cd: DiskCreateDelete{
109+
ecs: ecs,
110+
batcher: batcher,
111+
createThrottler: defaultThrottler(),
112+
deleteThrottler: defaultThrottler(),
113+
},
107114
snapshotWaiter: newSnapshotStatusWaiter(),
108115
}
109116
detachConcurrency := 1
@@ -177,7 +184,7 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
177184
isVirtualNode = node.Labels[common.NodeTypeLabelKey] == common.VirtualNodeType
178185
}
179186

180-
diskID, attempt, err := createDisk(cs.ecs, req.GetName(), snapshotID, diskVol, supportedTypes, selectedInstance, isVirtualNode)
187+
diskID, attempt, err := cs.cd.createDisk(ctx, req.GetName(), snapshotID, diskVol, supportedTypes, selectedInstance, isVirtualNode)
181188
if err != nil {
182189
if errors.Is(err, ErrParameterMismatch) {
183190
return nil, status.Errorf(codes.AlreadyExists, "volume %s already created but %v", req.Name, err)
@@ -220,7 +227,7 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
220227
var disk *ecs.Disk
221228
describeDisk := func() (*csi.DeleteVolumeResponse, error) {
222229
var err error
223-
disk, err = findDiskByID(req.VolumeId, cs.ecs)
230+
disk, err = cs.cd.batcher.Describe(ctx, req.VolumeId)
224231
if err != nil {
225232
return nil, status.Errorf(codes.Internal, "DeleteVolume: find disk(%s) by id with error: %v", req.VolumeId, err)
226233
}
@@ -274,7 +281,7 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
274281
}
275282
}
276283

277-
_, err := deleteDisk(ctx, cs.ecs, req.VolumeId)
284+
_, err := cs.cd.deleteDisk(ctx, cs.ecs, req.VolumeId)
278285
if err != nil {
279286
newErrMsg := utils.FindSuggestionByErrorMessage(err.Error(), utils.DiskDelete)
280287
return nil, status.Errorf(codes.Internal, "DeleteVolume: Delete disk with error: %s", newErrMsg)

pkg/disk/desc/describe.go

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55

66
"github.com/aliyun/alibaba-cloud-sdk-go/sdk/requests"
77
"github.com/aliyun/alibaba-cloud-sdk-go/services/ecs"
8+
"github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/cloud"
89
)
910

1011
const batchSize = 100
@@ -29,17 +30,21 @@ func encodeIDs(ids []string) string {
2930
return string(res)
3031
}
3132

32-
type Disk struct {
33-
*ecs.Client
33+
type disk struct {
34+
cloud.ECSInterface
35+
}
36+
37+
func Disk(client cloud.ECSInterface) Client[ecs.Disk] {
38+
return disk{client}
3439
}
3540

36-
func (c Disk) Describe(ids []string) (Response[ecs.Disk], error) {
41+
func (c disk) Describe(ids []string) (Response[ecs.Disk], error) {
3742
req := ecs.CreateDescribeDisksRequest()
3843
req.DiskIds = encodeIDs(ids)
3944
req.PageSize = requests.NewInteger(batchSize)
4045

4146
ret := Response[ecs.Disk]{}
42-
resp, err := c.Client.DescribeDisks(req)
47+
resp, err := c.DescribeDisks(req)
4348
if err != nil {
4449
return ret, err
4550
}
@@ -48,15 +53,15 @@ func (c Disk) Describe(ids []string) (Response[ecs.Disk], error) {
4853
return ret, nil
4954
}
5055

51-
func (c Disk) GetID(resource *ecs.Disk) string {
56+
func (c disk) GetID(resource *ecs.Disk) string {
5257
return resource.DiskId
5358
}
5459

55-
func (c Disk) Type() string {
60+
func (c disk) Type() string {
5661
return "disk"
5762
}
5863

59-
func (c Disk) BatchSize() int {
64+
func (c disk) BatchSize() int {
6065
return batchSize
6166
}
6267

pkg/disk/disk.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -243,7 +243,7 @@ func GlobalConfigSet(m metadata.MetadataProvider) utils.Config {
243243
}
244244

245245
func newBatcher(fromNode bool) (waitstatus.StatusWaiter[ecs.Disk], batcher.Batcher[ecs.Disk]) {
246-
client := desc.Disk{Client: GlobalConfigVar.EcsClient}
246+
client := desc.Disk(GlobalConfigVar.EcsClient)
247247
ctx := context.Background()
248248
interval := 1 * time.Second
249249
if fromNode {

0 commit comments

Comments
 (0)