Skip to content

Commit a22b6b6

Browse files
authored
Merge pull request #1218 from huww98/disk-adslot-message
disk/ad_slot: return better GRPC error message
2 parents 5ca2548 + dbf83c8 commit a22b6b6

File tree

6 files changed

+105
-7
lines changed

6 files changed

+105
-7
lines changed

pkg/common/interceptors.go

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,37 @@ func instrumentGRPC(driverType string) grpc.UnaryServerInterceptor {
3939
}
4040

4141
func recordExecTime(time time.Duration, method, driverType string, err error) {
42+
// copied from google.golang.org/grpc/server.go
43+
appStatus, ok := status.FromError(err)
44+
if !ok {
45+
// Convert non-status application error to a status error with code
46+
// Unknown, but handle context errors specifically.
47+
appStatus = status.FromContextError(err)
48+
}
49+
4250
labels := prometheus.Labels{
4351
metric.CsiGrpcExecTimeLabelMethod: method,
4452
metric.CsiGrpcExecTimeLabelType: driverType,
45-
metric.CsiGrpcExecTimeLabelCode: status.Code(err).String(),
53+
metric.CsiGrpcExecTimeLabelCode: appStatus.Code().String(),
4654
}
4755
metric.CsiGrpcExecTimeCollector.ExecCountMetric.With(labels).Inc()
4856
metric.CsiGrpcExecTimeCollector.ExecTimeTotalMetric.With(labels).Add(time.Seconds())
4957
}
58+
59+
// Timeout the request a little bit earlier, to get the error message out.
60+
// reduce the timeout by 1s or 10%, whichever is smaller.
61+
func earlyTimeout(ctx context.Context, req interface{}, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
62+
deadline, ok := ctx.Deadline()
63+
if !ok {
64+
return handler(ctx, req)
65+
}
66+
timeout := time.Until(deadline)
67+
if time.Second < timeout/10 {
68+
deadline = deadline.Add(-time.Second)
69+
} else {
70+
deadline = deadline.Add(-timeout / 10)
71+
}
72+
ctx, cancel := context.WithDeadline(ctx, deadline)
73+
defer cancel()
74+
return handler(ctx, req)
75+
}

pkg/common/interceptors_test.go

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package common
2+
3+
import (
4+
"context"
5+
"testing"
6+
"time"
7+
8+
"github.com/stretchr/testify/assert"
9+
)
10+
11+
func TestEarlyTimeout_None(t *testing.T) {
12+
handler := func(ctx context.Context, _ interface{}) (interface{}, error) {
13+
_, ok := ctx.Deadline()
14+
assert.False(t, ok)
15+
return struct{}{}, nil
16+
}
17+
earlyTimeout(context.Background(), nil, nil, handler)
18+
}
19+
20+
func TestEarlyTimeout(t *testing.T) {
21+
cases := []struct {
22+
before, after time.Duration
23+
}{
24+
{60 * time.Second, 59 * time.Second},
25+
{1 * time.Second, 900 * time.Millisecond},
26+
}
27+
for _, tc := range cases {
28+
t.Run(tc.before.String(), func(t *testing.T) {
29+
now := time.Now()
30+
handler := func(ctx context.Context, _ interface{}) (interface{}, error) {
31+
deadline, ok := ctx.Deadline()
32+
assert.True(t, ok)
33+
assert.WithinDuration(t, now.Add(tc.after), deadline, 10*time.Millisecond)
34+
return struct{}{}, nil
35+
}
36+
ctx, cancel := context.WithDeadline(context.Background(), now.Add(tc.before))
37+
t.Cleanup(cancel)
38+
earlyTimeout(ctx, nil, nil, handler)
39+
})
40+
}
41+
42+
}

pkg/common/start.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ func RunCSIServer(driverType, endpoint string, servers Servers) {
5454
}
5555

5656
opts := []grpc.ServerOption{
57-
grpc.ChainUnaryInterceptor(logGRPC, instrumentGRPC(driverType)),
57+
grpc.ChainUnaryInterceptor(logGRPC, instrumentGRPC(driverType), earlyTimeout),
5858
}
5959
server := grpc.NewServer(opts...)
6060

pkg/disk/attachdetach_slot.go

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package disk
22

33
import (
44
"context"
5+
"errors"
56
"sync"
67
)
78

@@ -73,7 +74,7 @@ func (s serialAD_DetachSlot) Acquire(ctx context.Context) error {
7374
case s.slot <- struct{}{}:
7475
return nil
7576
case <-ctx.Done():
76-
return ctx.Err()
77+
return maybeWaitingAD(ctx.Err())
7778
}
7879
}
7980

@@ -85,7 +86,7 @@ func (s serialAD_AttachSlot) Acquire(ctx context.Context) error {
8586
case s.slot <- struct{}{}:
8687
return nil
8788
case <-ctx.Done():
88-
return ctx.Err()
89+
return maybeWaitingAD(ctx.Err())
8990
}
9091
}
9192

@@ -120,7 +121,7 @@ func (s maxConcurrentSlot) Acquire(ctx context.Context) error {
120121
case s.slots <- struct{}{}:
121122
return nil
122123
case <-ctx.Done():
123-
return ctx.Err()
124+
return maybeWaitingAD(ctx.Err())
124125
}
125126
}
126127

@@ -181,3 +182,20 @@ func NewSlots(detachConcurrency, attachConcurrency int) AttachDetachSlots {
181182
}
182183
return NewPerNodeSlots(makeSlot)
183184
}
185+
186+
// waitingAD is the error produced by maybeWaitingAD in place of a
// deadline-exceeded error. It carries a more descriptive message while still
// matching context.DeadlineExceeded under errors.Is.
type waitingAD struct{}

// Error returns the fixed, human-readable message for this error.
func (waitingAD) Error() string {
	return "still waiting for other disk(s) to finish attach/detach"
}

// Is makes errors.Is(waitingAD{}, context.DeadlineExceeded) report true, so
// code that checks for a deadline error keeps working after the substitution.
func (waitingAD) Is(target error) bool {
	return target == context.DeadlineExceeded
}

// maybeWaitingAD replaces err with waitingAD{} when err is (or wraps)
// context.DeadlineExceeded; any other value, including nil, is returned
// unchanged.
func maybeWaitingAD(err error) error {
	if !errors.Is(err, context.DeadlineExceeded) {
		return err
	}
	return waitingAD{}
}

pkg/disk/attachdetach_slot_test.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,3 +171,15 @@ func TestSerialDetach_NoRace(t *testing.T) {
171171
t.Fatal("state not updated")
172172
}
173173
}
174+
175+
func TestWaitingADError(t *testing.T) {
176+
s := NewSlots(1, 0).GetSlotFor("node1").Detach()
177+
ctx := context.Background()
178+
assert.NoError(t, s.Acquire(ctx))
179+
180+
ctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
181+
defer cancel()
182+
err := s.Acquire(ctx)
183+
assert.ErrorIs(t, err, waitingAD{})
184+
assert.ErrorIs(t, err, context.DeadlineExceeded)
185+
}

pkg/disk/cloud.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ func (ad *DiskAttachDetach) attachDisk(ctx context.Context, diskID, nodeID strin
179179

180180
slot := ad.slots.GetSlotFor(nodeID).Attach()
181181
if err := slot.Acquire(ctx); err != nil {
182-
return "", status.Errorf(codes.Aborted, "AttachDisk: get ad-slot for disk %s failed: %v", diskID, err)
182+
return "", fmt.Errorf("failed to reserve node %s for attach: %w", nodeID, err)
183183
}
184184
defer slot.Release()
185185

@@ -443,7 +443,7 @@ func (ad *DiskAttachDetach) detachDisk(ctx context.Context, ecsClient cloud.ECSI
443443
// NodeStageVolume/NodeUnstageVolume should be called by sequence
444444
slot := ad.slots.GetSlotFor(nodeID).Detach()
445445
if err := slot.Acquire(ctx); err != nil {
446-
return status.Errorf(codes.Aborted, "DetachDisk: get ad-slot for disk %s failed: %v", diskID, err)
446+
return fmt.Errorf("failed to reserve node %s for detach: %w", nodeID, err)
447447
}
448448
defer slot.Release()
449449

0 commit comments

Comments
 (0)