Skip to content

Commit f908602

Browse files
committed
feat(retry): add configurable link recovery delay
1 parent d0604ff commit f908602

File tree

7 files changed

+207
-20
lines changed

7 files changed

+207
-20
lines changed

sdk/messaging/azeventhubs/CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
11
# Release History
22

3+
## 2.0.2 (2025-11-15)
4+
5+
### Features Added
6+
7+
- Added `RetryOptions.LinkRecoveryDelay`, a configurable delay used when retrying after link recovery.
8+
39
## 2.0.1 (2025-10-08)
410

511
### Bugs Fixed

sdk/messaging/azeventhubs/internal/exported/retry_options.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,4 +23,15 @@ type RetryOptions struct {
2323
// Typically the value is greater than or equal to the value specified in RetryDelay.
2424
// The default value is 120 seconds. A value less than zero means there is no cap.
2525
MaxRetryDelay time.Duration
26+
27+
// LinkRecoveryDelay specifies a fixed delay to use before retrying after a failure that triggered link recovery, instead of
28+
// the normal exponential backoff specified by RetryDelay. This only applies when an AMQP link
29+
// needs to be recovered (e.g., link detached errors). For other types of failures, the normal
30+
// RetryDelay with exponential backoff is used.
31+
// The default value is zero, which means link recovery retries will use the normal RetryDelay
32+
// exponential backoff behavior. A value less than zero means no delay after link recovery.
33+
// Positive values let you slow down repeated link recovery attempts if, for example, recreating
34+
// links is putting pressure on your namespace, while negative values let you immediately try
35+
// again when link recovery failures happen.
36+
LinkRecoveryDelay time.Duration
2637
}

sdk/messaging/azeventhubs/internal/links_recover.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ func (l LinkRetrier[LinkT]) Retry(ctx context.Context,
5656
currentPrefix = linkWithID.String()
5757

5858
if err := fn(ctx, linkWithID); err != nil {
59-
if recoveryErr := l.RecoverIfNeeded(ctx, err); recoveryErr != nil {
59+
if recoveryErr := l.RecoverIfNeeded(ctx, err, args); recoveryErr != nil {
6060
// it's okay to return this error, and we're still in an okay state. The next loop through will end
6161
// up reopening all the closed links and will either get the same error again (i.e., network is _still_
6262
// down) or will work and then things proceed as normal.
@@ -74,7 +74,7 @@ func (l LinkRetrier[LinkT]) Retry(ctx context.Context,
7474

7575
// RecoverIfNeeded will check the error and pick the correct minimal recovery pattern (none, link only, connection and link, etc.)
7676
// NOTE: if 'ctx' is cancelled this function will still close out all the connections/links involved.
77-
func (l LinkRetrier[LinkT]) RecoverIfNeeded(ctx context.Context, err error) error {
77+
func (l LinkRetrier[LinkT]) RecoverIfNeeded(ctx context.Context, err error, args *utils.RetryFnArgs) error {
7878
rk := GetRecoveryKind(err)
7979

8080
switch rk {
@@ -93,6 +93,9 @@ func (l LinkRetrier[LinkT]) RecoverIfNeeded(ctx context.Context, err error) erro
9393
return err
9494
}
9595

96+
// Signal that the next retry should use the link recovery delay
97+
args.UseLinkRecoveryDelay()
98+
9699
return nil
97100
case RecoveryKindConn:
98101
var awErr amqpwrap.Error

sdk/messaging/azeventhubs/internal/links_test.go

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/v2/internal/amqpwrap"
1616
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/v2/internal/exported"
1717
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/v2/internal/test"
18+
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/v2/internal/utils"
1819
"github.com/Azure/go-amqp"
1920
"github.com/stretchr/testify/require"
2021
)
@@ -88,8 +89,8 @@ func TestLinksRecoverLinkWithConnectionFailure(t *testing.T) {
8889
require.Equal(t, RecoveryKindConn, GetRecoveryKind(err))
8990

9091
// now recover like normal
91-
92-
err = links.lr.RecoverIfNeeded(context.Background(), lwidToError(err, oldLWID))
92+
args := &utils.RetryFnArgs{}
93+
err = links.lr.RecoverIfNeeded(context.Background(), lwidToError(err, oldLWID), args)
9394
require.NoError(t, err)
9495

9596
newLWID, err := links.GetLink(context.Background(), "0")
@@ -183,13 +184,15 @@ func TestLinkFailureWhenConnectionIsDead(t *testing.T) {
183184
require.Error(t, err)
184185
require.Equal(t, RecoveryKindConn, GetRecoveryKind(err))
185186

186-
err = links.lr.RecoverIfNeeded(context.Background(), lwidToError(&amqp.LinkError{}, oldLWID))
187+
args := &utils.RetryFnArgs{}
188+
err = links.lr.RecoverIfNeeded(context.Background(), lwidToError(&amqp.LinkError{}, oldLWID), args)
187189
var connErr *amqp.ConnError
188190
require.ErrorAs(t, err, &connErr)
189191
require.Nil(t, connErr.RemoteErr, "is the forwarded error from the closed connection")
190192
require.Equal(t, RecoveryKindConn, GetRecoveryKind(connErr), "next recovery would force a connection level recovery")
191193

192-
err = links.lr.RecoverIfNeeded(context.Background(), lwidToError(connErr, oldLWID))
194+
args = &utils.RetryFnArgs{}
195+
err = links.lr.RecoverIfNeeded(context.Background(), lwidToError(connErr, oldLWID), args)
193196
require.NoError(t, err)
194197

195198
newLWID, err := links.GetLink(context.Background(), "0")
@@ -223,7 +226,8 @@ func TestLinkFailure(t *testing.T) {
223226
cancelledCtx, cancel := context.WithDeadline(context.Background(), time.Now().Add(-time.Hour))
224227
defer cancel()
225228

226-
err = links.lr.RecoverIfNeeded(cancelledCtx, lwidToError(err, oldLWID))
229+
args := &utils.RetryFnArgs{}
230+
err = links.lr.RecoverIfNeeded(cancelledCtx, lwidToError(err, oldLWID), args)
227231
require.NoError(t, err)
228232

229233
newLWID, err := links.GetLink(context.Background(), "0")

sdk/messaging/azeventhubs/internal/links_unit_test.go

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/v2/internal/amqpwrap"
1212
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/v2/internal/mock"
1313
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/v2/internal/test"
14+
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/v2/internal/utils"
1415
"github.com/Azure/go-amqp"
1516
"github.com/golang/mock/gomock"
1617
"github.com/stretchr/testify/require"
@@ -26,7 +27,7 @@ func TestLinks_NoOp(t *testing.T) {
2627
})
2728

2829
// no error just no-ops
29-
err := links.lr.RecoverIfNeeded(context.Background(), nil)
30+
err := links.lr.RecoverIfNeeded(context.Background(), nil, &utils.RetryFnArgs{})
3031
require.NoError(t, err)
3132
}
3233

@@ -55,13 +56,13 @@ func TestLinks_LinkStale(t *testing.T) {
5556
// we'll recover first, but our lwid (after this recovery) is stale since
5657
// the link cache will be updated after this is done.
5758

58-
err = links.lr.RecoverIfNeeded(context.Background(), lwidToError(&amqp.LinkError{}, staleLWID))
59+
err = links.lr.RecoverIfNeeded(context.Background(), lwidToError(&amqp.LinkError{}, staleLWID), &utils.RetryFnArgs{})
5960
require.NoError(t, err)
6061
require.Nil(t, links.links["0"], "closed link is removed from the cache")
6162
require.Equal(t, 1, receivers[0].CloseCalled, "original receiver is closed, and replaced")
6263

6364
// trying to recover again is a no-op (if nothing is in the cache)
64-
err = links.lr.RecoverIfNeeded(context.Background(), lwidToError(&amqp.LinkError{}, staleLWID))
65+
err = links.lr.RecoverIfNeeded(context.Background(), lwidToError(&amqp.LinkError{}, staleLWID), &utils.RetryFnArgs{})
6566
require.NoError(t, err)
6667
require.Nil(t, links.links["0"], "closed link is removed from the cache")
6768
require.Equal(t, 1, receivers[0].CloseCalled, "original receiver is closed, and replaced")
@@ -75,7 +76,7 @@ func TestLinks_LinkStale(t *testing.T) {
7576
require.NotNil(t, newLWID)
7677
require.Equal(t, (*links.links["0"].link).LinkName(), newLWID.Link().LinkName(), "cache contains the newly created link for partition 0")
7778

78-
err = links.lr.RecoverIfNeeded(context.Background(), lwidToError(&amqp.LinkError{}, staleLWID))
79+
err = links.lr.RecoverIfNeeded(context.Background(), lwidToError(&amqp.LinkError{}, staleLWID), &utils.RetryFnArgs{})
7980
require.NoError(t, err)
8081
require.Equal(t, 0, receivers[0].CloseCalled, "receiver is NOT closed - we didn't need to replace it since the lwid with the error was stale")
8182
}
@@ -102,7 +103,7 @@ func TestLinks_LinkRecoveryOnly(t *testing.T) {
102103
require.NotNil(t, lwid)
103104
require.NotNil(t, links.links["0"], "cache contains the newly created link for partition 0")
104105

105-
err = links.lr.RecoverIfNeeded(context.Background(), lwidToError(&amqp.LinkError{}, lwid))
106+
err = links.lr.RecoverIfNeeded(context.Background(), lwidToError(&amqp.LinkError{}, lwid), &utils.RetryFnArgs{})
106107
require.NoError(t, err)
107108
require.Nil(t, links.links["0"], "cache will no longer a link for partition 0")
108109

@@ -157,7 +158,7 @@ func TestLinks_ConnectionRecovery(t *testing.T) {
157158
ns.EXPECT().Recover(test.NotCancelled, gomock.Any()).Return(nil)
158159

159160
// initiate a connection level recovery
160-
err = links.lr.RecoverIfNeeded(context.Background(), lwidToError(&amqp.ConnError{}, lwid))
161+
err = links.lr.RecoverIfNeeded(context.Background(), lwidToError(&amqp.ConnError{}, lwid), &utils.RetryFnArgs{})
161162
require.NoError(t, err)
162163

163164
// we still cleanup what we can (including cancelling our background negotiate claim loop)
@@ -202,7 +203,7 @@ func TestLinks_LinkRecoveryButCloseIsCancelled(t *testing.T) {
202203
require.NotNil(t, lwid)
203204
require.NotNil(t, links.links["0"], "cache contains the newly created link for partition 0")
204205

205-
err = links.lr.RecoverIfNeeded(context.Background(), lwidToError(&amqp.LinkError{}, lwid))
206+
err = links.lr.RecoverIfNeeded(context.Background(), lwidToError(&amqp.LinkError{}, lwid), &utils.RetryFnArgs{})
206207
require.ErrorIs(t, err, context.Canceled)
207208
require.Nil(t, links.links["0"], "cache will no longer a link for partition 0")
208209
require.Equal(t, 0, connectionRecoverCalled, "Link level recovery, not connection level")
@@ -254,7 +255,7 @@ func TestLinks_closeWithTimeout(t *testing.T) {
254255

255256
// purposefully recover with what should be a link level recovery. However, the Close() failing
256257
// means we end up "upgrading" to a connection reset instead.
257-
err = links.lr.RecoverIfNeeded(userCtx, lwidToError(&amqp.LinkError{}, lwid))
258+
err = links.lr.RecoverIfNeeded(userCtx, lwidToError(&amqp.LinkError{}, lwid), &utils.RetryFnArgs{})
258259
require.ErrorIs(t, err, errToReturn)
259260

260261
// we still cleanup what we can (including cancelling our background negotiate claim loop)
@@ -292,7 +293,7 @@ func TestLinks_linkRecoveryOnly(t *testing.T) {
292293
lwid, err := links.GetLink(context.Background(), "0")
293294
require.NoError(t, err)
294295

295-
err = links.lr.RecoverIfNeeded(context.Background(), lwidToError(&amqp.LinkError{}, lwid))
296+
err = links.lr.RecoverIfNeeded(context.Background(), lwidToError(&amqp.LinkError{}, lwid), &utils.RetryFnArgs{})
296297
require.NoError(t, err)
297298

298299
// we still cleanup what we can (including cancelling our background negotiate claim loop)
@@ -329,7 +330,7 @@ func TestLinks_linkRecoveryFailsWithLinkFailure(t *testing.T) {
329330
lwid, err := links.GetLink(context.Background(), "0")
330331
require.NoError(t, err)
331332

332-
err = links.lr.RecoverIfNeeded(context.Background(), lwidToError(&amqp.LinkError{}, lwid))
333+
err = links.lr.RecoverIfNeeded(context.Background(), lwidToError(&amqp.LinkError{}, lwid), &utils.RetryFnArgs{})
333334
require.Equal(t, err, detachErr)
334335

335336
// we still cleanup what we can (including cancelling our background negotiate claim loop)

sdk/messaging/azeventhubs/internal/utils/retrier.go

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,8 @@ type RetryFnArgs struct {
2323
// If you have potentially expensive
2424
LastErr error
2525

26-
resetAttempts bool
26+
resetAttempts bool
27+
useLinkRecoveryDelay bool
2728
}
2829

2930
// ResetAttempts resets all Retry() attempts, starting back
@@ -32,6 +33,13 @@ func (rf *RetryFnArgs) ResetAttempts() {
3233
rf.resetAttempts = true
3334
}
3435

36+
// UseLinkRecoveryDelay signals that the next retry should use the
37+
// LinkRecoveryDelay instead of the normal exponential backoff delay.
38+
// This is typically called after a link recovery operation.
39+
func (rf *RetryFnArgs) UseLinkRecoveryDelay() {
40+
rf.useLinkRecoveryDelay = true
41+
}
42+
3543
// Retry runs a standard retry loop. It executes your passed in fn as the body of the loop.
3644
// It returns if it exceeds the number of configured retry options or if 'isFatal' returns true.
3745
func Retry(ctx context.Context, eventName log.Event, prefix func() string, o exported.RetryOptions, fn func(ctx context.Context, callbackArgs *RetryFnArgs) error, isFatalFn func(err error) bool) error {
@@ -43,11 +51,26 @@ func Retry(ctx context.Context, eventName log.Event, prefix func() string, o exp
4351
setDefaults(&ro)
4452

4553
var err error
54+
var useLinkRecoveryDelay bool
4655

4756
for i := int32(0); i <= ro.MaxRetries; i++ {
4857
if i > 0 {
49-
sleep := calcDelay(ro, i)
50-
log.Writef(eventName, "(%s) Retry attempt %d sleeping for %s", prefix(), i, sleep)
58+
var sleep time.Duration
59+
60+
// Check if we should use link recovery delay instead of exponential backoff
61+
if useLinkRecoveryDelay && ro.LinkRecoveryDelay > 0 {
62+
// User configured a specific link recovery delay
63+
sleep = ro.LinkRecoveryDelay
64+
log.Writef(eventName, "(%s) Retry attempt %d sleeping for %s (link recovery delay)", prefix(), i, sleep)
65+
} else if useLinkRecoveryDelay && ro.LinkRecoveryDelay < 0 {
66+
// User configured no delay for link recovery
67+
sleep = 0
68+
log.Writef(eventName, "(%s) Retry attempt %d: no delay (link recovery)", prefix(), i)
69+
} else {
70+
// Default: use normal exponential backoff (no link recovery was signaled, or LinkRecoveryDelay == 0)
71+
sleep = calcDelay(ro, i)
72+
log.Writef(eventName, "(%s) Retry attempt %d sleeping for %s", prefix(), i, sleep)
73+
}
5174

5275
select {
5376
case <-ctx.Done():
@@ -62,6 +85,9 @@ func Retry(ctx context.Context, eventName log.Event, prefix func() string, o exp
6285
}
6386
err = fn(ctx, &args)
6487

88+
// Capture the flag for the next iteration
89+
useLinkRecoveryDelay = args.useLinkRecoveryDelay
90+
6591
if args.resetAttempts {
6692
log.Writef(eventName, "(%s) Resetting retry attempts", prefix())
6793

0 commit comments

Comments
 (0)