11 changes: 11 additions & 0 deletions pkg/cloud/throttle/throttle.go
@@ -131,3 +131,14 @@ func (t *Throttler) Throttle(ctx context.Context, f func() error) error {
 		}
 	}
 }
+
+func Throttled[TReq any, TResp any](t *Throttler, f func(TReq) (TResp, error)) func(context.Context, TReq) (TResp, error) {
+	return func(ctx context.Context, req TReq) (TResp, error) {
+		var resp TResp
+		err := t.Throttle(ctx, func() (err error) {
+			resp, err = f(req)
+			return err
+		})
+		return resp, err
+	}
+}
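
Note: `Throttled` curries a `*Throttler` into any single-request call, so call sites pick up rate limiting without invoking `Throttle` themselves. A minimal usage sketch, where the ECS request/response types and the `client` variable are illustrative assumptions, not part of this diff:

	// Wrap a hypothetical ECS call; t is the *Throttler.
	describeDisks := throttle.Throttled(t, func(req *ecs.DescribeDisksRequest) (*ecs.DescribeDisksResponse, error) {
		return client.DescribeDisks(req)
	})
	// Each invocation now waits on the throttler before issuing the call.
	resp, err := describeDisks(ctx, req)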
50 changes: 32 additions & 18 deletions pkg/disk/batcher/low_latency.go
@@ -47,7 +47,7 @@ func NewLowLatency[T any](ecsClient desc.Client[T], clk clock.WithTicker, perReq
 		ecsClient:   ecsClient,
 		requestChan: make(chan *getRequest[*T]),
 		tokens: tokenBucket{
-			limit: burst, perToken: perRequest,
+			limit: time.Duration(burst) * perRequest, perToken: perRequest,
 		},
 		clk: clk,
 	}
@@ -71,28 +71,23 @@ func (w *LowLatency[T]) Describe(ctx context.Context, id string) (*T, error) {
 type tokenBucket struct {
 	zeroAt time.Time
 
-	limit    int
-	perToken time.Duration
+	limit    time.Duration // How long it takes to fully refill the bucket
+	perToken time.Duration // How long it takes to refill one token
 }
 
 func (tb *tokenBucket) tokenAt(t time.Time) float64 {
-	elapsed := t.Sub(tb.zeroAt)
-	token := float64(elapsed) / float64(tb.perToken)
-	if token > float64(tb.limit) {
-		token = float64(tb.limit)
-	}
-	return token
+	elapsed := min(t.Sub(tb.zeroAt), tb.limit)
+	return float64(elapsed) / float64(tb.perToken)
 }
 
 func (tb *tokenBucket) takeAt(t time.Time) {
 	elapsed := t.Sub(tb.zeroAt)
-	if elapsed >= time.Duration(tb.limit)*tb.perToken {
-		tb.zeroAt = t.Add(-time.Duration(tb.limit-1) * tb.perToken)
-	} else {
-		tb.zeroAt = tb.zeroAt.Add(tb.perToken)
-		if tb.zeroAt.After(t) {
-			tb.zeroAt = t
-		}
-	}
+	if elapsed >= tb.limit {
+		tb.zeroAt = t.Add(-tb.limit)
+	}
+	tb.zeroAt = tb.zeroAt.Add(tb.perToken)
+	if tb.zeroAt.After(t) {
+		tb.zeroAt = t
+	}
 }
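
With `limit` now a `time.Duration`, the bucket holds `limit / perToken` tokens and both refill and clamping reduce to plain time arithmetic. A worked sketch using the values the tests use (`perToken` = 2s, `limit` = 16s, i.e. 8 tokens; output comments computed by hand):

	tb := tokenBucket{limit: 16 * time.Second, perToken: 2 * time.Second}
	now := time.Now()
	tb.zeroAt = now.Add(-16 * time.Second) // idle for a full refill: bucket is at its 8-token cap
	fmt.Println(tb.tokenAt(now))           // 8
	tb.takeAt(now)                         // clamps zeroAt to now-limit, then advances it by perToken
	fmt.Println(tb.tokenAt(now))           // 7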

@@ -109,6 +104,8 @@ func (w *LowLatency[T]) Run(ctx context.Context) {
 		timeout = nil
 	}
 
+	var d time.Duration
+	nInefficient := 0
 	for {
 		select {
 		case <-ctx.Done():
@@ -121,17 +118,34 @@ func (w *LowLatency[T]) Run(ctx context.Context) {
 			requests[r.id] = append(requests[r.id], r)
 			if len(requests) == batchSize {
 				logger.V(4).Info("batch full", "n", batchSize)
+				nInefficient = 0
 				descBatch(t)
 			} else if timeout == nil {
 				// add some artificial delay for better batching;
 				// the fewer tokens left, the longer we wait
 				tokens := w.tokens.tokenAt(t)
-				d := time.Duration(math.Pow(0.5, tokens) * float64(w.tokens.perToken))
+				d = time.Duration(math.Pow(0.5, tokens) * float64(w.tokens.perToken))
 				timeout = w.clk.After(d)
 				logger.V(4).Info("batch waiting", "timeout", d)
			}
 		case t := <-timeout:
-			logger.V(4).Info("batch timeout", "n", len(requests))
+			v := 4
+			if d > w.tokens.perToken/2 { // We are becoming the bottleneck of system throughput
+				v = 2
+				if len(requests) <= 1 {
+					// We waited, but didn't get a second request.
+					// We increased the latency with no benefit :(
+					nInefficient++
+					v = 1
+					if nInefficient == 3 {
+						logger.V(1).Info("Inefficient batching, please increase upstream concurrency")
+					}
+				}
+			}
+			if v > 1 {
+				nInefficient = 0
+			}
+			logger.V(v).Info("batch timeout", "timeout", d, "n", len(requests))
 			descBatch(t)
 		}
 	}
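
Hoisting `d` out of the select lets the timeout branch inspect the delay it actually waited, which drives the new verbosity escalation. The halving rule `d = 0.5^tokens * perToken` means a full bucket adds almost no latency while an empty one waits a whole refill interval; a quick sketch of the curve (assuming `perToken` = 2s, as in the tests):

	perToken := 2 * time.Second
	for _, tokens := range []float64{0, 1, 4, 8} {
		d := time.Duration(math.Pow(0.5, tokens) * float64(perToken))
		fmt.Printf("tokens=%.0f delay=%v\n", tokens, d)
	}
	// tokens=0 delay=2s, tokens=1 delay=1s, tokens=4 delay=125ms, tokens=8 delay=7.8125ms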
29 changes: 25 additions & 4 deletions pkg/disk/batcher/low_latency_test.go
@@ -12,20 +12,22 @@ import (
 	"github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/disk/desc"
 	testdesc "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/disk/desc/testing"
 	"github.com/stretchr/testify/assert"
+	"k8s.io/klog/v2"
+	"k8s.io/klog/v2/ktesting"
 	"k8s.io/utils/clock"
 	testclock "k8s.io/utils/clock/testing"
 )
 
 func TestZeroBucketIsFilled(t *testing.T) {
 	bucket := tokenBucket{
-		limit: 8, perToken: 2 * time.Second,
+		limit: 16 * time.Second, perToken: 2 * time.Second,
 	}
 	assert.Equal(t, 8., bucket.tokenAt(time.Now()))
 }
 
 func TestBucketTake(t *testing.T) {
 	bucket := tokenBucket{
-		limit: 8, perToken: 2 * time.Second,
+		limit: 16 * time.Second, perToken: 2 * time.Second,
 	}
 	now := time.Now()
 	bucket.takeAt(now)
@@ -49,8 +51,8 @@ func setup(t *testing.T) (client *testdesc.FakeClient, clk *testclock.FakeClock,
 	clk = testclock.NewFakeClock(time.Now())
 	batcher = NewLowLatency(client, clk, 2*time.Second, 8)
 
-	ctx, cancel := context.WithCancel(context.Background())
-	t.Cleanup(cancel)
+	logger := ktesting.NewLogger(t, ktesting.DefaultConfig)
+	ctx := klog.NewContext(t.Context(), logger)
 	go batcher.Run(ctx)
 
 	return client, clk, batcher
@@ -153,3 +155,22 @@ func TestFailures(t *testing.T) {
 	assert.ErrorIs(t, err, ErrFake)
 	assert.Nil(t, disk)
 }
+
+func TestReportInefficient(t *testing.T) {
+	client := &testdesc.FakeClient{}
+	batcher := NewLowLatency(client, clock.RealClock{}, 20*time.Millisecond, 8)
+
+	logger := ktesting.NewLogger(t, ktesting.NewConfig(ktesting.BufferLogs(true)))
+	ctx := klog.NewContext(t.Context(), logger)
+	go batcher.Run(ctx)
+
+	client.Disks.Store("d-test", &ecs.Disk{
+		DiskId:   "d-test",
+		DiskName: "mydisk",
+	})
+
+	for range 20 {
+		batcher.Describe(ctx, "d-test")
+	}
+	assert.Contains(t, logger.GetSink().(ktesting.Underlier).GetBuffer().String(), "Inefficient batching")
+}
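
The assertion works because `ktesting.BufferLogs(true)` keeps log output in memory instead of forwarding it to `t.Log`, and the buffer is reachable through the `ktesting.Underlier` interface. The capture pattern in isolation (a sketch, separate from this test; it assumes V(1) is enabled under ktesting's default verbosity):

	logger := ktesting.NewLogger(t, ktesting.NewConfig(ktesting.BufferLogs(true)))
	logger.V(1).Info("Inefficient batching, please increase upstream concurrency")
	out := logger.GetSink().(ktesting.Underlier).GetBuffer().String()
	assert.Contains(t, out, "Inefficient batching")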
25 changes: 25 additions & 0 deletions pkg/disk/batcher/passthrough.go
@@ -0,0 +1,25 @@
+package batcher
+
+import (
+	"context"
+
+	"github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/disk/desc"
+)
+
+type Passthrough[T any] struct {
+	client desc.Client[T]
+}
+
+func NewPassthrough[T any](client desc.Client[T]) Passthrough[T] {
+	return Passthrough[T]{
+		client: client,
+	}
+}
+
+func (w Passthrough[T]) Describe(ctx context.Context, id string) (*T, error) {
+	resp, err := w.client.Describe([]string{id})
+	if err != nil {
+		return nil, err
+	}
+	return &resp.Resources[0], nil
+}
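
`Passthrough` exposes the same `Describe(ctx, id)` shape as `LowLatency`, so batching can be toggled by swapping constructors at wiring time. A minimal sketch, where `ecsDiskClient` is an assumed `desc.Client[ecs.Disk]` implementation not defined in this diff:

	b := NewPassthrough(ecsDiskClient)     // or NewLowLatency(ecsDiskClient, clk, perRequest, burst)
	disk, err := b.Describe(ctx, "d-test") // one upstream Describe per call, no batching delay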