11 changes: 11 additions & 0 deletions pkg/cloud/throttle/throttle.go
@@ -131,3 +131,14 @@ func (t *Throttler) Throttle(ctx context.Context, f func() error) error {
}
}
}

func Throttled[TReq any, TResp any](t *Throttler, f func(TReq) (TResp, error)) func(context.Context, TReq) (TResp, error) {
return func(ctx context.Context, req TReq) (TResp, error) {
var resp TResp
err := t.Throttle(ctx, func() (err error) {
resp, err = f(req)
return err
})
return resp, err
}
}
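The new generic Throttled helper wraps any SDK-style call (one request in, one response plus error out) so callers get throttling without declaring a response variable and a closure by hand. A minimal usage sketch, mirroring how the pkg/disk/cloud.go hunks below consume it (only the variable name attach is a placeholder; the throttler, client, and request all appear in this PR):

    attach := throttle.Throttled(ad.attachThrottler, ecsClient.AttachDisk)
    response, err := attach(ctx, attachRequest) // throttled AttachDisk call, same result types as before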
50 changes: 32 additions & 18 deletions pkg/disk/batcher/low_latency.go
@@ -47,7 +47,7 @@ func NewLowLatency[T any](ecsClient desc.Client[T], clk clock.WithTicker, perReq
ecsClient: ecsClient,
requestChan: make(chan *getRequest[*T]),
tokens: tokenBucket{
limit: burst, perToken: perRequest,
limit: time.Duration(burst) * perRequest, perToken: perRequest,
},
clk: clk,
}
@@ -71,28 +71,23 @@ func (w *LowLatency[T]) Describe(ctx context.Context, id string) (*T, error) {
type tokenBucket struct {
zeroAt time.Time

limit int
perToken time.Duration
limit time.Duration // How long it takes to fully refill the bucket
perToken time.Duration // How long it takes to refill one token
}

func (tb *tokenBucket) tokenAt(t time.Time) float64 {
elapsed := t.Sub(tb.zeroAt)
token := float64(elapsed) / float64(tb.perToken)
if token > float64(tb.limit) {
token = float64(tb.limit)
}
return token
elapsed := min(t.Sub(tb.zeroAt), tb.limit)
return float64(elapsed) / float64(tb.perToken)
}

func (tb *tokenBucket) takeAt(t time.Time) {
elapsed := t.Sub(tb.zeroAt)
if elapsed >= time.Duration(tb.limit)*tb.perToken {
tb.zeroAt = t.Add(-time.Duration(tb.limit-1) * tb.perToken)
} else {
tb.zeroAt = tb.zeroAt.Add(tb.perToken)
if tb.zeroAt.After(t) {
tb.zeroAt = t
}
if elapsed >= tb.limit {
tb.zeroAt = t.Add(-tb.limit)
}
tb.zeroAt = tb.zeroAt.Add(tb.perToken)
if tb.zeroAt.After(t) {
tb.zeroAt = t
}
}
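With limit now a duration (burst * perToken, set in NewLowLatency above), tokenAt just clamps the elapsed time before dividing, and takeAt clamps before advancing zeroAt by one perToken. A small worked example, plugging in the values used by the tests below (perToken = 2s, burst = 8, hence limit = 16s):

    tb := tokenBucket{limit: 16 * time.Second, perToken: 2 * time.Second}
    // Fresh bucket (zeroAt far in the past): elapsed clamps to 16s, so tokenAt = 16s / 2s = 8 tokens.
    // After takeAt(t): zeroAt becomes t-16s, then advances by one perToken to t-14s,
    // so tokenAt(t) = 14s / 2s = 7 tokens, i.e. exactly one token consumed.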

@@ -109,6 +104,8 @@ func (w *LowLatency[T]) Run(ctx context.Context) {
timeout = nil
}

var d time.Duration
nInefficient := 0
for {
select {
case <-ctx.Done():
@@ -121,17 +118,34 @@ func (w *LowLatency[T]) Run(ctx context.Context) {
requests[r.id] = append(requests[r.id], r)
if len(requests) == batchSize {
logger.V(4).Info("batch full", "n", batchSize)
nInefficient = 0
descBatch(t)
} else if timeout == nil {
// add some artificial delay for better batching
// the less tokens left, the more we wait
tokens := w.tokens.tokenAt(t)
d := time.Duration(math.Pow(0.5, tokens) * float64(w.tokens.perToken))
d = time.Duration(math.Pow(0.5, tokens) * float64(w.tokens.perToken))
timeout = w.clk.After(d)
logger.V(4).Info("batch waiting", "timeout", d)
}
case t := <-timeout:
logger.V(4).Info("batch timeout", "n", len(requests))
v := 4
if d > w.tokens.perToken/2 { // We are becoming the bottleneck of system throughput
v = 2
if len(requests) <= 1 {
// We have waited, but didn't get the second request.
// We increased the latency with no benefit :(
nInefficient++
v = 1
if nInefficient == 3 {
logger.V(1).Info("Inefficient batching, please increase upstream concurrency")
}
}
}
if v > 1 {
nInefficient = 0
}
logger.V(v).Info("batch timeout", "timeout", d, "n", len(requests))
descBatch(t)
}
}
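For context on the artificial delay computed above: d = perToken * 0.5^tokens, so the wait grows as the token bucket drains. The new logging escalates from V(4) to V(2) once d exceeds perToken/2, to V(1) when such a wait still yields a single-request batch, and emits the explicit "Inefficient batching" warning after three such batches in a row. Rough numbers, assuming the 2-second per-request interval used in the tests below (real deployments may configure a different value):

    tokens = 8 (full bucket):  d = 2s * 0.5^8 ≈ 7.8ms   // negligible extra latency
    tokens = 1:                d = 2s * 0.5   = 1s
    tokens = 0 (empty bucket): d = 2s * 1     = 2s      // wait one full token refill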
29 changes: 25 additions & 4 deletions pkg/disk/batcher/low_latency_test.go
@@ -12,20 +12,22 @@ import (
"github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/disk/desc"
testdesc "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/disk/desc/testing"
"github.com/stretchr/testify/assert"
"k8s.io/klog/v2"
"k8s.io/klog/v2/ktesting"
"k8s.io/utils/clock"
testclock "k8s.io/utils/clock/testing"
)

func TestZeroBucketIsFilled(t *testing.T) {
bucket := tokenBucket{
limit: 8, perToken: 2 * time.Second,
limit: 16 * time.Second, perToken: 2 * time.Second,
}
assert.Equal(t, 8., bucket.tokenAt(time.Now()))
}

func TestBucketTake(t *testing.T) {
bucket := tokenBucket{
limit: 8, perToken: 2 * time.Second,
limit: 16 * time.Second, perToken: 2 * time.Second,
}
now := time.Now()
bucket.takeAt(now)
@@ -49,8 +51,8 @@ func setup(t *testing.T) (client *testdesc.FakeClient, clk *testclock.FakeClock,
clk = testclock.NewFakeClock(time.Now())
batcher = NewLowLatency(client, clk, 2*time.Second, 8)

ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
logger := ktesting.NewLogger(t, ktesting.DefaultConfig)
ctx := klog.NewContext(t.Context(), logger)
go batcher.Run(ctx)

return client, clk, batcher
@@ -153,3 +155,22 @@
assert.ErrorIs(t, err, ErrFake)
assert.Nil(t, disk)
}

func TestReportInefficient(t *testing.T) {
client := &testdesc.FakeClient{}
batcher := NewLowLatency(client, clock.RealClock{}, 20*time.Millisecond, 8)

logger := ktesting.NewLogger(t, ktesting.NewConfig(ktesting.BufferLogs(true)))
ctx := klog.NewContext(t.Context(), logger)
go batcher.Run(ctx)

client.Disks.Store("d-test", &ecs.Disk{
DiskId: "d-test",
DiskName: "mydisk",
})

for range 20 {
batcher.Describe(ctx, "d-test")
}
assert.Contains(t, logger.GetSink().(ktesting.Underlier).GetBuffer().String(), "Inefficient batching")
}
33 changes: 16 additions & 17 deletions pkg/disk/cloud.go
@@ -77,6 +77,13 @@ type DiskAttachDetach struct {
dev *DeviceManager
}

type DiskCreateDelete struct {
ecs cloud.ECSInterface
batcher batcher.Batcher[ecs.Disk]
createThrottler *throttle.Throttler
deleteThrottler *throttle.Throttler
}

func (ad *DiskAttachDetach) possibleDisks(before sets.Set[string]) ([]string, error) {
after, err := ad.dev.ListBlocks()
if err != nil {
@@ -290,11 +297,7 @@ func (ad *DiskAttachDetach) attachDisk(ctx context.Context, diskID, nodeID strin
for key, value := range GlobalConfigVar.RequestBaseInfo {
attachRequest.AppendUserAgent(key, value)
}
var response *ecs.AttachDiskResponse
err = ad.attachThrottler.Throttle(ctx, func() error {
response, err = ecsClient.AttachDisk(attachRequest)
return err
})
response, err := throttle.Throttled(ad.attachThrottler, ecsClient.AttachDisk)(ctx, attachRequest)
if err != nil {
var aliErr *alicloudErr.ServerError
if errors.As(err, &aliErr) {
@@ -465,11 +468,7 @@ func (ad *DiskAttachDetach) detachDisk(ctx context.Context, ecsClient cloud.ECSI
for key, value := range GlobalConfigVar.RequestBaseInfo {
detachDiskRequest.AppendUserAgent(key, value)
}
var response *ecs.DetachDiskResponse
err = ad.detachThrottler.Throttle(ctx, func() error {
response, err = ecsClient.DetachDisk(detachDiskRequest)
return err
})
response, err := throttle.Throttled(ad.detachThrottler, ecsClient.DetachDisk)(ctx, detachDiskRequest)
if err != nil {
return status.Errorf(codes.Aborted, "DetachDisk: Fail to detach %s: from Instance: %s with error: %v", disk.DiskId, disk.InstanceId, err)
}
@@ -927,7 +926,7 @@ func isValidSnapshotName(name string) bool {
return validDiskNameRegexp.MatchString(name)
}

func createDisk(ecsClient cloud.ECSInterface, diskName, snapshotID string, diskVol *diskVolumeArgs, supportedTypes sets.Set[Category], selectedInstance string, isVirtualNode bool) (string, createAttempt, error) {
func (c *DiskCreateDelete) createDisk(ctx context.Context, diskName, snapshotID string, diskVol *diskVolumeArgs, supportedTypes sets.Set[Category], selectedInstance string, isVirtualNode bool) (string, createAttempt, error) {
// The external-provisioner must be started with --extra-create-metadata=true; only then does ACK's external-provisioner pass the PVC Annotations through
createDiskRequest := buildCreateDiskRequest(diskVol)
if isValidDiskName(diskName) {
@@ -962,7 +961,7 @@ func createDisk(ecsClient cloud.ECSInterface, diskName, snapshotID string, diskV
attempt.Instance = selectedInstance
}
retry:
diskID, final, err := createDiskAttempt(createDiskRequest, attempt, ecsClient)
diskID, final, err := c.createDiskAttempt(ctx, createDiskRequest, attempt)
if err != nil {
if final {
return "", attempt, err
@@ -972,7 +971,7 @@ func createDisk(ecsClient cloud.ECSInterface, diskName, snapshotID string, diskV
// protect us from infinite loop
return "", attempt, status.Error(codes.Internal, "unexpected parameter mismatch")
}
existingDisk, err := findDiskByName(diskName, ecsClient)
existingDisk, err := findDiskByName(diskName, c.ecs)
if err != nil {
return "", attempt, status.Errorf(codes.Internal, "parameter mismatch detected, but fetch existing disk failed: %v", err)
}
@@ -1051,11 +1050,11 @@ func finalizeCreateDiskRequest(template *ecs.CreateDiskRequest, attempt createAt

var ErrParameterMismatch = errors.New("parameter mismatch")

func createDiskAttempt(req *ecs.CreateDiskRequest, attempt createAttempt, ecsClient cloud.ECSInterface) (diskId string, final bool, err error) {
func (c *DiskCreateDelete) createDiskAttempt(ctx context.Context, req *ecs.CreateDiskRequest, attempt createAttempt) (diskId string, final bool, err error) {
req = finalizeCreateDiskRequest(req, attempt)
klog.Infof("request: request content: %++v", req)

volumeRes, err := ecsClient.CreateDisk(req)
volumeRes, err := throttle.Throttled(c.createThrottler, c.ecs.CreateDisk)(ctx, req)
if err == nil {
klog.Infof("request: diskId: %s, reqId: %s", volumeRes.DiskId, volumeRes.RequestId)
return volumeRes.DiskId, true, nil
@@ -1157,14 +1156,14 @@ func getDefaultDiskTags(diskVol *diskVolumeArgs) []ecs.CreateDiskTag {
return diskTags
}

func deleteDisk(ctx context.Context, ecsClient cloud.ECSInterface, diskId string) (*ecs.DeleteDiskResponse, error) {
func (c *DiskCreateDelete) deleteDisk(ctx context.Context, ecsClient cloud.ECSInterface, diskId string) (*ecs.DeleteDiskResponse, error) {
deleteDiskRequest := ecs.CreateDeleteDiskRequest()
deleteDiskRequest.DiskId = diskId

var resp *ecs.DeleteDiskResponse
err := wait.PollUntilContextTimeout(ctx, time.Second*5, DISK_DELETE_INIT_TIMEOUT, true, func(ctx context.Context) (bool, error) {
var err error
resp, err = ecsClient.DeleteDisk(deleteDiskRequest)
resp, err = throttle.Throttled(c.deleteThrottler, c.ecs.DeleteDisk)(ctx, deleteDiskRequest)
if err == nil {
klog.Infof("DeleteVolume: Successfully deleted volume: %s, with RequestId: %s", diskId, resp.RequestId)
return true, nil
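Since createDisk, createDiskAttempt, and deleteDisk are now methods on DiskCreateDelete, callers need an instance carrying the ECS client and the per-operation throttlers. A hedged wiring sketch; the literal below and the throttler/batcher variables are placeholders, the real initialization lives outside this diff:

    cd := &DiskCreateDelete{
        ecs:             ecsClient,        // cloud.ECSInterface
        batcher:         diskBatcher,      // batcher.Batcher[ecs.Disk], e.g. the low-latency batcher above
        createThrottler: createThrottler,  // *throttle.Throttler
        deleteThrottler: deleteThrottler,  // *throttle.Throttler
    }
    // Illustrative call: empty selectedInstance, non-virtual node.
    diskID, _, err := cd.createDisk(ctx, diskName, snapshotID, diskVol, supportedTypes, "", false)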