Skip to content

Commit 159fc13

Browse files
authored
Merge pull request #120 from cschleiden/update-sub-workflow-retries
Adjust sub-workflow retry behavior
2 parents e8946b6 + 435c7fb commit 159fc13

File tree

3 files changed

+43
-26
lines changed

3 files changed

+43
-26
lines changed

internal/workflow/executor_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -317,8 +317,8 @@ func Test_ExecuteWorkflowWithSelector(t *testing.T) {
317317
require.Equal(t, 1, workflowWithSelectorHits)
318318
require.Len(t, e.workflowState.Commands(), 2)
319319

320-
require.IsType(t, &command.ScheduleTimerCommand{}, e.workflowState.Commands()[0])
321-
require.IsType(t, &command.ScheduleActivityCommand{}, e.workflowState.Commands()[1])
320+
require.IsType(t, &command.ScheduleActivityCommand{}, e.workflowState.Commands()[0])
321+
require.IsType(t, &command.ScheduleTimerCommand{}, e.workflowState.Commands()[1])
322322
}
323323

324324
func Test_ExecuteNewEvents(t *testing.T) {

workflow/retries.go

Lines changed: 31 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -30,16 +30,20 @@ var DefaultRetryOptions = RetryOptions{
3030
}
3131

3232
func withRetries[T any](ctx sync.Context, retryOptions RetryOptions, fn func(ctx sync.Context, attempt int) Future[T]) Future[T] {
33+
attempt := 0
34+
firstAttempt := Now(ctx)
35+
36+
f := fn(ctx, attempt)
37+
3338
if retryOptions.MaxAttempts <= 1 {
3439
// Short-circuit if we don't need to retry
35-
return fn(ctx, 0)
40+
return f
3641
}
3742

43+
// Start a separate co-routine for retries
3844
r := sync.NewFuture[T]()
3945

4046
sync.Go(ctx, func(ctx sync.Context) {
41-
firstAttempt := Now(ctx)
42-
4347
var result T
4448
var err error
4549

@@ -48,32 +52,38 @@ func withRetries[T any](ctx sync.Context, retryOptions RetryOptions, fn func(ctx
4852
retryExpiration = firstAttempt.Add(retryOptions.RetryTimeout)
4953
}
5054

51-
for attempt := 0; attempt < retryOptions.MaxAttempts; attempt++ {
52-
if !retryExpiration.IsZero() && Now(ctx).After(retryExpiration) {
53-
// Reached maximum retry time, abort retries
55+
for {
56+
// Wait for active operation to finish
57+
result, err = f.Get(ctx)
58+
if err == nil {
5459
break
5560
}
5661

57-
result, err = fn(ctx, attempt).Get(ctx)
58-
if err != nil {
59-
if err == sync.Canceled {
60-
break
61-
}
62+
if err == sync.Canceled {
63+
break
64+
}
6265

63-
backoffDuration := time.Duration(float64(retryOptions.FirstRetryInterval) * math.Pow(retryOptions.BackoffCoefficient, float64(attempt)))
64-
if retryOptions.MaxRetryInterval > 0 {
65-
backoffDuration = time.Duration(math.Min(float64(backoffDuration), float64(retryOptions.MaxRetryInterval)))
66-
}
66+
backoffDuration := time.Duration(float64(retryOptions.FirstRetryInterval) * math.Pow(retryOptions.BackoffCoefficient, float64(attempt)))
67+
if retryOptions.MaxRetryInterval > 0 {
68+
backoffDuration = time.Duration(math.Min(float64(backoffDuration), float64(retryOptions.MaxRetryInterval)))
69+
}
6770

68-
if err := Sleep(ctx, backoffDuration); err != nil {
69-
r.Set(*new(T), err)
70-
return
71-
}
71+
if err := Sleep(ctx, backoffDuration); err != nil {
72+
r.Set(*new(T), err)
73+
return
74+
}
7275

73-
continue
76+
if !retryExpiration.IsZero() && Now(ctx).After(retryExpiration) {
77+
// Reached maximum retry time, abort retries
78+
break
79+
}
80+
81+
attempt++
82+
if attempt >= retryOptions.MaxAttempts {
83+
break
7484
}
7585

76-
break
86+
f = fn(ctx, attempt+1)
7787
}
7888

7989
r.Set(result, err)

workflow/subworkflow.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,16 @@ type SubWorkflowOptions struct {
2222
RetryOptions RetryOptions
2323
}
2424

25-
var DefaultSubWorkflowOptions = SubWorkflowOptions{
26-
RetryOptions: DefaultRetryOptions,
27-
}
25+
var (
26+
DefaultSubWorkflowRetryOptions = RetryOptions{
27+
// Disable retries by default for sub-workflows
28+
MaxAttempts: 1,
29+
}
30+
31+
DefaultSubWorkflowOptions = SubWorkflowOptions{
32+
RetryOptions: DefaultSubWorkflowRetryOptions,
33+
}
34+
)
2835

2936
func CreateSubWorkflowInstance[TResult any](ctx sync.Context, options SubWorkflowOptions, workflow interface{}, args ...interface{}) Future[TResult] {
3037
return withRetries(ctx, options.RetryOptions, func(ctx sync.Context, attempt int) Future[TResult] {

0 commit comments

Comments
 (0)