From 6c21dcf9a616158091c5607ece4c16be98b38523 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E8=83=A1=E7=8E=AE=E6=96=87?=
Date: Wed, 23 Jul 2025 13:24:04 +0800
Subject: [PATCH 1/2] disk: optimize batch create/delete

Integrate the batcher and the new generic throttle helper into the disk
create/delete paths.

---
 pkg/cloud/throttle/throttle.go | 11 +++++++
 pkg/disk/cloud.go              | 33 ++++++++++----------
 pkg/disk/cloud_test.go         | 57 ++++++++++++++++++++++------------
 pkg/disk/controllerserver.go   | 13 ++++++--
 pkg/disk/desc/describe.go      | 19 +++++++-----
 pkg/disk/disk.go               |  2 +-
 6 files changed, 87 insertions(+), 48 deletions(-)

diff --git a/pkg/cloud/throttle/throttle.go b/pkg/cloud/throttle/throttle.go
index 6af0acc12..da909e46e 100644
--- a/pkg/cloud/throttle/throttle.go
+++ b/pkg/cloud/throttle/throttle.go
@@ -131,3 +131,14 @@ func (t *Throttler) Throttle(ctx context.Context, f func() error) error {
 		}
 	}
 }
+
+func Throttled[TReq any, TResp any](t *Throttler, f func(TReq) (TResp, error)) func(context.Context, TReq) (TResp, error) {
+	return func(ctx context.Context, req TReq) (TResp, error) {
+		var resp TResp
+		err := t.Throttle(ctx, func() (err error) {
+			resp, err = f(req)
+			return err
+		})
+		return resp, err
+	}
+}
diff --git a/pkg/disk/cloud.go b/pkg/disk/cloud.go
index 0e90ea97d..37e417d87 100644
--- a/pkg/disk/cloud.go
+++ b/pkg/disk/cloud.go
@@ -77,6 +77,13 @@ type DiskAttachDetach struct {
 	dev *DeviceManager
 }
 
+type DiskCreateDelete struct {
+	ecs             cloud.ECSInterface
+	batcher         batcher.Batcher[ecs.Disk]
+	createThrottler *throttle.Throttler
+	deleteThrottler *throttle.Throttler
+}
+
 func (ad *DiskAttachDetach) possibleDisks(before sets.Set[string]) ([]string, error) {
 	after, err := ad.dev.ListBlocks()
 	if err != nil {
@@ -290,11 +297,7 @@ func (ad *DiskAttachDetach) attachDisk(ctx context.Context, diskID, nodeID strin
 	for key, value := range GlobalConfigVar.RequestBaseInfo {
 		attachRequest.AppendUserAgent(key, value)
 	}
-	var response *ecs.AttachDiskResponse
-	err = ad.attachThrottler.Throttle(ctx, func() error {
-		response, err = ecsClient.AttachDisk(attachRequest)
-		return err
-	})
+	response, err := throttle.Throttled(ad.attachThrottler, ecsClient.AttachDisk)(ctx, attachRequest)
 	if err != nil {
 		var aliErr *alicloudErr.ServerError
 		if errors.As(err, &aliErr) {
@@ -465,11 +468,7 @@ func (ad *DiskAttachDetach) detachDisk(ctx context.Context, ecsClient cloud.ECSI
 	for key, value := range GlobalConfigVar.RequestBaseInfo {
 		detachDiskRequest.AppendUserAgent(key, value)
 	}
-	var response *ecs.DetachDiskResponse
-	err = ad.detachThrottler.Throttle(ctx, func() error {
-		response, err = ecsClient.DetachDisk(detachDiskRequest)
-		return err
-	})
+	response, err := throttle.Throttled(ad.detachThrottler, ecsClient.DetachDisk)(ctx, detachDiskRequest)
 	if err != nil {
 		return status.Errorf(codes.Aborted, "DetachDisk: Fail to detach %s: from Instance: %s with error: %v", disk.DiskId, disk.InstanceId, err)
 	}
@@ -927,7 +926,7 @@ func isValidSnapshotName(name string) bool {
 	return validDiskNameRegexp.MatchString(name)
}
 
-func createDisk(ecsClient cloud.ECSInterface, diskName, snapshotID string, diskVol *diskVolumeArgs, supportedTypes sets.Set[Category], selectedInstance string, isVirtualNode bool) (string, createAttempt, error) {
+func (c *DiskCreateDelete) createDisk(ctx context.Context, diskName, snapshotID string, diskVol *diskVolumeArgs, supportedTypes sets.Set[Category], selectedInstance string, isVirtualNode bool) (string, createAttempt, error) {
 	// The external-provisioner must be started with --extra-create-metadata=true; only then will the ACK external-provisioner pass the PVC annotations through.
 	createDiskRequest := 
buildCreateDiskRequest(diskVol) if isValidDiskName(diskName) { @@ -962,7 +961,7 @@ func createDisk(ecsClient cloud.ECSInterface, diskName, snapshotID string, diskV attempt.Instance = selectedInstance } retry: - diskID, final, err := createDiskAttempt(createDiskRequest, attempt, ecsClient) + diskID, final, err := c.createDiskAttempt(ctx, createDiskRequest, attempt) if err != nil { if final { return "", attempt, err @@ -972,7 +971,7 @@ func createDisk(ecsClient cloud.ECSInterface, diskName, snapshotID string, diskV // protect us from infinite loop return "", attempt, status.Error(codes.Internal, "unexpected parameter mismatch") } - existingDisk, err := findDiskByName(diskName, ecsClient) + existingDisk, err := findDiskByName(diskName, c.ecs) if err != nil { return "", attempt, status.Errorf(codes.Internal, "parameter mismatch detected, but fetch existing disk failed: %v", err) } @@ -1051,11 +1050,11 @@ func finalizeCreateDiskRequest(template *ecs.CreateDiskRequest, attempt createAt var ErrParameterMismatch = errors.New("parameter mismatch") -func createDiskAttempt(req *ecs.CreateDiskRequest, attempt createAttempt, ecsClient cloud.ECSInterface) (diskId string, final bool, err error) { +func (c *DiskCreateDelete) createDiskAttempt(ctx context.Context, req *ecs.CreateDiskRequest, attempt createAttempt) (diskId string, final bool, err error) { req = finalizeCreateDiskRequest(req, attempt) klog.Infof("request: request content: %++v", req) - volumeRes, err := ecsClient.CreateDisk(req) + volumeRes, err := throttle.Throttled(c.createThrottler, c.ecs.CreateDisk)(ctx, req) if err == nil { klog.Infof("request: diskId: %s, reqId: %s", volumeRes.DiskId, volumeRes.RequestId) return volumeRes.DiskId, true, nil @@ -1157,14 +1156,14 @@ func getDefaultDiskTags(diskVol *diskVolumeArgs) []ecs.CreateDiskTag { return diskTags } -func deleteDisk(ctx context.Context, ecsClient cloud.ECSInterface, diskId string) (*ecs.DeleteDiskResponse, error) { +func (c *DiskCreateDelete) deleteDisk(ctx context.Context, ecsClient cloud.ECSInterface, diskId string) (*ecs.DeleteDiskResponse, error) { deleteDiskRequest := ecs.CreateDeleteDiskRequest() deleteDiskRequest.DiskId = diskId var resp *ecs.DeleteDiskResponse err := wait.PollUntilContextTimeout(ctx, time.Second*5, DISK_DELETE_INIT_TIMEOUT, true, func(ctx context.Context) (bool, error) { var err error - resp, err = ecsClient.DeleteDisk(deleteDiskRequest) + resp, err = throttle.Throttled(c.deleteThrottler, c.ecs.DeleteDisk)(ctx, deleteDiskRequest) if err == nil { klog.Infof("DeleteVolume: Successfully deleted volume: %s, with RequestId: %s", diskId, resp.RequestId) return true, nil diff --git a/pkg/disk/cloud_test.go b/pkg/disk/cloud_test.go index c522ff540..b2de76091 100644 --- a/pkg/disk/cloud_test.go +++ b/pkg/disk/cloud_test.go @@ -4,14 +4,19 @@ import ( "context" "fmt" "testing" + "time" alicloudErr "github.com/aliyun/alibaba-cloud-sdk-go/sdk/errors" "github.com/aliyun/alibaba-cloud-sdk-go/sdk/requests" "github.com/aliyun/alibaba-cloud-sdk-go/services/ecs" gomock "github.com/golang/mock/gomock" "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/cloud" + "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/disk/batcher" + "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/disk/desc" "github.com/stretchr/testify/assert" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/klog/v2/ktesting" + "k8s.io/utils/clock" ) var ( @@ -31,37 +36,49 @@ func init() { }`), resizeDiskResponse) } -func TestDeleteDisk(t *testing.T) { +func testCreateDelete(t *testing.T) 
(*cloud.MockECSInterface, *DiskCreateDelete) { ctrl := gomock.NewController(t) c := cloud.NewMockECSInterface(ctrl) + return c, &DiskCreateDelete{ + ecs: c, + createThrottler: defaultThrottler(), + deleteThrottler: defaultThrottler(), + batcher: batcher.NewLowLatency(desc.Disk(c), clock.RealClock{}, 1*time.Second, 8), + } +} + +func TestDeleteDisk(t *testing.T) { + c, cd := testCreateDelete(t) + _, ctx := ktesting.NewTestContext(t) + c.EXPECT().DeleteDisk(gomock.Any()).Return(deleteDiskResponse, nil) - _, err := deleteDisk(context.Background(), c, "test-disk") + _, err := cd.deleteDisk(ctx, c, "test-disk") assert.Nil(t, err) } func TestDeleteDiskRetryOnInitError(t *testing.T) { t.Parallel() - ctrl := gomock.NewController(t) - c := cloud.NewMockECSInterface(ctrl) + c, cd := testCreateDelete(t) + _, ctx := ktesting.NewTestContext(t) initErr := alicloudErr.NewServerError(400, `{"Code": "IncorrectDiskStatus.Initializing"}`, "") c.EXPECT().DeleteDisk(gomock.Any()).Return(nil, initErr) c.EXPECT().DeleteDisk(gomock.Any()).Return(deleteDiskResponse, nil) - _, err := deleteDisk(context.Background(), c, "test-disk") + _, err := cd.deleteDisk(ctx, c, "test-disk") assert.Nil(t, err) } func TestDeleteDiskPassthroughError(t *testing.T) { - ctrl := gomock.NewController(t) - c := cloud.NewMockECSInterface(ctrl) + c, cd := testCreateDelete(t) + _, ctx := ktesting.NewTestContext(t) serverErr := alicloudErr.NewServerError(400, `{"Code": "AnyOtherErrors"}`, "") c.EXPECT().DeleteDisk(gomock.Any()).Return(nil, serverErr) - _, err := deleteDisk(context.Background(), c, "test-disk") + _, err := cd.deleteDisk(ctx, c, "test-disk") assert.Equal(t, serverErr, err) } @@ -384,8 +401,8 @@ func TestCreateDisk_Basic(t *testing.T) { } for _, c := range cases { t.Run(c.name, func(t *testing.T) { - ctrl := gomock.NewController(t) - client := cloud.NewMockECSInterface(ctrl) + client, cd := testCreateDelete(t) + _, ctx := ktesting.NewTestContext(t) if !c.err { client.EXPECT().CreateDisk(gomock.Any()).Return(&ecs.CreateDiskResponse{ @@ -396,7 +413,7 @@ func TestCreateDisk_Basic(t *testing.T) { client.EXPECT().CreateDisk(gomock.Any()).Return(nil, alicloudErr.NewServerError(400, `{"Code": "AnyOtherErrors"}`, "")) } - diskID, attempt, err := createDisk(client, "disk-name", "", c.args, c.supports, c.instance, c.isVirtualNode) + diskID, attempt, err := cd.createDisk(ctx, "disk-name", "", c.args, c.supports, c.instance, c.isVirtualNode) if c.err { assert.Error(t, err) } else { @@ -409,8 +426,8 @@ func TestCreateDisk_Basic(t *testing.T) { } func TestCreateDisk_ServerFailFallback(t *testing.T) { - ctrl := gomock.NewController(t) - client := cloud.NewMockECSInterface(ctrl) + client, cd := testCreateDelete(t) + _, ctx := ktesting.NewTestContext(t) client.EXPECT().CreateDisk(gomock.Any()).Return(nil, alicloudErr.NewServerError(400, `{"Code": "InvalidDataDiskSize.ValueNotSupported"}`, "")) client.EXPECT().CreateDisk(gomock.Any()).Return(&ecs.CreateDiskResponse{ @@ -418,7 +435,7 @@ func TestCreateDisk_ServerFailFallback(t *testing.T) { }, nil) args := &diskVolumeArgs{Type: []Category{DiskESSD, DiskESSDAuto}, RequestGB: 20} - diskID, attempt, err := createDisk(client, "disk-name", "", args, nil, "", false) + diskID, attempt, err := cd.createDisk(ctx, "disk-name", "", args, nil, "", false) assert.NoError(t, err) assert.Equal(t, "d-123", diskID) assert.Equal(t, DiskESSDAuto, attempt.Category) @@ -456,8 +473,8 @@ func TestCreateDisk_ParameterMismatch(t *testing.T) { } for _, c := range cases { t.Run(c.name, func(t *testing.T) { - ctrl := 
gomock.NewController(t) - client := cloud.NewMockECSInterface(ctrl) + client, cd := testCreateDelete(t) + _, ctx := ktesting.NewTestContext(t) r1 := client.EXPECT().CreateDisk(gomock.Any()).Return(nil, alicloudErr.NewServerError(400, `{"Code": "IdempotentParameterMismatch"}`, "")) r2 := client.EXPECT().DescribeDisks(gomock.Any()).Return(&ecs.DescribeDisksResponse{ @@ -472,7 +489,7 @@ func TestCreateDisk_ParameterMismatch(t *testing.T) { } args := &diskVolumeArgs{Type: []Category{DiskESSD, DiskESSDAuto}, RequestGB: 20} - diskID, attempt, err := createDisk(client, "disk-name", "", args, nil, "", false) + diskID, attempt, err := cd.createDisk(ctx, "disk-name", "", args, nil, "", false) if c.err { assert.Error(t, err) } else { @@ -492,8 +509,8 @@ func TestCreateDisk_ParameterMismatch(t *testing.T) { } func TestCreateDisk_NoInfiniteLoop(t *testing.T) { - ctrl := gomock.NewController(t) - client := cloud.NewMockECSInterface(ctrl) + client, cd := testCreateDelete(t) + _, ctx := ktesting.NewTestContext(t) client.EXPECT().CreateDisk(gomock.Any()).Return(nil, alicloudErr.NewServerError(400, `{"Code": "IdempotentParameterMismatch"}`, "")).Times(2) client.EXPECT().DescribeDisks(gomock.Any()).Return(&ecs.DescribeDisksResponse{ @@ -503,7 +520,7 @@ func TestCreateDisk_NoInfiniteLoop(t *testing.T) { }, nil) args := &diskVolumeArgs{Type: []Category{DiskESSD}, RequestGB: 20} - _, _, err := createDisk(client, "disk-name", "", args, nil, "", false) + _, _, err := cd.createDisk(ctx, "disk-name", "", args, nil, "", false) assert.Error(t, err) } diff --git a/pkg/disk/controllerserver.go b/pkg/disk/controllerserver.go index 5b18b7c5c..6f5edd204 100644 --- a/pkg/disk/controllerserver.go +++ b/pkg/disk/controllerserver.go @@ -52,6 +52,7 @@ import ( type controllerServer struct { recorder record.EventRecorder ad DiskAttachDetach + cd DiskCreateDelete meta metadata.MetadataProvider ecs cloud.ECSInterface snapshotWaiter waitstatus.StatusWaiter[ecs.Snapshot] @@ -104,6 +105,12 @@ func NewControllerServer(csiCfg utils.Config, ecs cloud.ECSInterface, m metadata attachThrottler: defaultThrottler(), detachThrottler: defaultThrottler(), }, + cd: DiskCreateDelete{ + ecs: ecs, + batcher: batcher, + createThrottler: defaultThrottler(), + deleteThrottler: defaultThrottler(), + }, snapshotWaiter: newSnapshotStatusWaiter(), } detachConcurrency := 1 @@ -177,7 +184,7 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol isVirtualNode = node.Labels[common.NodeTypeLabelKey] == common.VirtualNodeType } - diskID, attempt, err := createDisk(cs.ecs, req.GetName(), snapshotID, diskVol, supportedTypes, selectedInstance, isVirtualNode) + diskID, attempt, err := cs.cd.createDisk(ctx, req.GetName(), snapshotID, diskVol, supportedTypes, selectedInstance, isVirtualNode) if err != nil { if errors.Is(err, ErrParameterMismatch) { return nil, status.Errorf(codes.AlreadyExists, "volume %s already created but %v", req.Name, err) @@ -220,7 +227,7 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol var disk *ecs.Disk describeDisk := func() (*csi.DeleteVolumeResponse, error) { var err error - disk, err = findDiskByID(req.VolumeId, cs.ecs) + disk, err = cs.cd.batcher.Describe(ctx, req.VolumeId) if err != nil { return nil, status.Errorf(codes.Internal, "DeleteVolume: find disk(%s) by id with error: %v", req.VolumeId, err) } @@ -274,7 +281,7 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol } } - _, err := deleteDisk(ctx, cs.ecs, req.VolumeId) + _, err := 
cs.cd.deleteDisk(ctx, cs.ecs, req.VolumeId) if err != nil { newErrMsg := utils.FindSuggestionByErrorMessage(err.Error(), utils.DiskDelete) return nil, status.Errorf(codes.Internal, "DeleteVolume: Delete disk with error: %s", newErrMsg) diff --git a/pkg/disk/desc/describe.go b/pkg/disk/desc/describe.go index 488868db3..aab713f1b 100644 --- a/pkg/disk/desc/describe.go +++ b/pkg/disk/desc/describe.go @@ -5,6 +5,7 @@ import ( "github.com/aliyun/alibaba-cloud-sdk-go/sdk/requests" "github.com/aliyun/alibaba-cloud-sdk-go/services/ecs" + "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/cloud" ) const batchSize = 100 @@ -29,17 +30,21 @@ func encodeIDs(ids []string) string { return string(res) } -type Disk struct { - *ecs.Client +type disk struct { + cloud.ECSInterface +} + +func Disk(client cloud.ECSInterface) Client[ecs.Disk] { + return disk{client} } -func (c Disk) Describe(ids []string) (Response[ecs.Disk], error) { +func (c disk) Describe(ids []string) (Response[ecs.Disk], error) { req := ecs.CreateDescribeDisksRequest() req.DiskIds = encodeIDs(ids) req.PageSize = requests.NewInteger(batchSize) ret := Response[ecs.Disk]{} - resp, err := c.Client.DescribeDisks(req) + resp, err := c.DescribeDisks(req) if err != nil { return ret, err } @@ -48,15 +53,15 @@ func (c Disk) Describe(ids []string) (Response[ecs.Disk], error) { return ret, nil } -func (c Disk) GetID(resource *ecs.Disk) string { +func (c disk) GetID(resource *ecs.Disk) string { return resource.DiskId } -func (c Disk) Type() string { +func (c disk) Type() string { return "disk" } -func (c Disk) BatchSize() int { +func (c disk) BatchSize() int { return batchSize } diff --git a/pkg/disk/disk.go b/pkg/disk/disk.go index c0d82351e..f4eba8557 100644 --- a/pkg/disk/disk.go +++ b/pkg/disk/disk.go @@ -243,7 +243,7 @@ func GlobalConfigSet(m metadata.MetadataProvider) utils.Config { } func newBatcher(fromNode bool) (waitstatus.StatusWaiter[ecs.Disk], batcher.Batcher[ecs.Disk]) { - client := desc.Disk{Client: GlobalConfigVar.EcsClient} + client := desc.Disk(GlobalConfigVar.EcsClient) ctx := context.Background() interval := 1 * time.Second if fromNode { From 3a64b2d10083ff048beabbdeef817f54bea3307a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=83=A1=E7=8E=AE=E6=96=87?= Date: Thu, 24 Jul 2025 15:47:51 +0800 Subject: [PATCH 2/2] batcher: optimize log for inefficient batching Sometimes we are actually slowing down the system with batching. Optimize logs to diagnose this. 
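
The "batch timeout" log now picks its verbosity from how costly the wait
was: the artificial delay is d = perToken * 0.5^tokens, so d > perToken/2
holds exactly when less than one token is left, i.e. when we are already
rate limited. If such a timeout then fires with at most one pending request
three times in a row, a V(1) hint is logged to increase upstream
concurrency.

The token bucket now tracks its limit as a duration (the time to fully
refill) instead of a token count. A minimal sketch of the new refill
semantics, for illustration only (it reuses the package-private tokenBucket
type and mirrors TestZeroBucketIsFilled/TestBucketTake; it is not part of
this patch):

    // A burst of 8: one token refills every 2s, a full refill takes 16s.
    tb := tokenBucket{limit: 16 * time.Second, perToken: 2 * time.Second}
    now := time.Now()
    _ = tb.tokenAt(now)                      // 8.0: a zero-valued bucket starts full
    tb.takeAt(now)                           // consume one token
    _ = tb.tokenAt(now)                      // 7.0
    _ = tb.tokenAt(now.Add(2 * time.Second)) // 8.0 again: one token back after perToken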
---
 pkg/disk/batcher/low_latency.go      | 50 ++++++++++++++++++----------
 pkg/disk/batcher/low_latency_test.go | 29 +++++++++++++---
 2 files changed, 57 insertions(+), 22 deletions(-)

diff --git a/pkg/disk/batcher/low_latency.go b/pkg/disk/batcher/low_latency.go
index 698a3a3c3..e3c9435e0 100644
--- a/pkg/disk/batcher/low_latency.go
+++ b/pkg/disk/batcher/low_latency.go
@@ -47,7 +47,7 @@ func NewLowLatency[T any](ecsClient desc.Client[T], clk clock.WithTicker, perReq
 		ecsClient:   ecsClient,
 		requestChan: make(chan *getRequest[*T]),
 		tokens: tokenBucket{
-			limit: burst, perToken: perRequest,
+			limit: time.Duration(burst) * perRequest, perToken: perRequest,
 		},
 		clk: clk,
 	}
@@ -71,28 +71,23 @@ func (w *LowLatency[T]) Describe(ctx context.Context, id string) (*T, error) {
 
 type tokenBucket struct {
 	zeroAt   time.Time
-	limit    int
-	perToken time.Duration
+	limit    time.Duration // How long it takes to fully refill the bucket
+	perToken time.Duration // How long it takes to refill one token
 }
 
 func (tb *tokenBucket) tokenAt(t time.Time) float64 {
-	elapsed := t.Sub(tb.zeroAt)
-	token := float64(elapsed) / float64(tb.perToken)
-	if token > float64(tb.limit) {
-		token = float64(tb.limit)
-	}
-	return token
+	elapsed := min(t.Sub(tb.zeroAt), tb.limit)
+	return float64(elapsed) / float64(tb.perToken)
 }
 
 func (tb *tokenBucket) takeAt(t time.Time) {
 	elapsed := t.Sub(tb.zeroAt)
-	if elapsed >= time.Duration(tb.limit)*tb.perToken {
-		tb.zeroAt = t.Add(-time.Duration(tb.limit-1) * tb.perToken)
-	} else {
-		tb.zeroAt = tb.zeroAt.Add(tb.perToken)
-		if tb.zeroAt.After(t) {
-			tb.zeroAt = t
-		}
+	if elapsed >= tb.limit {
+		tb.zeroAt = t.Add(-tb.limit)
+	}
+	tb.zeroAt = tb.zeroAt.Add(tb.perToken)
+	if tb.zeroAt.After(t) {
+		tb.zeroAt = t
 	}
 }
 
@@ -109,6 +104,8 @@ func (w *LowLatency[T]) Run(ctx context.Context) {
 		timeout = nil
 	}
 
+	var d time.Duration
+	nInefficient := 0
 	for {
 		select {
 		case <-ctx.Done():
@@ -121,17 +118,34 @@ func (w *LowLatency[T]) Run(ctx context.Context) {
 			requests[r.id] = append(requests[r.id], r)
 			if len(requests) == batchSize {
 				logger.V(4).Info("batch full", "n", batchSize)
+				nInefficient = 0
 				descBatch(t)
 			} else if timeout == nil {
 				// add some artificial delay for better batching
 				// the fewer tokens left, the more we wait
 				tokens := w.tokens.tokenAt(t)
-				d := time.Duration(math.Pow(0.5, tokens) * float64(w.tokens.perToken))
+				d = time.Duration(math.Pow(0.5, tokens) * float64(w.tokens.perToken))
 				timeout = w.clk.After(d)
 				logger.V(4).Info("batch waiting", "timeout", d)
 			}
 		case t := <-timeout:
-			logger.V(4).Info("batch timeout", "n", len(requests))
+			v := 4
+			if d > w.tokens.perToken/2 { // We are becoming the bottleneck of system throughput
+				v = 2
+				if len(requests) <= 1 {
+					// We have waited, but didn't get the second request. 
+ // We increased the latency with no benefit :( + nInefficient++ + v = 1 + if nInefficient == 3 { + logger.V(1).Info("Inefficient batching, please increase upstream concurrency") + } + } + } + if v > 1 { + nInefficient = 0 + } + logger.V(v).Info("batch timeout", "timeout", d, "n", len(requests)) descBatch(t) } } diff --git a/pkg/disk/batcher/low_latency_test.go b/pkg/disk/batcher/low_latency_test.go index 607687569..b57810820 100644 --- a/pkg/disk/batcher/low_latency_test.go +++ b/pkg/disk/batcher/low_latency_test.go @@ -12,20 +12,22 @@ import ( "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/disk/desc" testdesc "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/disk/desc/testing" "github.com/stretchr/testify/assert" + "k8s.io/klog/v2" + "k8s.io/klog/v2/ktesting" "k8s.io/utils/clock" testclock "k8s.io/utils/clock/testing" ) func TestZeroBucketIsFilled(t *testing.T) { bucket := tokenBucket{ - limit: 8, perToken: 2 * time.Second, + limit: 16 * time.Second, perToken: 2 * time.Second, } assert.Equal(t, 8., bucket.tokenAt(time.Now())) } func TestBucketTake(t *testing.T) { bucket := tokenBucket{ - limit: 8, perToken: 2 * time.Second, + limit: 16 * time.Second, perToken: 2 * time.Second, } now := time.Now() bucket.takeAt(now) @@ -49,8 +51,8 @@ func setup(t *testing.T) (client *testdesc.FakeClient, clk *testclock.FakeClock, clk = testclock.NewFakeClock(time.Now()) batcher = NewLowLatency(client, clk, 2*time.Second, 8) - ctx, cancel := context.WithCancel(context.Background()) - t.Cleanup(cancel) + logger := ktesting.NewLogger(t, ktesting.DefaultConfig) + ctx := klog.NewContext(t.Context(), logger) go batcher.Run(ctx) return client, clk, batcher @@ -153,3 +155,22 @@ func TestFailures(t *testing.T) { assert.ErrorIs(t, err, ErrFake) assert.Nil(t, disk) } + +func TestReportInefficient(t *testing.T) { + client := &testdesc.FakeClient{} + batcher := NewLowLatency(client, clock.RealClock{}, 20*time.Millisecond, 8) + + logger := ktesting.NewLogger(t, ktesting.NewConfig(ktesting.BufferLogs(true))) + ctx := klog.NewContext(t.Context(), logger) + go batcher.Run(ctx) + + client.Disks.Store("d-test", &ecs.Disk{ + DiskId: "d-test", + DiskName: "mydisk", + }) + + for range 20 { + batcher.Describe(ctx, "d-test") + } + assert.Contains(t, logger.GetSink().(ktesting.Underlier).GetBuffer().String(), "Inefficient batching") +}
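
Note (not part of the patches): the Throttled helper from patch 1/2 lifts a
plain aliyun SDK call into a context-aware, throttled one, so the
closure-based call sites collapse to a single line. Before/after, taken
from pkg/disk/cloud.go:

    // before: thread the response out of a closure passed to Throttle
    var response *ecs.AttachDiskResponse
    err = ad.attachThrottler.Throttle(ctx, func() error {
        response, err = ecsClient.AttachDisk(attachRequest)
        return err
    })

    // after: Throttled builds that closure generically for any
    // func(Req) (Resp, error) method, such as ecsClient.AttachDisk
    response, err := throttle.Throttled(ad.attachThrottler, ecsClient.AttachDisk)(ctx, attachRequest)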