Commit 0f89d9c

batcher: optimize log for inefficient batching
Sometimes we are actually slowing the system down with batching. Optimize the logs so this can be diagnosed.
1 parent 28a062a commit 0f89d9c

File tree: 2 files changed, +57 -22 lines changed

pkg/disk/batcher/low_latency.go

Lines changed: 32 additions & 18 deletions
@@ -49,7 +49,7 @@ func NewLowLatency[T any](ecsClient desc.Client[T], clk clock.WithTicker, perReq
 		ecsClient:   ecsClient,
 		requestChan: make(chan *getRequest[*T]),
 		tokens: tokenBucket{
-			limit: burst, perToken: perRequest,
+			limit: time.Duration(burst) * perRequest, perToken: perRequest,
 		},
 		clk: clk,
 	}
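
With this change, limit is expressed as the time needed to refill the bucket completely rather than as a token count. For the values used in the tests below (perRequest of 2 seconds, burst of 8), limit becomes 8 × 2s = 16s: one token every 2 seconds, a full bucket after 16 seconds.
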
@@ -73,28 +73,23 @@ func (w *LowLatency[T]) Describe(ctx context.Context, id string) (*T, error) {
 type tokenBucket struct {
 	zeroAt time.Time
 
-	limit    int
-	perToken time.Duration
+	limit    time.Duration // How long it takes to fully refill the bucket
+	perToken time.Duration // How long it takes to refill one token
 }
 
 func (tb *tokenBucket) tokenAt(t time.Time) float64 {
-	elapsed := t.Sub(tb.zeroAt)
-	token := float64(elapsed) / float64(tb.perToken)
-	if token > float64(tb.limit) {
-		token = float64(tb.limit)
-	}
-	return token
+	elapsed := min(t.Sub(tb.zeroAt), tb.limit)
+	return float64(elapsed) / float64(tb.perToken)
 }
 
 func (tb *tokenBucket) takeAt(t time.Time) {
 	elapsed := t.Sub(tb.zeroAt)
-	if elapsed >= time.Duration(tb.limit)*tb.perToken {
-		tb.zeroAt = t.Add(-time.Duration(tb.limit-1) * tb.perToken)
-	} else {
-		tb.zeroAt = tb.zeroAt.Add(tb.perToken)
-		if tb.zeroAt.After(t) {
-			tb.zeroAt = t
-		}
+	if elapsed >= tb.limit {
+		tb.zeroAt = t.Add(-tb.limit)
+	}
+	tb.zeroAt = tb.zeroAt.Add(tb.perToken)
+	if tb.zeroAt.After(t) {
+		tb.zeroAt = t
 	}
 }
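
For readers following the arithmetic, here is a minimal, self-contained sketch of the reworked bucket (a standalone re-implementation for illustration, not the driver's package), using the 16-second limit and 2-second perToken from the updated tests:

// Standalone illustration of the duration-based tokenBucket.
package main

import (
	"fmt"
	"time"
)

type tokenBucket struct {
	zeroAt   time.Time     // instant at which the bucket was (notionally) empty
	limit    time.Duration // how long it takes to fully refill the bucket
	perToken time.Duration // how long it takes to refill one token
}

func (tb *tokenBucket) tokenAt(t time.Time) float64 {
	elapsed := min(t.Sub(tb.zeroAt), tb.limit) // cap at a full bucket
	return float64(elapsed) / float64(tb.perToken)
}

func (tb *tokenBucket) takeAt(t time.Time) {
	if t.Sub(tb.zeroAt) >= tb.limit {
		tb.zeroAt = t.Add(-tb.limit) // clamp: a full bucket equals exactly `limit` of elapsed time
	}
	tb.zeroAt = tb.zeroAt.Add(tb.perToken) // taking one token costs `perToken` of accumulated time
	if tb.zeroAt.After(t) {
		tb.zeroAt = t // never go below an empty bucket
	}
}

func main() {
	now := time.Now()
	tb := tokenBucket{zeroAt: now.Add(-16 * time.Second), limit: 16 * time.Second, perToken: 2 * time.Second}
	fmt.Println(tb.tokenAt(now)) // 8: the bucket is full
	tb.takeAt(now)
	fmt.Println(tb.tokenAt(now)) // 7: one token was taken
}
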

@@ -111,6 +106,8 @@ func (w *LowLatency[T]) Run(ctx context.Context) {
 		timeout = nil
 	}
 
+	var d time.Duration
+	nInefficient := 0
 	for {
 		select {
 		case <-ctx.Done():
@@ -123,17 +120,34 @@ func (w *LowLatency[T]) Run(ctx context.Context) {
 			requests[r.id] = append(requests[r.id], r)
 			if len(requests) == batchSize {
 				logger.V(4).Info("batch full", "n", batchSize)
+				nInefficient = 0
 				descBatch(t)
 			} else if timeout == nil {
 				// add some artificial delay for better batching
 				// the less tokens left, the more we wait
 				tokens := w.tokens.tokenAt(t)
-				d := time.Duration(math.Pow(0.5, tokens) * float64(w.tokens.perToken))
+				d = time.Duration(math.Pow(0.5, tokens) * float64(w.tokens.perToken))
				timeout = w.clk.After(d)
 				logger.V(4).Info("batch waiting", "timeout", d)
 			}
 		case t := <-timeout:
-			logger.V(4).Info("batch timeout", "n", len(requests))
+			v := 4
+			if d > w.tokens.perToken/2 { // We are becoming the bottleneck of system throughput
+				v = 2
+				if len(requests) <= 1 {
+					// We have waited, but didn't get the second request.
+					// We increased the latency with no benefit :(
+					nInefficient++
+					v = 1
+					if nInefficient == 3 {
+						logger.V(1).Info("Inefficient batching, please increase upstream concurrency")
+					}
+				}
+			}
+			if v > 1 {
+				nInefficient = 0
+			}
+			logger.V(v).Info("batch timeout", "timeout", d, "n", len(requests))
 			descBatch(t)
 		}
 	}
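
For context on the new thresholds: the artificial wait d halves for every token still in the bucket, so it exceeds perToken/2 only once fewer than one token remains, meaning the batcher is adding close to a full per-request interval of latency. If such a wait still ends with at most one request queued, it bought nothing, and three wasted waits in a row produce the explicit V(1) warning. A rough standalone sketch of the arithmetic (illustration only, assuming the 2-second perRequest used in the tests):

// Rough numbers behind the new log-verbosity heuristic.
package main

import (
	"fmt"
	"math"
	"time"
)

func main() {
	perToken := 2 * time.Second // matches the perRequest passed to NewLowLatency in the tests
	for _, tokens := range []float64{8, 2, 1, 0.5, 0} {
		d := time.Duration(math.Pow(0.5, tokens) * float64(perToken))
		// d exceeds perToken/2 only once fewer than one token is left,
		// i.e. when the artificial wait approaches a full per-request interval.
		fmt.Printf("tokens=%.1f wait=%v bottleneck=%v\n", tokens, d, d > perToken/2)
	}
}
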

pkg/disk/batcher/low_latency_test.go

Lines changed: 25 additions & 4 deletions
@@ -12,20 +12,22 @@ import (
 	"github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/disk/desc"
 	testdesc "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/disk/desc/testing"
 	"github.com/stretchr/testify/assert"
+	"k8s.io/klog/v2"
+	"k8s.io/klog/v2/ktesting"
 	"k8s.io/utils/clock"
 	testclock "k8s.io/utils/clock/testing"
 )
 
 func TestZeroBucketIsFilled(t *testing.T) {
 	bucket := tokenBucket{
-		limit: 8, perToken: 2 * time.Second,
+		limit: 16 * time.Second, perToken: 2 * time.Second,
 	}
 	assert.Equal(t, 8., bucket.tokenAt(time.Now()))
 }
 
 func TestBucketTake(t *testing.T) {
 	bucket := tokenBucket{
-		limit: 8, perToken: 2 * time.Second,
+		limit: 16 * time.Second, perToken: 2 * time.Second,
 	}
 	now := time.Now()
 	bucket.takeAt(now)
@@ -49,8 +51,8 @@ func setup(t *testing.T) (client *testdesc.FakeClient, clk *testclock.FakeClock,
 	clk = testclock.NewFakeClock(time.Now())
 	batcher = NewLowLatency(client, clk, 2*time.Second, 8)
 
-	ctx, cancel := context.WithCancel(context.Background())
-	t.Cleanup(cancel)
+	logger := ktesting.NewLogger(t, ktesting.DefaultConfig)
+	ctx := klog.NewContext(t.Context(), logger)
 	go batcher.Run(ctx)
 
 	return client, clk, batcher
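
Two notes on this setup change: t.Context() (Go 1.24) is canceled automatically when the test ends, so the explicit WithCancel/Cleanup pair is no longer needed, and attaching a ktesting logger via klog.NewContext is what routes the batcher's log lines into the test output. A minimal sketch of that plumbing, assuming (as the logger.V(...) calls in Run suggest) that the batcher obtains its logger with klog.FromContext:

// Minimal sketch (not the driver's code) of the klog context plumbing used above.
package main

import (
	"context"

	"k8s.io/klog/v2"
)

// run stands in for LowLatency.Run: it is assumed to pull its logger from the context.
func run(ctx context.Context) {
	logger := klog.FromContext(ctx) // falls back to klog's global logger if none is attached
	logger.V(4).Info("batch waiting", "timeout", "10ms")
}

func main() {
	// In the tests, ktesting.NewLogger(t, ...) is attached here instead, so everything
	// run logs is captured per test (and with BufferLogs(true) it can be asserted on).
	ctx := klog.NewContext(context.Background(), klog.Background())
	run(ctx)
}
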
@@ -153,3 +155,22 @@ func TestFailures(t *testing.T) {
 	assert.ErrorIs(t, err, ErrFake)
 	assert.Nil(t, disk)
 }
+
+func TestReportInefficient(t *testing.T) {
+	client := &testdesc.FakeClient{}
+	batcher := NewLowLatency(client, clock.RealClock{}, 20*time.Millisecond, 8)
+
+	logger := ktesting.NewLogger(t, ktesting.NewConfig(ktesting.BufferLogs(true)))
+	ctx := klog.NewContext(t.Context(), logger)
+	go batcher.Run(ctx)
+
+	client.Disks.Store("d-test", &ecs.Disk{
+		DiskId:   "d-test",
+		DiskName: "mydisk",
+	})
+
+	for range 20 {
+		batcher.Describe(ctx, "d-test")
+	}
+	assert.Contains(t, logger.GetSink().(ktesting.Underlier).GetBuffer().String(), "Inefficient batching")
+}

0 commit comments
