Skip to content

Commit eb78c5e

Browse files
[azservicebus] Make it so retry sleeps can be cancelled. (Azure#19216)
Make it so retry sleeps can be cancelled. Without this, the user has to wait for the entire sleep to finish before the function returns, even after cancelling the context.
1 parent 9c7a2a8 commit eb78c5e

File tree

3 files changed

+79
-26
lines changed

3 files changed

+79
-26
lines changed

sdk/messaging/azservicebus/CHANGELOG.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,8 @@
1010

1111
- AcceptNextSessionForQueue and AcceptNextSessionForSubscription now return an azservicebus.Error with
1212
Code set to CodeTimeout when they fail due to no sessions being available. Examples for this have
13-
been added for `AcceptNextSessionForQueue`. PR#TBD.
13+
been added for `AcceptNextSessionForQueue`. PR#19113.
14+
- Retries now respect cancellation when they're in the "delay before next try" phase.
1415

1516
### Other Changes
1617

sdk/messaging/azservicebus/internal/utils/retrier.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,12 @@ func Retry(ctx context.Context, eventName log.Event, operation string, fn func(c
5151
if i > 0 {
5252
sleep := calcDelay(ro, i)
5353
log.Writef(eventName, "(%s) Retry attempt %d sleeping for %s", operation, i, sleep)
54-
time.Sleep(sleep)
54+
55+
select {
56+
case <-ctx.Done():
57+
return ctx.Err()
58+
case <-time.After(sleep):
59+
}
5560
}
5661

5762
args := RetryFnArgs{

sdk/messaging/azservicebus/internal/utils/retrier_test.go

Lines changed: 71 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -86,30 +86,6 @@ func TestRetrier(t *testing.T) {
8686
require.EqualValues(t, 1, called)
8787
})
8888

89-
t.Run("Cancellation", func(t *testing.T) {
90-
ctx, cancel := context.WithCancel(context.Background())
91-
cancel()
92-
93-
isFatalFn := func(err error) bool {
94-
return errors.Is(err, context.Canceled)
95-
}
96-
97-
// it's up to
98-
err := Retry(ctx, testLogEvent, "notused", func(ctx context.Context, args *RetryFnArgs) error {
99-
// NOTE: it's up to the underlying function to handle cancellation. `Retry` doesn't
100-
// do anything but propagate it.
101-
select {
102-
case <-ctx.Done():
103-
default:
104-
require.Fail(t, "Context should have been cancelled")
105-
}
106-
107-
return context.Canceled
108-
}, isFatalFn, exported.RetryOptions{})
109-
110-
require.ErrorIs(t, context.Canceled, err)
111-
})
112-
11389
t.Run("ResetAttempts", func(t *testing.T) {
11490
isFatalFn := func(err error) bool {
11591
return errors.Is(err, context.Canceled)
@@ -165,6 +141,77 @@ func TestRetrier(t *testing.T) {
165141
})
166142
}
167143

144+
func TestCancellationCancelsSleep(t *testing.T) {
145+
ctx, cancel := context.WithCancel(context.Background())
146+
cancel()
147+
148+
isFatalFn := func(err error) bool {
149+
return errors.Is(err, context.Canceled)
150+
}
151+
152+
called := 0
153+
154+
err := Retry(ctx, testLogEvent, "notused", func(ctx context.Context, args *RetryFnArgs) error {
155+
called++
156+
return errors.New("try again")
157+
}, isFatalFn, exported.RetryOptions{
158+
RetryDelay: time.Hour,
159+
})
160+
161+
require.Error(t, err)
162+
require.ErrorIs(t, err, context.Canceled)
163+
require.Equal(t, called, 1)
164+
}
165+
166+
func TestCancellationFromUserFunc(t *testing.T) {
167+
alreadyCancelledCtx, cancel := context.WithCancel(context.Background())
168+
cancel()
169+
170+
canceledfromFunc := errors.New("the user func got the cancellation signal")
171+
172+
isFatalFn := func(err error) bool {
173+
return errors.Is(err, canceledfromFunc)
174+
}
175+
176+
called := 0
177+
178+
err := Retry(alreadyCancelledCtx, testLogEvent, "notused", func(ctx context.Context, args *RetryFnArgs) error {
179+
called++
180+
181+
select {
182+
case <-ctx.Done():
183+
return canceledfromFunc
184+
default:
185+
panic("Context should have been cancelled")
186+
}
187+
}, isFatalFn, exported.RetryOptions{})
188+
189+
require.Error(t, err)
190+
require.ErrorIs(t, err, canceledfromFunc)
191+
}
192+
193+
func TestCancellationTimeoutsArentPropagatedToUser(t *testing.T) {
194+
isFatalFn := func(err error) bool {
195+
// we want to exhaust all retries and run through the "sleep between retries" logic.
196+
return false
197+
}
198+
199+
tryAgainErr := errors.New("try again")
200+
called := 0
201+
202+
err := Retry(context.Background(), testLogEvent, "notused", func(ctx context.Context, args *RetryFnArgs) error {
203+
called++
204+
require.NoError(t, ctx.Err(), "our sleep/timeout doesn't show up for users")
205+
return tryAgainErr
206+
}, isFatalFn, exported.RetryOptions{
207+
RetryDelay: time.Millisecond,
208+
})
209+
210+
require.Error(t, err)
211+
require.ErrorIs(t, err, tryAgainErr, "error should be propagated from user callback")
212+
require.Equal(t, called, 1+3, "all attempts exhausted since we never returned a fatal error")
213+
}
214+
168215
func Test_calcDelay(t *testing.T) {
169216
t.Run("can't exceed max retry delay", func(t *testing.T) {
170217
duration := calcDelay(exported.RetryOptions{

0 commit comments

Comments
 (0)