6 changes: 6 additions & 0 deletions sdk/messaging/azeventhubs/CHANGELOG.md
@@ -1,5 +1,11 @@
# Release History

## 2.0.2 (2025-11-17)

### Features Added

- Added support for a configurable link recovery delay via `RetryOptions.LinkRecoveryDelay`.

## 2.0.1 (2025-10-08)

### Bugs Fixed
11 changes: 11 additions & 0 deletions sdk/messaging/azeventhubs/internal/exported/retry_options.go
@@ -23,4 +23,15 @@ type RetryOptions struct {
// Typically the value is greater than or equal to the value specified in RetryDelay.
// The default value is 120 seconds. A value less than zero means there is no cap.
MaxRetryDelay time.Duration

// LinkRecoveryDelay specifies a fixed delay to use after a link recovery failure, instead of
// the normal exponential backoff specified by RetryDelay. This only applies when an AMQP link
// needs to be recovered (e.g., link detached errors). For other types of failures, the normal
// RetryDelay with exponential backoff is used.
// The default value is zero, which means link recovery retries will use the normal RetryDelay
// exponential backoff behavior. A value less than zero means no delay after link recovery.
// Positive values let you slow down repeated link recovery attempts (for example, when
// recreating links is putting pressure on your namespace); negative values let you retry
// immediately after a link recovery failure.
LinkRecoveryDelay time.Duration
}
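For context, a minimal sketch of how a caller might set this option, assuming `LinkRecoveryDelay` is surfaced on the public `azeventhubs.RetryOptions` the same way the existing fields in this file are (that surfacing is an assumption, not shown in this diff; namespace and event hub names are placeholders):

```go
package main

import (
	"context"
	"log"
	"time"

	"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/v2"
)

func main() {
	cred, err := azidentity.NewDefaultAzureCredential(nil)
	if err != nil {
		log.Fatal(err)
	}

	// Sleep a fixed 5 seconds before retrying after a link recovery, instead of
	// the exponential RetryDelay backoff; other failures keep the normal backoff.
	consumer, err := azeventhubs.NewConsumerClient(
		"<namespace>.servicebus.windows.net", "<event hub>", azeventhubs.DefaultConsumerGroup, cred,
		&azeventhubs.ConsumerClientOptions{
			RetryOptions: azeventhubs.RetryOptions{
				MaxRetries:        3,
				RetryDelay:        4 * time.Second,
				MaxRetryDelay:     2 * time.Minute,
				LinkRecoveryDelay: 5 * time.Second, // new field added above
			},
		})
	if err != nil {
		log.Fatal(err)
	}
	defer consumer.Close(context.Background())
}
```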
7 changes: 5 additions & 2 deletions sdk/messaging/azeventhubs/internal/links_recover.go
@@ -56,7 +56,7 @@ func (l LinkRetrier[LinkT]) Retry(ctx context.Context,
currentPrefix = linkWithID.String()

if err := fn(ctx, linkWithID); err != nil {
if recoveryErr := l.RecoverIfNeeded(ctx, err); recoveryErr != nil {
if recoveryErr := l.RecoverIfNeeded(ctx, err, args); recoveryErr != nil {
// it's okay to return this error, and we're still in an okay state. The next loop through will end
// up reopening all the closed links and will either get the same error again (ie, network is _still_
// down) or will work and then things proceed as normal.
@@ -74,7 +74,7 @@

// RecoverIfNeeded will check the error and pick the correct minimal recovery pattern (none, link only, connection and link, etc.)
// NOTE: if 'ctx' is cancelled this function will still close out all the connections/links involved.
func (l LinkRetrier[LinkT]) RecoverIfNeeded(ctx context.Context, err error) error {
func (l LinkRetrier[LinkT]) RecoverIfNeeded(ctx context.Context, err error, args *utils.RetryFnArgs) error {
rk := GetRecoveryKind(err)

switch rk {
@@ -93,6 +93,9 @@ func (l LinkRetrier[LinkT]) RecoverIfNeeded(ctx context.Context, err error) erro
return err
}

// Signal that the next retry should use link recovery delay
args.UseLinkRecoveryDelay()

return nil
case RecoveryKindConn:
var awErr amqpwrap.Error
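To illustrate the shape of this change, here is a simplified sketch using stand-in types (not the SDK's internal API): the link-recovery path flips a per-attempt flag on the callback args, and the retry loop reads that flag when choosing the next sleep.

```go
package retrysketch

// retryFnArgs is a simplified stand-in for utils.RetryFnArgs.
type retryFnArgs struct {
	useLinkRecoveryDelay bool
}

// UseLinkRecoveryDelay marks this attempt so the *next* sleep uses the
// fixed LinkRecoveryDelay rather than exponential backoff.
func (a *retryFnArgs) UseLinkRecoveryDelay() { a.useLinkRecoveryDelay = true }

// recoverIfNeeded mirrors the link-recovery branch above: recreate the link,
// then signal that the following retry should use the link recovery delay.
func recoverIfNeeded(args *retryFnArgs, linkDetached bool, recreateLink func() error) error {
	if !linkDetached {
		return nil
	}
	if err := recreateLink(); err != nil {
		return err
	}
	args.UseLinkRecoveryDelay()
	return nil
}
```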
14 changes: 9 additions & 5 deletions sdk/messaging/azeventhubs/internal/links_test.go
@@ -15,6 +15,7 @@ import (
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/v2/internal/amqpwrap"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/v2/internal/exported"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/v2/internal/test"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/v2/internal/utils"
"github.com/Azure/go-amqp"
"github.com/stretchr/testify/require"
)
@@ -88,8 +89,8 @@ func TestLinksRecoverLinkWithConnectionFailure(t *testing.T) {
require.Equal(t, RecoveryKindConn, GetRecoveryKind(err))

// now recover like normal

err = links.lr.RecoverIfNeeded(context.Background(), lwidToError(err, oldLWID))
args := &utils.RetryFnArgs{}
err = links.lr.RecoverIfNeeded(context.Background(), lwidToError(err, oldLWID), args)
require.NoError(t, err)

newLWID, err := links.GetLink(context.Background(), "0")
@@ -183,13 +184,15 @@ func TestLinkFailureWhenConnectionIsDead(t *testing.T) {
require.Error(t, err)
require.Equal(t, RecoveryKindConn, GetRecoveryKind(err))

err = links.lr.RecoverIfNeeded(context.Background(), lwidToError(&amqp.LinkError{}, oldLWID))
args := &utils.RetryFnArgs{}
err = links.lr.RecoverIfNeeded(context.Background(), lwidToError(&amqp.LinkError{}, oldLWID), args)
var connErr *amqp.ConnError
require.ErrorAs(t, err, &connErr)
require.Nil(t, connErr.RemoteErr, "is the forwarded error from the closed connection")
require.Equal(t, RecoveryKindConn, GetRecoveryKind(connErr), "next recovery would force a connection level recovery")

err = links.lr.RecoverIfNeeded(context.Background(), lwidToError(connErr, oldLWID))
args = &utils.RetryFnArgs{}
err = links.lr.RecoverIfNeeded(context.Background(), lwidToError(connErr, oldLWID), args)
require.NoError(t, err)

newLWID, err := links.GetLink(context.Background(), "0")
@@ -223,7 +226,8 @@ func TestLinkFailure(t *testing.T) {
cancelledCtx, cancel := context.WithDeadline(context.Background(), time.Now().Add(-time.Hour))
defer cancel()

err = links.lr.RecoverIfNeeded(cancelledCtx, lwidToError(err, oldLWID))
args := &utils.RetryFnArgs{}
err = links.lr.RecoverIfNeeded(cancelledCtx, lwidToError(err, oldLWID), args)
require.NoError(t, err)

newLWID, err := links.GetLink(context.Background(), "0")
21 changes: 11 additions & 10 deletions sdk/messaging/azeventhubs/internal/links_unit_test.go
@@ -11,6 +11,7 @@ import (
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/v2/internal/amqpwrap"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/v2/internal/mock"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/v2/internal/test"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/v2/internal/utils"
"github.com/Azure/go-amqp"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/require"
@@ -26,7 +27,7 @@ func TestLinks_NoOp(t *testing.T) {
})

// no error just no-ops
err := links.lr.RecoverIfNeeded(context.Background(), nil)
err := links.lr.RecoverIfNeeded(context.Background(), nil, &utils.RetryFnArgs{})
require.NoError(t, err)
}

@@ -55,13 +56,13 @@ func TestLinks_LinkStale(t *testing.T) {
// we'll recover first, but our lwid (after this recovery) is stale since
// the link cache will be updated after this is done.

err = links.lr.RecoverIfNeeded(context.Background(), lwidToError(&amqp.LinkError{}, staleLWID))
err = links.lr.RecoverIfNeeded(context.Background(), lwidToError(&amqp.LinkError{}, staleLWID), &utils.RetryFnArgs{})
require.NoError(t, err)
require.Nil(t, links.links["0"], "closed link is removed from the cache")
require.Equal(t, 1, receivers[0].CloseCalled, "original receiver is closed, and replaced")

// trying to recover again is a no-op (if nothing is in the cache)
err = links.lr.RecoverIfNeeded(context.Background(), lwidToError(&amqp.LinkError{}, staleLWID))
err = links.lr.RecoverIfNeeded(context.Background(), lwidToError(&amqp.LinkError{}, staleLWID), &utils.RetryFnArgs{})
require.NoError(t, err)
require.Nil(t, links.links["0"], "closed link is removed from the cache")
require.Equal(t, 1, receivers[0].CloseCalled, "original receiver is closed, and replaced")
@@ -75,7 +76,7 @@ func TestLinks_LinkStale(t *testing.T) {
require.NotNil(t, newLWID)
require.Equal(t, (*links.links["0"].link).LinkName(), newLWID.Link().LinkName(), "cache contains the newly created link for partition 0")

err = links.lr.RecoverIfNeeded(context.Background(), lwidToError(&amqp.LinkError{}, staleLWID))
err = links.lr.RecoverIfNeeded(context.Background(), lwidToError(&amqp.LinkError{}, staleLWID), &utils.RetryFnArgs{})
require.NoError(t, err)
require.Equal(t, 0, receivers[0].CloseCalled, "receiver is NOT closed - we didn't need to replace it since the lwid with the error was stale")
}
@@ -102,7 +103,7 @@ func TestLinks_LinkRecoveryOnly(t *testing.T) {
require.NotNil(t, lwid)
require.NotNil(t, links.links["0"], "cache contains the newly created link for partition 0")

err = links.lr.RecoverIfNeeded(context.Background(), lwidToError(&amqp.LinkError{}, lwid))
err = links.lr.RecoverIfNeeded(context.Background(), lwidToError(&amqp.LinkError{}, lwid), &utils.RetryFnArgs{})
require.NoError(t, err)
require.Nil(t, links.links["0"], "cache will no longer contain a link for partition 0")

@@ -157,7 +158,7 @@ func TestLinks_ConnectionRecovery(t *testing.T) {
ns.EXPECT().Recover(test.NotCancelled, gomock.Any()).Return(nil)

// initiate a connection level recovery
err = links.lr.RecoverIfNeeded(context.Background(), lwidToError(&amqp.ConnError{}, lwid))
err = links.lr.RecoverIfNeeded(context.Background(), lwidToError(&amqp.ConnError{}, lwid), &utils.RetryFnArgs{})
require.NoError(t, err)

// we still cleanup what we can (including cancelling our background negotiate claim loop)
@@ -202,7 +203,7 @@ func TestLinks_LinkRecoveryButCloseIsCancelled(t *testing.T) {
require.NotNil(t, lwid)
require.NotNil(t, links.links["0"], "cache contains the newly created link for partition 0")

err = links.lr.RecoverIfNeeded(context.Background(), lwidToError(&amqp.LinkError{}, lwid))
err = links.lr.RecoverIfNeeded(context.Background(), lwidToError(&amqp.LinkError{}, lwid), &utils.RetryFnArgs{})
require.ErrorIs(t, err, context.Canceled)
require.Nil(t, links.links["0"], "cache will no longer contain a link for partition 0")
require.Equal(t, 0, connectionRecoverCalled, "Link level recovery, not connection level")
@@ -254,7 +255,7 @@ func TestLinks_closeWithTimeout(t *testing.T) {

// purposefully recover with what should be a link level recovery. However, the Close() failing
// means we end up "upgrading" to a connection reset instead.
err = links.lr.RecoverIfNeeded(userCtx, lwidToError(&amqp.LinkError{}, lwid))
err = links.lr.RecoverIfNeeded(userCtx, lwidToError(&amqp.LinkError{}, lwid), &utils.RetryFnArgs{})
require.ErrorIs(t, err, errToReturn)

// we still cleanup what we can (including cancelling our background negotiate claim loop)
@@ -292,7 +293,7 @@ func TestLinks_linkRecoveryOnly(t *testing.T) {
lwid, err := links.GetLink(context.Background(), "0")
require.NoError(t, err)

err = links.lr.RecoverIfNeeded(context.Background(), lwidToError(&amqp.LinkError{}, lwid))
err = links.lr.RecoverIfNeeded(context.Background(), lwidToError(&amqp.LinkError{}, lwid), &utils.RetryFnArgs{})
require.NoError(t, err)

// we still cleanup what we can (including cancelling our background negotiate claim loop)
@@ -329,7 +330,7 @@ func TestLinks_linkRecoveryFailsWithLinkFailure(t *testing.T) {
lwid, err := links.GetLink(context.Background(), "0")
require.NoError(t, err)

err = links.lr.RecoverIfNeeded(context.Background(), lwidToError(&amqp.LinkError{}, lwid))
err = links.lr.RecoverIfNeeded(context.Background(), lwidToError(&amqp.LinkError{}, lwid), &utils.RetryFnArgs{})
require.Equal(t, err, detachErr)

// we still cleanup what we can (including cancelling our background negotiate claim loop)
32 changes: 29 additions & 3 deletions sdk/messaging/azeventhubs/internal/utils/retrier.go
@@ -23,7 +23,8 @@ type RetryFnArgs struct {
// If you have potentially expensive
LastErr error

resetAttempts bool
resetAttempts bool
useLinkRecoveryDelay bool
}

// ResetAttempts resets all Retry() attempts, starting back
@@ -32,6 +33,13 @@ func (rf *RetryFnArgs) ResetAttempts() {
rf.resetAttempts = true
}

// UseLinkRecoveryDelay signals that the next retry should use the
// LinkRecoveryDelay instead of the normal exponential backoff delay.
// This is typically called after a link recovery operation.
func (rf *RetryFnArgs) UseLinkRecoveryDelay() {
rf.useLinkRecoveryDelay = true
}

// Retry runs a standard retry loop. It executes your passed in fn as the body of the loop.
// It returns if it exceeds the number of configured retry options or if 'isFatal' returns true.
func Retry(ctx context.Context, eventName log.Event, prefix func() string, o exported.RetryOptions, fn func(ctx context.Context, callbackArgs *RetryFnArgs) error, isFatalFn func(err error) bool) error {
@@ -43,11 +51,26 @@ func Retry(ctx context.Context, eventName log.Event, prefix func() string, o exp
setDefaults(&ro)

var err error
var useLinkRecoveryDelay bool

for i := int32(0); i <= ro.MaxRetries; i++ {
if i > 0 {
sleep := calcDelay(ro, i)
log.Writef(eventName, "(%s) Retry attempt %d sleeping for %s", prefix(), i, sleep)
var sleep time.Duration

// Check if we should use link recovery delay instead of exponential backoff
if useLinkRecoveryDelay && ro.LinkRecoveryDelay > 0 {
// User configured a specific link recovery delay
sleep = ro.LinkRecoveryDelay
log.Writef(eventName, "(%s) Retry attempt %d sleeping for %s (link recovery delay)", prefix(), i, sleep)
} else if useLinkRecoveryDelay && ro.LinkRecoveryDelay < 0 {
// User configured no delay for link recovery
sleep = 0
log.Writef(eventName, "(%s) Retry attempt %d: no delay (link recovery)", prefix(), i)
} else {
// Default: use normal exponential backoff (includes LinkRecoveryDelay == 0)
sleep = calcDelay(ro, i)
log.Writef(eventName, "(%s) Retry attempt %d sleeping for %s", prefix(), i, sleep)
}

select {
case <-ctx.Done():
@@ -62,6 +85,9 @@ func Retry(ctx context.Context, eventName log.Event, prefix func() string, o exp
}
err = fn(ctx, &args)

// Capture the flag for the next iteration
useLinkRecoveryDelay = args.useLinkRecoveryDelay

if args.resetAttempts {
log.Writef(eventName, "(%s) Resetting retry attempts", prefix())

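Restating the delay selection above in isolation (a sketch, not the SDK code): a positive LinkRecoveryDelay replaces the exponential backoff with a fixed sleep after a link recovery, a negative value means no sleep at all, and zero, the default, falls through to the normal calcDelay backoff.

```go
package retrysketch

import "time"

// nextSleep mirrors the three branches in Retry: pick the delay before
// attempt i (i >= 1). backoff stands in for the SDK's calcDelay helper.
func nextSleep(afterLinkRecovery bool, linkRecoveryDelay time.Duration, i int32, backoff func(int32) time.Duration) time.Duration {
	switch {
	case afterLinkRecovery && linkRecoveryDelay > 0:
		return linkRecoveryDelay // fixed, user-configured delay
	case afterLinkRecovery && linkRecoveryDelay < 0:
		return 0 // retry immediately after link recovery
	default:
		return backoff(i) // zero keeps the normal exponential backoff
	}
}
```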