From 6414ec88dc240ff4d117742fbed82e3d9970ce59 Mon Sep 17 00:00:00 2001
From: Johnathan W
Date: Wed, 12 Nov 2025 19:04:11 -0500
Subject: [PATCH] feat(retry): add configurable link recovery delay

---
 sdk/messaging/azeventhubs/CHANGELOG.md        |   6 +
 .../internal/exported/retry_options.go        |  11 ++
 .../azeventhubs/internal/links_recover.go     |   7 +-
 .../azeventhubs/internal/links_test.go        |  14 +-
 .../azeventhubs/internal/links_unit_test.go   |  21 +--
 .../azeventhubs/internal/utils/retrier.go     |  32 ++++-
 .../internal/utils/retrier_test.go            | 136 ++++++++++++++++++
 7 files changed, 207 insertions(+), 20 deletions(-)

diff --git a/sdk/messaging/azeventhubs/CHANGELOG.md b/sdk/messaging/azeventhubs/CHANGELOG.md
index 3fdd9e553bd3..dc0e8942cfb6 100644
--- a/sdk/messaging/azeventhubs/CHANGELOG.md
+++ b/sdk/messaging/azeventhubs/CHANGELOG.md
@@ -1,5 +1,11 @@
 # Release History
 
+## 2.0.2 (2025-11-17)
+
+### Features Added
+
+- Added `RetryOptions.LinkRecoveryDelay`, a configurable delay used between retries that follow an AMQP link recovery.
+
 ## 2.0.1 (2025-10-08)
 
 ### Bugs Fixed

diff --git a/sdk/messaging/azeventhubs/internal/exported/retry_options.go b/sdk/messaging/azeventhubs/internal/exported/retry_options.go
index 6bed306ad5cd..5ffcbbdcda38 100644
--- a/sdk/messaging/azeventhubs/internal/exported/retry_options.go
+++ b/sdk/messaging/azeventhubs/internal/exported/retry_options.go
@@ -23,4 +23,15 @@ type RetryOptions struct {
 	// Typically the value is greater than or equal to the value specified in RetryDelay.
 	// The default Value is 120 seconds. A value less than zero means there is no cap.
 	MaxRetryDelay time.Duration
+
+	// LinkRecoveryDelay specifies a fixed delay to use after a link recovery failure, instead of
+	// the exponential backoff that RetryDelay normally provides. It only applies when an AMQP
+	// link needs to be recovered (for example, after a link detach error); all other failures
+	// use RetryDelay's exponential backoff.
+	// The default value is zero, which means link recovery retries also use RetryDelay's
+	// exponential backoff. A value less than zero means no delay after link recovery.
+	// Positive values let you slow down repeated link recovery attempts if, for example,
+	// recreating links is putting pressure on your namespace, while negative values let you
+	// retry immediately after a link recovery failure.
+	LinkRecoveryDelay time.Duration
 }

diff --git a/sdk/messaging/azeventhubs/internal/links_recover.go b/sdk/messaging/azeventhubs/internal/links_recover.go
index bd578e231ec4..77c6fe8616f8 100644
--- a/sdk/messaging/azeventhubs/internal/links_recover.go
+++ b/sdk/messaging/azeventhubs/internal/links_recover.go
@@ -56,7 +56,7 @@ func (l LinkRetrier[LinkT]) Retry(ctx context.Context,
 		currentPrefix = linkWithID.String()
 
 		if err := fn(ctx, linkWithID); err != nil {
-			if recoveryErr := l.RecoverIfNeeded(ctx, err); recoveryErr != nil {
+			if recoveryErr := l.RecoverIfNeeded(ctx, err, args); recoveryErr != nil {
 				// it's okay to return this error, and we're still in an okay state. The next loop through will end
 				// up reopening all the closed links and will either get the same error again (ie, network is _still_
 				// down) or will work and then things proceed as normal.
@@ -74,7 +74,7 @@ func (l LinkRetrier[LinkT]) Retry(ctx context.Context,
 
 // RecoverIfNeeded will check the error and pick the correct minimal recovery pattern (none, link only, connection and link, etc..)
 // NOTE: if 'ctx' is cancelled this function will still close out all the connections/links involved.
-func (l LinkRetrier[LinkT]) RecoverIfNeeded(ctx context.Context, err error) error {
+func (l LinkRetrier[LinkT]) RecoverIfNeeded(ctx context.Context, err error, args *utils.RetryFnArgs) error {
 	rk := GetRecoveryKind(err)
 
 	switch rk {
@@ -93,6 +93,9 @@ func (l LinkRetrier[LinkT]) RecoverIfNeeded(ctx context.Context, err error) erro
 			return err
 		}
 
+		// Signal that the next retry should use link recovery delay
+		args.UseLinkRecoveryDelay()
+
 		return nil
 	case RecoveryKindConn:
 		var awErr amqpwrap.Error

diff --git a/sdk/messaging/azeventhubs/internal/links_test.go b/sdk/messaging/azeventhubs/internal/links_test.go
index 2c3310f0930d..cae549b0c0b1 100644
--- a/sdk/messaging/azeventhubs/internal/links_test.go
+++ b/sdk/messaging/azeventhubs/internal/links_test.go
@@ -15,6 +15,7 @@ import (
 	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/v2/internal/amqpwrap"
 	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/v2/internal/exported"
 	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/v2/internal/test"
+	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/v2/internal/utils"
 	"github.com/Azure/go-amqp"
 	"github.com/stretchr/testify/require"
 )
@@ -88,8 +89,8 @@ func TestLinksRecoverLinkWithConnectionFailure(t *testing.T) {
 	require.Equal(t, RecoveryKindConn, GetRecoveryKind(err))
 
 	// now recover like normal
-
-	err = links.lr.RecoverIfNeeded(context.Background(), lwidToError(err, oldLWID))
+	args := &utils.RetryFnArgs{}
+	err = links.lr.RecoverIfNeeded(context.Background(), lwidToError(err, oldLWID), args)
 	require.NoError(t, err)
 
 	newLWID, err := links.GetLink(context.Background(), "0")
@@ -183,13 +184,15 @@ func TestLinkFailureWhenConnectionIsDead(t *testing.T) {
 	require.Error(t, err)
 	require.Equal(t, RecoveryKindConn, GetRecoveryKind(err))
 
-	err = links.lr.RecoverIfNeeded(context.Background(), lwidToError(&amqp.LinkError{}, oldLWID))
+	args := &utils.RetryFnArgs{}
+	err = links.lr.RecoverIfNeeded(context.Background(), lwidToError(&amqp.LinkError{}, oldLWID), args)
 	var connErr *amqp.ConnError
 	require.ErrorAs(t, err, &connErr)
 	require.Nil(t, connErr.RemoteErr, "is the forwarded error from the closed connection")
 	require.Equal(t, RecoveryKindConn, GetRecoveryKind(connErr), "next recovery would force a connection level recovery")
 
-	err = links.lr.RecoverIfNeeded(context.Background(), lwidToError(connErr, oldLWID))
+	args = &utils.RetryFnArgs{}
+	err = links.lr.RecoverIfNeeded(context.Background(), lwidToError(connErr, oldLWID), args)
 	require.NoError(t, err)
 
 	newLWID, err := links.GetLink(context.Background(), "0")
@@ -223,7 +226,8 @@ func TestLinkFailure(t *testing.T) {
 	cancelledCtx, cancel := context.WithDeadline(context.Background(), time.Now().Add(-time.Hour))
 	defer cancel()
 
-	err = links.lr.RecoverIfNeeded(cancelledCtx, lwidToError(err, oldLWID))
+	args := &utils.RetryFnArgs{}
+	err = links.lr.RecoverIfNeeded(cancelledCtx, lwidToError(err, oldLWID), args)
 	require.NoError(t, err)
 
 	newLWID, err := links.GetLink(context.Background(), "0")

diff --git a/sdk/messaging/azeventhubs/internal/links_unit_test.go b/sdk/messaging/azeventhubs/internal/links_unit_test.go
index dd0b31386e5b..05a8ff5ed818 100644
--- a/sdk/messaging/azeventhubs/internal/links_unit_test.go
+++ b/sdk/messaging/azeventhubs/internal/links_unit_test.go
@@ -11,6 +11,7 @@ import (
 	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/v2/internal/amqpwrap"
 	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/v2/internal/mock"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/v2/internal/test" + "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/v2/internal/utils" "github.com/Azure/go-amqp" "github.com/golang/mock/gomock" "github.com/stretchr/testify/require" @@ -26,7 +27,7 @@ func TestLinks_NoOp(t *testing.T) { }) // no error just no-ops - err := links.lr.RecoverIfNeeded(context.Background(), nil) + err := links.lr.RecoverIfNeeded(context.Background(), nil, &utils.RetryFnArgs{}) require.NoError(t, err) } @@ -55,13 +56,13 @@ func TestLinks_LinkStale(t *testing.T) { // we'll recover first, but our lwid (after this recovery) is stale since // the link cache will be updated after this is done. - err = links.lr.RecoverIfNeeded(context.Background(), lwidToError(&amqp.LinkError{}, staleLWID)) + err = links.lr.RecoverIfNeeded(context.Background(), lwidToError(&amqp.LinkError{}, staleLWID), &utils.RetryFnArgs{}) require.NoError(t, err) require.Nil(t, links.links["0"], "closed link is removed from the cache") require.Equal(t, 1, receivers[0].CloseCalled, "original receiver is closed, and replaced") // trying to recover again is a no-op (if nothing is in the cache) - err = links.lr.RecoverIfNeeded(context.Background(), lwidToError(&amqp.LinkError{}, staleLWID)) + err = links.lr.RecoverIfNeeded(context.Background(), lwidToError(&amqp.LinkError{}, staleLWID), &utils.RetryFnArgs{}) require.NoError(t, err) require.Nil(t, links.links["0"], "closed link is removed from the cache") require.Equal(t, 1, receivers[0].CloseCalled, "original receiver is closed, and replaced") @@ -75,7 +76,7 @@ func TestLinks_LinkStale(t *testing.T) { require.NotNil(t, newLWID) require.Equal(t, (*links.links["0"].link).LinkName(), newLWID.Link().LinkName(), "cache contains the newly created link for partition 0") - err = links.lr.RecoverIfNeeded(context.Background(), lwidToError(&amqp.LinkError{}, staleLWID)) + err = links.lr.RecoverIfNeeded(context.Background(), lwidToError(&amqp.LinkError{}, staleLWID), &utils.RetryFnArgs{}) require.NoError(t, err) require.Equal(t, 0, receivers[0].CloseCalled, "receiver is NOT closed - we didn't need to replace it since the lwid with the error was stale") } @@ -102,7 +103,7 @@ func TestLinks_LinkRecoveryOnly(t *testing.T) { require.NotNil(t, lwid) require.NotNil(t, links.links["0"], "cache contains the newly created link for partition 0") - err = links.lr.RecoverIfNeeded(context.Background(), lwidToError(&amqp.LinkError{}, lwid)) + err = links.lr.RecoverIfNeeded(context.Background(), lwidToError(&amqp.LinkError{}, lwid), &utils.RetryFnArgs{}) require.NoError(t, err) require.Nil(t, links.links["0"], "cache will no longer a link for partition 0") @@ -157,7 +158,7 @@ func TestLinks_ConnectionRecovery(t *testing.T) { ns.EXPECT().Recover(test.NotCancelled, gomock.Any()).Return(nil) // initiate a connection level recovery - err = links.lr.RecoverIfNeeded(context.Background(), lwidToError(&amqp.ConnError{}, lwid)) + err = links.lr.RecoverIfNeeded(context.Background(), lwidToError(&amqp.ConnError{}, lwid), &utils.RetryFnArgs{}) require.NoError(t, err) // we still cleanup what we can (including cancelling our background negotiate claim loop) @@ -202,7 +203,7 @@ func TestLinks_LinkRecoveryButCloseIsCancelled(t *testing.T) { require.NotNil(t, lwid) require.NotNil(t, links.links["0"], "cache contains the newly created link for partition 0") - err = links.lr.RecoverIfNeeded(context.Background(), lwidToError(&amqp.LinkError{}, lwid)) + err = links.lr.RecoverIfNeeded(context.Background(), 
lwidToError(&amqp.LinkError{}, lwid), &utils.RetryFnArgs{}) require.ErrorIs(t, err, context.Canceled) require.Nil(t, links.links["0"], "cache will no longer a link for partition 0") require.Equal(t, 0, connectionRecoverCalled, "Link level recovery, not connection level") @@ -254,7 +255,7 @@ func TestLinks_closeWithTimeout(t *testing.T) { // purposefully recover with what should be a link level recovery. However, the Close() failing // means we end up "upgrading" to a connection reset instead. - err = links.lr.RecoverIfNeeded(userCtx, lwidToError(&amqp.LinkError{}, lwid)) + err = links.lr.RecoverIfNeeded(userCtx, lwidToError(&amqp.LinkError{}, lwid), &utils.RetryFnArgs{}) require.ErrorIs(t, err, errToReturn) // we still cleanup what we can (including cancelling our background negotiate claim loop) @@ -292,7 +293,7 @@ func TestLinks_linkRecoveryOnly(t *testing.T) { lwid, err := links.GetLink(context.Background(), "0") require.NoError(t, err) - err = links.lr.RecoverIfNeeded(context.Background(), lwidToError(&amqp.LinkError{}, lwid)) + err = links.lr.RecoverIfNeeded(context.Background(), lwidToError(&amqp.LinkError{}, lwid), &utils.RetryFnArgs{}) require.NoError(t, err) // we still cleanup what we can (including cancelling our background negotiate claim loop) @@ -329,7 +330,7 @@ func TestLinks_linkRecoveryFailsWithLinkFailure(t *testing.T) { lwid, err := links.GetLink(context.Background(), "0") require.NoError(t, err) - err = links.lr.RecoverIfNeeded(context.Background(), lwidToError(&amqp.LinkError{}, lwid)) + err = links.lr.RecoverIfNeeded(context.Background(), lwidToError(&amqp.LinkError{}, lwid), &utils.RetryFnArgs{}) require.Equal(t, err, detachErr) // we still cleanup what we can (including cancelling our background negotiate claim loop) diff --git a/sdk/messaging/azeventhubs/internal/utils/retrier.go b/sdk/messaging/azeventhubs/internal/utils/retrier.go index 2eae06415023..202cc28f92d5 100644 --- a/sdk/messaging/azeventhubs/internal/utils/retrier.go +++ b/sdk/messaging/azeventhubs/internal/utils/retrier.go @@ -23,7 +23,8 @@ type RetryFnArgs struct { // If you have potentially expensive LastErr error - resetAttempts bool + resetAttempts bool + useLinkRecoveryDelay bool } // ResetAttempts resets all Retry() attempts, starting back @@ -32,6 +33,13 @@ func (rf *RetryFnArgs) ResetAttempts() { rf.resetAttempts = true } +// UseLinkRecoveryDelay signals that the next retry should use the +// LinkRecoveryDelay instead of the normal exponential backoff delay. +// This is typically called after a link recovery operation. +func (rf *RetryFnArgs) UseLinkRecoveryDelay() { + rf.useLinkRecoveryDelay = true +} + // Retry runs a standard retry loop. It executes your passed in fn as the body of the loop. // It returns if it exceeds the number of configured retry options or if 'isFatal' returns true. 
 func Retry(ctx context.Context, eventName log.Event, prefix func() string, o exported.RetryOptions, fn func(ctx context.Context, callbackArgs *RetryFnArgs) error, isFatalFn func(err error) bool) error {
@@ -43,11 +51,26 @@ func Retry(ctx context.Context, eventName log.Event, prefix func() string, o exp
 	setDefaults(&ro)
 
 	var err error
+	var useLinkRecoveryDelay bool
 
 	for i := int32(0); i <= ro.MaxRetries; i++ {
 		if i > 0 {
-			sleep := calcDelay(ro, i)
-			log.Writef(eventName, "(%s) Retry attempt %d sleeping for %s", prefix(), i, sleep)
+			var sleep time.Duration
+
+			// Check if we should use link recovery delay instead of exponential backoff
+			if useLinkRecoveryDelay && ro.LinkRecoveryDelay > 0 {
+				// User configured a specific link recovery delay
+				sleep = ro.LinkRecoveryDelay
+				log.Writef(eventName, "(%s) Retry attempt %d sleeping for %s (link recovery delay)", prefix(), i, sleep)
+			} else if useLinkRecoveryDelay && ro.LinkRecoveryDelay < 0 {
+				// User configured no delay for link recovery
+				sleep = 0
+				log.Writef(eventName, "(%s) Retry attempt %d: no delay (link recovery)", prefix(), i)
+			} else {
+				// Default: use normal exponential backoff (includes LinkRecoveryDelay == 0)
+				sleep = calcDelay(ro, i)
+				log.Writef(eventName, "(%s) Retry attempt %d sleeping for %s", prefix(), i, sleep)
+			}
 
 			select {
 			case <-ctx.Done():
@@ -62,6 +85,9 @@ func Retry(ctx context.Context, eventName log.Event, prefix func() string, o exp
 		}
 
 		err = fn(ctx, &args)
+		// Capture the flag for the next iteration
+		useLinkRecoveryDelay = args.useLinkRecoveryDelay
+
 		if args.resetAttempts {
 			log.Writef(eventName, "(%s) Resetting retry attempts", prefix())

diff --git a/sdk/messaging/azeventhubs/internal/utils/retrier_test.go b/sdk/messaging/azeventhubs/internal/utils/retrier_test.go
index ef4d52200d0e..4e7ee480797b 100644
--- a/sdk/messaging/azeventhubs/internal/utils/retrier_test.go
+++ b/sdk/messaging/azeventhubs/internal/utils/retrier_test.go
@@ -474,6 +474,142 @@ func TestCalcDelay(t *testing.T) {
 	})
 }
 
+func TestLinkRecoveryDelay(t *testing.T) {
+	isFatalFn := func(err error) bool {
+		return false // All errors are retryable
+	}
+
+	t.Run("LinkRecoveryDelay with specific value", func(t *testing.T) {
+		var actualDelays []time.Duration
+		lastTime := time.Now()
+
+		err := Retry(context.Background(), testLogEvent, func() string { return "test" }, exported.RetryOptions{
+			MaxRetries:        2,
+			RetryDelay:        100 * time.Millisecond, // Normal delay (not used)
+			MaxRetryDelay:     200 * time.Millisecond,
+			LinkRecoveryDelay: 10 * time.Millisecond, // Special link recovery delay
+		}, func(ctx context.Context, args *RetryFnArgs) error {
+			now := time.Now()
+			if args.I > 0 {
+				actualDelays = append(actualDelays, now.Sub(lastTime))
+			}
+			lastTime = now
+
+			// Simulate link recovery
+			args.UseLinkRecoveryDelay()
+
+			return errors.New("link error")
+		}, isFatalFn)
+
+		require.Error(t, err)
+		require.Len(t, actualDelays, 2) // 2 retries after the initial attempt
+
+		// All delays should be approximately LinkRecoveryDelay (10ms), not exponential backoff
+		for i, delay := range actualDelays {
+			require.GreaterOrEqual(t, delay, 8*time.Millisecond, "delay %d too short", i)
+			require.LessOrEqual(t, delay, 15*time.Millisecond, "delay %d too long", i)
+		}
+	})
+
+	t.Run("LinkRecoveryDelay with negative value (no delay)", func(t *testing.T) {
+		var actualDelays []time.Duration
+		lastTime := time.Now()
+
+		err := Retry(context.Background(), testLogEvent, func() string { return "test" }, exported.RetryOptions{
+			MaxRetries:        2,
+			RetryDelay:        100 * time.Millisecond, // Normal delay (not used)
+			MaxRetryDelay:     200 * time.Millisecond,
+			LinkRecoveryDelay: -1, // No delay
+		}, func(ctx context.Context, args *RetryFnArgs) error {
+			now := time.Now()
+			if args.I > 0 {
+				actualDelays = append(actualDelays, now.Sub(lastTime))
+			}
+			lastTime = now
+
+			// Simulate link recovery
+			args.UseLinkRecoveryDelay()
+
+			return errors.New("link error")
+		}, isFatalFn)
+
+		require.Error(t, err)
+		require.Len(t, actualDelays, 2)
+
+		// All delays should be essentially zero (immediate retry)
+		for i, delay := range actualDelays {
+			require.LessOrEqual(t, delay, 5*time.Millisecond, "delay %d should be near zero", i)
+		}
+	})
+
+	t.Run("LinkRecoveryDelay zero uses normal exponential backoff", func(t *testing.T) {
+		var actualDelays []time.Duration
+		lastTime := time.Now()
+
+		err := Retry(context.Background(), testLogEvent, func() string { return "test" }, exported.RetryOptions{
+			MaxRetries:        2,
+			RetryDelay:        10 * time.Millisecond,
+			MaxRetryDelay:     100 * time.Millisecond,
+			LinkRecoveryDelay: 0, // Use normal exponential backoff
+		}, func(ctx context.Context, args *RetryFnArgs) error {
+			now := time.Now()
+			if args.I > 0 {
+				actualDelays = append(actualDelays, now.Sub(lastTime))
+			}
+			lastTime = now
+
+			// Simulate link recovery
+			args.UseLinkRecoveryDelay()
+
+			return errors.New("link error")
+		}, isFatalFn)
+
+		require.Error(t, err)
+		require.Len(t, actualDelays, 2)
+
+		// First retry should be ~10ms * (2^1 - 1) = ~10ms with jitter
+		require.GreaterOrEqual(t, actualDelays[0], 8*time.Millisecond)
+		require.LessOrEqual(t, actualDelays[0], 15*time.Millisecond)
+
+		// Second retry should be ~10ms * (2^2 - 1) = ~30ms with jitter
+		require.GreaterOrEqual(t, actualDelays[1], 24*time.Millisecond)
+		require.LessOrEqual(t, actualDelays[1], 40*time.Millisecond)
+	})
+
+	t.Run("Normal error without UseLinkRecoveryDelay uses exponential backoff", func(t *testing.T) {
+		var actualDelays []time.Duration
+		lastTime := time.Now()
+
+		err := Retry(context.Background(), testLogEvent, func() string { return "test" }, exported.RetryOptions{
+			MaxRetries:        2,
+			RetryDelay:        10 * time.Millisecond,
+			MaxRetryDelay:     100 * time.Millisecond,
+			LinkRecoveryDelay: 5 * time.Millisecond, // Should NOT be used
+		}, func(ctx context.Context, args *RetryFnArgs) error {
+			now := time.Now()
+			if args.I > 0 {
+				actualDelays = append(actualDelays, now.Sub(lastTime))
+			}
+			lastTime = now
+
+			// Do NOT call UseLinkRecoveryDelay() - this is a normal error
+			return errors.New("normal error")
+		}, isFatalFn)
+
+		require.Error(t, err)
+		require.Len(t, actualDelays, 2)
+
+		// Should use exponential backoff, not LinkRecoveryDelay
+		// First retry: ~10ms with jitter
+		require.GreaterOrEqual(t, actualDelays[0], 8*time.Millisecond)
+		require.LessOrEqual(t, actualDelays[0], 15*time.Millisecond)
+
+		// Second retry: ~30ms with jitter
+		require.GreaterOrEqual(t, actualDelays[1], 24*time.Millisecond)
+		require.LessOrEqual(t, actualDelays[1], 40*time.Millisecond)
+	})
+}
+
 // retryRE is used to replace the 'retry time' with a consistent string to make
 // unit tests against logging simpler
 // A typical string: "[azsb.Retry] (retry) Attempt 1 sleeping for 1.10233ms"
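
Reviewer note (not part of the patch): a minimal usage sketch of the new option. It assumes `LinkRecoveryDelay` is surfaced on the public `azeventhubs.RetryOptions` the same way the existing retry fields in internal/exported are; the namespace, hub name, and the 500ms value are placeholders, not recommendations.

package main

import (
	"context"
	"time"

	"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/v2"
)

func main() {
	cred, err := azidentity.NewDefaultAzureCredential(nil)
	if err != nil {
		panic(err)
	}

	// "<namespace>" and "<event hub>" are placeholders.
	consumer, err := azeventhubs.NewConsumerClient(
		"<namespace>.servicebus.windows.net", "<event hub>", azeventhubs.DefaultConsumerGroup, cred,
		&azeventhubs.ConsumerClientOptions{
			RetryOptions: azeventhubs.RetryOptions{
				MaxRetries:    3,
				RetryDelay:    4 * time.Second,
				MaxRetryDelay: 2 * time.Minute,
				// Fixed pause between retries that follow a link recovery;
				// all other failures keep the exponential backoff above.
				LinkRecoveryDelay: 500 * time.Millisecond,
			},
		})
	if err != nil {
		panic(err)
	}

	defer consumer.Close(context.TODO())
}

Leaving LinkRecoveryDelay at zero keeps today's exponential backoff for link recovery retries, and a negative value makes them immediate, matching the behavior described in retry_options.go.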