
Commit 3a64b2d

batcher: optimize log for inefficient batching
Sometimes batching actually slows the system down instead of speeding it up. Improve the logs so this situation can be diagnosed.
1 parent 6c21dcf
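The gist of the change: the batcher already delays the first request of a batch by d = 0.5^tokens × perToken to give the batch time to fill. This commit raises the verbosity of the "batch timeout" log once d exceeds perToken/2 (the artificial delay is likely throttling throughput), and emits an explicit warning after three consecutive timeouts that caught at most one request. The snippet below is only a standalone illustration of that delay curve, not part of the commit; the 2-second perToken mirrors what the existing tests pass to NewLowLatency.

package main

import (
	"fmt"
	"math"
	"time"
)

func main() {
	perToken := 2 * time.Second // refill time for one token, matching the value used in the tests

	for _, tokens := range []float64{8, 4, 2, 1, 0.5, 0} {
		// Artificial delay used by the batcher: the fewer tokens left, the longer we wait.
		d := time.Duration(math.Pow(0.5, tokens) * float64(perToken))
		// The new log escalation kicks in once the delay exceeds half a token's refill
		// time, i.e. once fewer than one token remains.
		bottleneck := d > perToken/2
		fmt.Printf("tokens=%4.1f  delay=%-12v bottleneck=%v\n", tokens, d, bottleneck)
	}
}

With a full bucket of 8 tokens the added wait is under 8ms; with the bucket drained it equals a full perToken, which is when the new escalation and, eventually, the "Inefficient batching" warning trigger.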

File tree

2 files changed: +57 −22 lines changed

pkg/disk/batcher/low_latency.go

Lines changed: 32 additions & 18 deletions
@@ -47,7 +47,7 @@ func NewLowLatency[T any](ecsClient desc.Client[T], clk clock.WithTicker, perReq
 		ecsClient:   ecsClient,
 		requestChan: make(chan *getRequest[*T]),
 		tokens: tokenBucket{
-			limit: burst, perToken: perRequest,
+			limit: time.Duration(burst) * perRequest, perToken: perRequest,
 		},
 		clk: clk,
 	}
@@ -71,28 +71,23 @@ func (w *LowLatency[T]) Describe(ctx context.Context, id string) (*T, error) {
 type tokenBucket struct {
 	zeroAt time.Time
 
-	limit    int
-	perToken time.Duration
+	limit    time.Duration // How long it takes to fully refill the bucket
+	perToken time.Duration // How long it takes to refill one token
 }
 
 func (tb *tokenBucket) tokenAt(t time.Time) float64 {
-	elapsed := t.Sub(tb.zeroAt)
-	token := float64(elapsed) / float64(tb.perToken)
-	if token > float64(tb.limit) {
-		token = float64(tb.limit)
-	}
-	return token
+	elapsed := min(t.Sub(tb.zeroAt), tb.limit)
+	return float64(elapsed) / float64(tb.perToken)
 }
 
 func (tb *tokenBucket) takeAt(t time.Time) {
 	elapsed := t.Sub(tb.zeroAt)
-	if elapsed >= time.Duration(tb.limit)*tb.perToken {
-		tb.zeroAt = t.Add(-time.Duration(tb.limit-1) * tb.perToken)
-	} else {
-		tb.zeroAt = tb.zeroAt.Add(tb.perToken)
-		if tb.zeroAt.After(t) {
-			tb.zeroAt = t
-		}
+	if elapsed >= tb.limit {
+		tb.zeroAt = t.Add(-tb.limit)
+	}
+	tb.zeroAt = tb.zeroAt.Add(tb.perToken)
+	if tb.zeroAt.After(t) {
+		tb.zeroAt = t
 	}
 }
@@ -109,6 +104,8 @@ func (w *LowLatency[T]) Run(ctx context.Context) {
 		timeout = nil
 	}
 
+	var d time.Duration
+	nInefficient := 0
 	for {
 		select {
 		case <-ctx.Done():
@@ -121,17 +118,34 @@ func (w *LowLatency[T]) Run(ctx context.Context) {
 			requests[r.id] = append(requests[r.id], r)
 			if len(requests) == batchSize {
 				logger.V(4).Info("batch full", "n", batchSize)
+				nInefficient = 0
 				descBatch(t)
 			} else if timeout == nil {
 				// add some artificial delay for better batching
 				// the less tokens left, the more we wait
 				tokens := w.tokens.tokenAt(t)
-				d := time.Duration(math.Pow(0.5, tokens) * float64(w.tokens.perToken))
+				d = time.Duration(math.Pow(0.5, tokens) * float64(w.tokens.perToken))
				timeout = w.clk.After(d)
 				logger.V(4).Info("batch waiting", "timeout", d)
 			}
 		case t := <-timeout:
-			logger.V(4).Info("batch timeout", "n", len(requests))
+			v := 4
+			if d > w.tokens.perToken/2 { // We are becoming the bottleneck of system throughput
+				v = 2
+				if len(requests) <= 1 {
+					// We have waited, but didn't get the second request.
+					// We increased the latency with no benefit :(
+					nInefficient++
+					v = 1
+					if nInefficient == 3 {
+						logger.V(1).Info("Inefficient batching, please increase upstream concurrency")
+					}
+				}
+			}
+			if v > 1 {
+				nInefficient = 0
+			}
+			logger.V(v).Info("batch timeout", "timeout", d, "n", len(requests))
 			descBatch(t)
 		}
 	}
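The tokenBucket change above switches limit from a token count to the duration needed to refill the whole bucket (burst × perToken), which lets tokenAt cap the elapsed time directly with min. Below is a minimal standalone sketch of the new semantics, not part of the commit, assuming Go 1.21+ for the built-in min; the 16s/2s configuration mirrors the updated tests.

package main

import (
	"fmt"
	"time"
)

// tokenBucket mirrors the fields after this commit: both limit and perToken are durations.
type tokenBucket struct {
	zeroAt   time.Time     // instant at which the bucket was last empty
	limit    time.Duration // time to refill the whole bucket (burst * perToken)
	perToken time.Duration // time to refill one token
}

// tokenAt reports how many (fractional) tokens are available at t,
// capped at limit/perToken, as in the patched tokenAt.
func (tb *tokenBucket) tokenAt(t time.Time) float64 {
	elapsed := min(t.Sub(tb.zeroAt), tb.limit)
	return float64(elapsed) / float64(tb.perToken)
}

func main() {
	now := time.Now()
	// Same configuration as the updated tests: burst 8, 2s per request -> limit 16s.
	b := tokenBucket{zeroAt: now, limit: 16 * time.Second, perToken: 2 * time.Second}

	fmt.Println(b.tokenAt(now))                      // 0 tokens right after the bucket is drained
	fmt.Println(b.tokenAt(now.Add(5 * time.Second))) // 2.5 tokens after 5s
	fmt.Println(b.tokenAt(now.Add(time.Minute)))     // capped at 8 tokens
}

A zero-valued zeroAt lies far in the past, so a fresh bucket reports the full 8 tokens, which is what TestZeroBucketIsFilled continues to assert with the new duration-based limit.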

pkg/disk/batcher/low_latency_test.go

Lines changed: 25 additions & 4 deletions
@@ -12,20 +12,22 @@ import (
 	"github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/disk/desc"
 	testdesc "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/disk/desc/testing"
 	"github.com/stretchr/testify/assert"
+	"k8s.io/klog/v2"
+	"k8s.io/klog/v2/ktesting"
 	"k8s.io/utils/clock"
 	testclock "k8s.io/utils/clock/testing"
 )
 
 func TestZeroBucketIsFilled(t *testing.T) {
 	bucket := tokenBucket{
-		limit: 8, perToken: 2 * time.Second,
+		limit: 16 * time.Second, perToken: 2 * time.Second,
 	}
 	assert.Equal(t, 8., bucket.tokenAt(time.Now()))
 }
 
 func TestBucketTake(t *testing.T) {
 	bucket := tokenBucket{
-		limit: 8, perToken: 2 * time.Second,
+		limit: 16 * time.Second, perToken: 2 * time.Second,
 	}
 	now := time.Now()
 	bucket.takeAt(now)
@@ -49,8 +51,8 @@ func setup(t *testing.T) (client *testdesc.FakeClient, clk *testclock.FakeClock,
 	clk = testclock.NewFakeClock(time.Now())
 	batcher = NewLowLatency(client, clk, 2*time.Second, 8)
 
-	ctx, cancel := context.WithCancel(context.Background())
-	t.Cleanup(cancel)
+	logger := ktesting.NewLogger(t, ktesting.DefaultConfig)
+	ctx := klog.NewContext(t.Context(), logger)
 	go batcher.Run(ctx)
 
 	return client, clk, batcher
@@ -153,3 +155,22 @@ func TestFailures(t *testing.T) {
 	assert.ErrorIs(t, err, ErrFake)
 	assert.Nil(t, disk)
 }
+
+func TestReportInefficient(t *testing.T) {
+	client := &testdesc.FakeClient{}
+	batcher := NewLowLatency(client, clock.RealClock{}, 20*time.Millisecond, 8)
+
+	logger := ktesting.NewLogger(t, ktesting.NewConfig(ktesting.BufferLogs(true)))
+	ctx := klog.NewContext(t.Context(), logger)
+	go batcher.Run(ctx)
+
+	client.Disks.Store("d-test", &ecs.Disk{
+		DiskId:   "d-test",
+		DiskName: "mydisk",
+	})
+
+	for range 20 {
+		batcher.Describe(ctx, "d-test")
+	}
+	assert.Contains(t, logger.GetSink().(ktesting.Underlier).GetBuffer().String(), "Inefficient batching")
+}
