Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 12 additions & 13 deletions pkg/sync/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ func (p *ProgressCounts) LogExpandProgress(ctx context.Context, actions []*expan

// syncer orchestrates a connector sync and stores the results using the provided datasource.Writer.
type syncer struct {
attempts int
c1zManager manager.Manager
c1zPath string
externalResourceC1ZPath string
Expand Down Expand Up @@ -244,25 +245,23 @@ func (s *syncer) handleProgress(ctx context.Context, a *Action, c int) {
}
}

var attempts = 0

func shouldWaitAndRetry(ctx context.Context, err error) bool {
func (s *syncer) shouldWaitAndRetry(ctx context.Context, err error) bool {
ctx, span := tracer.Start(ctx, "syncer.shouldWaitAndRetry")
defer span.End()

if err == nil {
attempts = 0
s.attempts = 0
return true
}
if status.Code(err) != codes.Unavailable && status.Code(err) != codes.DeadlineExceeded {
return false
}

attempts++
s.attempts++
l := ctxzap.Extract(ctx)

// use linear time by default
var wait time.Duration = time.Duration(attempts) * time.Second
var wait time.Duration = time.Duration(s.attempts) * time.Second

// If error contains rate limit data, use that instead
if st, ok := status.FromError(err); ok {
Expand Down Expand Up @@ -420,14 +419,14 @@ func (s *syncer) Sync(ctx context.Context) error {

case SyncResourceTypesOp:
err = s.SyncResourceTypes(ctx)
if !shouldWaitAndRetry(ctx, err) {
if !s.shouldWaitAndRetry(ctx, err) {
return err
}
continue

case SyncResourcesOp:
err = s.SyncResources(ctx)
if !shouldWaitAndRetry(ctx, err) {
if !s.shouldWaitAndRetry(ctx, err) {
return err
}
continue
Expand All @@ -440,7 +439,7 @@ func (s *syncer) Sync(ctx context.Context) error {
s.state.FinishAction(ctx)
continue
}
if !shouldWaitAndRetry(ctx, err) {
if !s.shouldWaitAndRetry(ctx, err) {
return err
}
continue
Expand All @@ -453,20 +452,20 @@ func (s *syncer) Sync(ctx context.Context) error {
s.state.FinishAction(ctx)
continue
}
if !shouldWaitAndRetry(ctx, err) {
if !s.shouldWaitAndRetry(ctx, err) {
return err
}
continue

case SyncExternalResourcesOp:
err = s.SyncExternalResources(ctx)
if !shouldWaitAndRetry(ctx, err) {
if !s.shouldWaitAndRetry(ctx, err) {
return err
}
continue
case SyncAssetsOp:
err = s.SyncAssets(ctx)
if !shouldWaitAndRetry(ctx, err) {
if !s.shouldWaitAndRetry(ctx, err) {
return err
}
continue
Expand All @@ -479,7 +478,7 @@ func (s *syncer) Sync(ctx context.Context) error {
}

err = s.SyncGrantExpansion(ctx)
if !shouldWaitAndRetry(ctx, err) {
if !s.shouldWaitAndRetry(ctx, err) {
return err
}
continue
Expand Down
Loading