Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
178 changes: 57 additions & 121 deletions pkg/connectorbuilder/connectorbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
"math"
"slices"
"sort"
"time"
Expand All @@ -23,6 +22,7 @@ import (
"github.com/conductorone/baton-sdk/pkg/crypto"
"github.com/conductorone/baton-sdk/pkg/metrics"
"github.com/conductorone/baton-sdk/pkg/pagination"
"github.com/conductorone/baton-sdk/pkg/retry"
"github.com/conductorone/baton-sdk/pkg/types"
"github.com/conductorone/baton-sdk/pkg/types/tasks"
"github.com/conductorone/baton-sdk/pkg/uhttp"
Expand Down Expand Up @@ -915,54 +915,48 @@ func (b *builderImpl) Grant(ctx context.Context, request *v2.GrantManagerService
tt := tasks.GrantType
l := ctxzap.Extract(ctx)

var (
attempt = 0
baseDelay = 30 * time.Second
rt = request.Entitlement.Resource.Id.ResourceType
)
rt := request.Entitlement.Resource.Id.ResourceType

provisioner, v1ok := b.resourceProvisioners[rt]
provisionerV2, v2ok := b.resourceProvisionersV2[rt]
if !v1ok && !v2ok {
retryer := retry.NewRetryer(ctx, retry.RetryConfig{
MaxAttempts: 3,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MaxAttempts should be 2.
The Temporal activity has a timeout of 5 minutes, so the time from start to close shouldn't exceed that limit. The C1 ticket would end up in a weird state if we retry 3 times.

For example,
image

Customers are not aware of the current status of the ticket.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Three retries would wait 30, 60, then 90 seconds, which is 3 minutes total.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Three minutes seems to be the ideal timing. I haven’t looked into the code to see how C1 tickets change status, but based on my testing, three attempts don’t work — the C1 ticket seems to be stuck.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

image I am not sure if it's because of the dev env. I retried twice, which is 30 + 60 seconds, but the C1 ticket still took around 4 minutes to respond, so I don't think it would work if the last retry takes 90 seconds.

InitialDelay: 15 * time.Second,
MaxDelay: 60 * time.Second,
})

var grantFunc func(ctx context.Context, principal *v2.Resource, entitlement *v2.Entitlement) ([]*v2.Grant, annotations.Annotations, error)
provisioner, ok := b.resourceProvisioners[rt]
if ok {
grantFunc = func(ctx context.Context, principal *v2.Resource, entitlement *v2.Entitlement) ([]*v2.Grant, annotations.Annotations, error) {
annos, err := provisioner.Grant(ctx, principal, entitlement)
if err != nil {
return nil, annos, err
}
return nil, annos, nil
}
}
provisionerV2, ok := b.resourceProvisionersV2[rt]
if ok {
grantFunc = provisionerV2.Grant
}

if grantFunc == nil {
l.Error("error: resource type does not have provisioner configured", zap.String("resource_type", rt))
b.m.RecordTaskFailure(ctx, tt, b.nowFunc().Sub(start))
return nil, fmt.Errorf("error: resource type does not have provisioner configured")
}

for {
if v1ok {
annos, err := provisioner.Grant(ctx, request.Principal, request.Entitlement)
if err != nil {
l.Error("error: grant failed", zap.Error(err))
if !b.shouldWaitAndRetry(ctx, err, baseDelay) || attempt >= 2 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this change I think increased the attempt count fwiw

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the old code tried twice before erroring. I changed it to three because that seems like a good number of attempts before giving up.

b.m.RecordTaskFailure(ctx, tt, b.nowFunc().Sub(start))
return nil, fmt.Errorf("err: grant failed: %w", err)
}

attempt++
baseDelay *= 2
continue
}

grants, annos, err := grantFunc(ctx, request.Principal, request.Entitlement)
if err == nil {
b.m.RecordTaskSuccess(ctx, tt, b.nowFunc().Sub(start))
return &v2.GrantManagerServiceGrantResponse{Annotations: annos}, nil
return &v2.GrantManagerServiceGrantResponse{Annotations: annos, Grants: grants}, nil
}

grants, annos, err := provisionerV2.Grant(ctx, request.Principal, request.Entitlement)
if err != nil {
l.Error("error: grant failed", zap.Error(err))
if !b.shouldWaitAndRetry(ctx, err, baseDelay) || attempt >= 2 {
b.m.RecordTaskFailure(ctx, tt, b.nowFunc().Sub(start))
return nil, fmt.Errorf("err: grant failed: %w", err)
}

attempt++
baseDelay *= 2
if retryer.ShouldWaitAndRetry(ctx, err) {
continue
}

b.m.RecordTaskSuccess(ctx, tt, b.nowFunc().Sub(start))
return &v2.GrantManagerServiceGrantResponse{Annotations: annos, Grants: grants}, nil
l.Error("error: grant failed", zap.Error(err))
b.m.RecordTaskFailure(ctx, tt, b.nowFunc().Sub(start))
return nil, fmt.Errorf("err: grant failed: %w", err)
}
}

Expand All @@ -975,54 +969,42 @@ func (b *builderImpl) Revoke(ctx context.Context, request *v2.GrantManagerServic

l := ctxzap.Extract(ctx)

var (
attempt = 0
baseDelay = 30 * time.Second
rt = request.Grant.Entitlement.Resource.Id.ResourceType
)
rt := request.Grant.Entitlement.Resource.Id.ResourceType

retryer := retry.NewRetryer(ctx, retry.RetryConfig{
MaxAttempts: 3,
InitialDelay: 15 * time.Second,
MaxDelay: 60 * time.Second,
})

provisioner, v1ok := b.resourceProvisioners[rt]
provisionerV2, v2ok := b.resourceProvisionersV2[rt]
if !v1ok && !v2ok {
var revokeFunc func(ctx context.Context, grant *v2.Grant) (annotations.Annotations, error)
provisioner, ok := b.resourceProvisioners[rt]
if ok {
revokeFunc = provisioner.Revoke
}
provisionerV2, ok := b.resourceProvisionersV2[rt]
if ok {
revokeFunc = provisionerV2.Revoke
}

if revokeFunc == nil {
l.Error("error: resource type does not have provisioner configured", zap.String("resource_type", rt))
b.m.RecordTaskFailure(ctx, tt, b.nowFunc().Sub(start))
return nil, fmt.Errorf("error: resource type does not have provisioner configured")
}

for {
if v1ok {
annos, err := provisioner.Revoke(ctx, request.Grant)
if err != nil {
l.Error("error: revoke failed", zap.Error(err))

if !b.shouldWaitAndRetry(ctx, err, baseDelay) || attempt >= 2 {
b.m.RecordTaskFailure(ctx, tt, b.nowFunc().Sub(start))
return nil, fmt.Errorf("error: revoke failed: %w", err)
}

attempt++
baseDelay *= 2
continue
}
annos, err := revokeFunc(ctx, request.Grant)
if err == nil {
b.m.RecordTaskSuccess(ctx, tt, b.nowFunc().Sub(start))
return &v2.GrantManagerServiceRevokeResponse{Annotations: annos}, nil
}

annos, err := provisionerV2.Revoke(ctx, request.Grant)
if err != nil {
l.Error("error: revoke failed", zap.Error(err))

if !b.shouldWaitAndRetry(ctx, err, baseDelay) || attempt >= 2 {
b.m.RecordTaskFailure(ctx, tt, b.nowFunc().Sub(start))
return nil, fmt.Errorf("error: revoke failed: %w", err)
}

attempt++
baseDelay *= 2
if retryer.ShouldWaitAndRetry(ctx, err) {
continue
}

b.m.RecordTaskSuccess(ctx, tt, b.nowFunc().Sub(start))
return &v2.GrantManagerServiceRevokeResponse{Annotations: annos}, nil
l.Error("error: revoke failed", zap.Error(err))
b.m.RecordTaskFailure(ctx, tt, b.nowFunc().Sub(start))
return nil, fmt.Errorf("error: revoke failed: %w", err)
}
}

Expand Down Expand Up @@ -1368,49 +1350,3 @@ func (b *builderImpl) GetActionStatus(ctx context.Context, request *v2.GetAction
b.m.RecordTaskSuccess(ctx, tt, b.nowFunc().Sub(start))
return resp, nil
}

func (b *builderImpl) shouldWaitAndRetry(ctx context.Context, err error, baseDelay time.Duration) bool {
ctx, span := tracer.Start(ctx, "provisioner.shouldWaitAndRetry")
defer span.End()

if err == nil {
return false
}

if status.Code(err) != codes.Unavailable && status.Code(err) != codes.DeadlineExceeded {
return false
}

// If error contains rate limit data, use that instead
if st, ok := status.FromError(err); ok {
details := st.Details()
for _, detail := range details {
if rlData, ok := detail.(*v2.RateLimitDescription); ok {
waitResetAt := time.Until(rlData.ResetAt.AsTime())
if waitResetAt <= 0 {
continue
}
duration := time.Duration(rlData.Limit)
if duration <= 0 {
continue
}
waitResetAt /= duration
// Round up to the nearest second to make sure we don't hit the rate limit again
waitResetAt = time.Duration(math.Ceil(waitResetAt.Seconds())) * time.Second
if waitResetAt > 0 {
baseDelay = waitResetAt
break
}
}
}
}

for {
select {
case <-time.After(baseDelay):
return true
case <-ctx.Done():
return false
}
}
}
113 changes: 113 additions & 0 deletions pkg/retry/retry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package retry

import (
"context"
"math"
"time"

v2 "github.com/conductorone/baton-sdk/pb/c1/connector/v2"
"github.com/grpc-ecosystem/go-grpc-middleware/logging/zap/ctxzap"
"go.opentelemetry.io/otel"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

var tracer = otel.Tracer("baton-sdk/retry")

// Retryer tracks retry state across a sequence of attempts against a single
// operation, applying linear backoff between failed attempts.
type Retryer struct {
	attempts     uint          // failed attempts observed so far; reset to 0 on success
	maxAttempts  uint          // 0 means no limit on attempts
	initialDelay time.Duration // base delay; attempt N waits roughly N * initialDelay
	maxDelay     time.Duration // upper bound on any single wait
}

// RetryConfig configures a Retryer created by NewRetryer.
type RetryConfig struct {
	MaxAttempts  uint          // 0 means no limit (which is also the default).
	InitialDelay time.Duration // Default is 1 second.
	MaxDelay     time.Duration // Default (and the value used when 0) is 60 seconds.
}

// NewRetryer builds a Retryer from config, substituting defaults for any
// zero-valued delay settings (1s initial delay, 60s max delay). The context
// argument is currently unused but kept for interface stability.
func NewRetryer(ctx context.Context, config RetryConfig) *Retryer {
	initialDelay := config.InitialDelay
	if initialDelay == 0 {
		initialDelay = time.Second
	}
	maxDelay := config.MaxDelay
	if maxDelay == 0 {
		maxDelay = 60 * time.Second
	}
	return &Retryer{
		attempts:     0,
		maxAttempts:  config.MaxAttempts,
		initialDelay: initialDelay,
		maxDelay:     maxDelay,
	}
}

func (r *Retryer) ShouldWaitAndRetry(ctx context.Context, err error) bool {
ctx, span := tracer.Start(ctx, "retry.ShouldWaitAndRetry")
defer span.End()

if err == nil {
r.attempts = 0
return true
}
if status.Code(err) != codes.Unavailable && status.Code(err) != codes.DeadlineExceeded {
return false
}
Comment on lines +55 to +57
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Verification agent

🧩 Analysis chain

Consider handling additional recoverable gRPC codes (e.g. ResourceExhausted, Internal).

Only Unavailable and DeadlineExceeded are currently retried. Connectors often surface transient throttling under ResourceExhausted and some load-balancers use Internal for “connection reset” conditions. You may unintentionally fail otherwise transient operations.

-	if status.Code(err) != codes.Unavailable && status.Code(err) != codes.DeadlineExceeded {
+	switch status.Code(err) {
+	case codes.Unavailable, codes.DeadlineExceeded, codes.ResourceExhausted, codes.Internal:
+		// retryable
+	default:
 		return false
-	}
+	}

🏁 Script executed:

#!/bin/bash
# Locate retry.go and display the relevant retry logic context
file=$(fd --exact-name retry.go)
echo "Found file: $file"
sed -n '40,80p' "$file"

Length of output: 345


🏁 Script executed:

#!/bin/bash
# Locate retry.go using find
file=$(find . -type f -name "retry.go" | head -n1)
if [ -z "$file" ]; then
  echo "retry.go not found"
  exit 1
fi
echo "Found file: $file"
# Show lines around the retry condition (lines 40–80)
sed -n '40,80p' "$file"

Length of output: 1269


Enhance retry logic to include additional transient gRPC codes

Currently only Unavailable and DeadlineExceeded are retried. In practice you’ll also see transient throttling surfaced as ResourceExhausted and connection‐reset errors as Internal. Without including these, short‐lived failures will fall through and not be retried.

Suggested change in pkg/retry/retry.go around line 55:

-   if status.Code(err) != codes.Unavailable && status.Code(err) != codes.DeadlineExceeded {
-       return false
-   }
+   switch status.Code(err) {
+   case codes.Unavailable,
+        codes.DeadlineExceeded,
+        codes.ResourceExhausted,
+        codes.Internal:
+       // retryable
+   default:
+       return false
+   }

This ensures you’ll back off and retry on throttling (ResourceExhausted) and connection resets (Internal) as well.


r.attempts++
l := ctxzap.Extract(ctx)

if r.maxAttempts > 0 && r.attempts > r.maxAttempts {
l.Warn("max attempts reached", zap.Error(err), zap.Uint("max_attempts", r.maxAttempts))
return false
}

// use linear backoff by default
var wait time.Duration
if r.attempts > math.MaxInt64 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that's a lot of attempts

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was to get the linter to shut up.

wait = r.maxDelay
} else {
wait = time.Duration(int64(r.attempts)) * r.initialDelay
}

// If error contains rate limit data, use that instead
if st, ok := status.FromError(err); ok {
details := st.Details()
for _, detail := range details {
if rlData, ok := detail.(*v2.RateLimitDescription); ok {
waitResetAt := time.Until(rlData.ResetAt.AsTime())
if waitResetAt <= 0 {
continue
}
duration := time.Duration(rlData.Limit)
if duration <= 0 {
continue
}
waitResetAt /= duration
// Round up to the nearest second to make sure we don't hit the rate limit again
waitResetAt = time.Duration(math.Ceil(waitResetAt.Seconds())) * time.Second
if waitResetAt > 0 {
wait = waitResetAt
break
}
}
}
Comment on lines +75 to +96
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Rate-limit wait calculation appears incorrect.

waitResetAt /= duration divides a duration by the numeric limit, yielding an arbitrary nanosecond value.
Usually you want time.Until(reset) + smallBuffer or time.Until(reset)/remaining, not “duration ÷ limit”.

Please revisit the algorithm; otherwise the retryer may sleep far too short or far too long.

}

if wait > r.maxDelay {
wait = r.maxDelay
}

l.Warn("retrying operation", zap.Error(err), zap.Duration("wait", wait))

for {
select {
case <-time.After(wait):
return true
case <-ctx.Done():
return false
}
}
}
50 changes: 50 additions & 0 deletions pkg/retry/retry_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package retry

import (
"context"
"errors"
"testing"
"time"

"github.com/stretchr/testify/require"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

// TestBasicRetry exercises the retry decision logic end to end: error
// classification, the attempt counter reset on success, backoff timing,
// and exhaustion of the attempt budget.
func TestBasicRetry(t *testing.T) {
	ctx := context.Background()
	r := NewRetryer(ctx, RetryConfig{
		MaxAttempts:  3,
		InitialDelay: 100 * time.Millisecond,
		MaxDelay:     1 * time.Second,
	})

	require.False(t, r.ShouldWaitAndRetry(ctx, errors.New("generic unrecoverable error")),
		"generic unrecoverable error should not be retried")

	require.True(t, r.ShouldWaitAndRetry(ctx, status.Error(codes.Unavailable, "recoverable error")),
		"recoverable error should be retried")

	require.False(t, r.ShouldWaitAndRetry(ctx, status.Error(codes.Unknown, "unknown error")),
		"unknown error should not be retried")

	// A nil error reports retryable and, as a side effect, resets attempts to 0.
	require.True(t, r.ShouldWaitAndRetry(ctx, nil), "nil error should be retried")

	require.True(t, r.ShouldWaitAndRetry(ctx, status.Error(codes.Unavailable, "first attempt")),
		"first attempt should be retried")

	began := time.Now()
	require.True(t, r.ShouldWaitAndRetry(ctx, status.Error(codes.Unavailable, "second attempt")),
		"second attempt should be retried")
	waited := time.Since(began)
	require.Greater(t, waited, 100*time.Millisecond, "second attempt should take longer than 100ms")
	require.Less(t, waited, 300*time.Millisecond, "second attempt should take less than 300ms")

	require.True(t, r.ShouldWaitAndRetry(ctx, status.Error(codes.Unavailable, "third attempt")),
		"third attempt should be retried")

	require.False(t, r.ShouldWaitAndRetry(ctx, status.Error(codes.Unavailable, "fourth attempt")),
		"fourth attempt should not be retried")
}
Loading
Loading