Skip to content

Commit d6b61ee

Browse files
authored
Retry cancellation tests (#442)
1 parent 5003264 commit d6b61ee

File tree

3 files changed

+313
-0
lines changed

3 files changed

+313
-0
lines changed

tester/tester_retries_test.go

Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"errors"
66
"testing"
7+
"time"
78

89
"github.com/cschleiden/go-workflows/internal/sync"
910
"github.com/cschleiden/go-workflows/internal/workflowerrors"
@@ -45,3 +46,147 @@ func Test_withRetries_permanent(t *testing.T) {
4546

4647
require.Equal(t, 1, attempts)
4748
}
49+
50+
// Test_withRetries_contextCanceled tests that WithRetries returns Canceled
51+
// when the context is already canceled before the function is called
52+
func Test_withRetries_contextCanceled(t *testing.T) {
53+
wf := func(ctx workflow.Context) error {
54+
tctx, cancel := workflow.WithCancel(ctx)
55+
cancel()
56+
57+
f := workflow.WithRetries(tctx, workflow.DefaultRetryOptions, func(ctx workflow.Context, attempt int) workflow.Future[int] {
58+
// This should never be called since the context is already canceled
59+
panic("function should not be called when context is already canceled")
60+
})
61+
62+
_, err := f.Get(ctx)
63+
return err
64+
}
65+
66+
tester := NewWorkflowTester[any](wf)
67+
68+
tester.Execute(context.Background())
69+
require.True(t, tester.WorkflowFinished())
70+
71+
_, err := tester.WorkflowResult()
72+
require.EqualError(t, err, "context canceled")
73+
tester.AssertExpectations(t)
74+
}
75+
76+
// Test_withRetries_contextCanceledDuringRetry tests that WithRetries returns Canceled
77+
// when the context is canceled during retry backoff
78+
func Test_withRetries_contextCanceledDuringRetry(t *testing.T) {
79+
wf := func(ctx workflow.Context) (int, error) {
80+
tctx, cancel := workflow.WithCancel(ctx)
81+
82+
attemptCount := 0
83+
f := workflow.WithRetries(tctx, workflow.RetryOptions{
84+
MaxAttempts: 5,
85+
FirstRetryInterval: 1 * time.Second,
86+
BackoffCoefficient: 1,
87+
}, func(ctx workflow.Context, attempt int) workflow.Future[int] {
88+
f := sync.NewFuture[int]()
89+
attemptCount++
90+
91+
// Fail the first two attempts, then cancel
92+
if attempt == 1 {
93+
// Cancel after first retry
94+
cancel()
95+
}
96+
f.Set(attempt, errors.New("retry error"))
97+
98+
return f
99+
})
100+
101+
// Wait for retries to finish
102+
attempts, err := f.Get(ctx)
103+
104+
return attempts, err
105+
}
106+
107+
tester := NewWorkflowTester[int](wf)
108+
109+
tester.Execute(context.Background())
110+
require.True(t, tester.WorkflowFinished())
111+
112+
_, err := tester.WorkflowResult()
113+
require.EqualError(t, err, "context canceled")
114+
tester.AssertExpectations(t)
115+
}
116+
117+
// Test_withRetries_contextCanceledDuringExecution tests that WithRetries returns Canceled
118+
// when the context is canceled while the function is executing
119+
func Test_withRetries_contextCanceledDuringExecution(t *testing.T) {
120+
wf := func(ctx workflow.Context) error {
121+
tctx, cancel := workflow.WithCancel(ctx)
122+
123+
f := workflow.WithRetries(tctx, workflow.RetryOptions{
124+
MaxAttempts: 3,
125+
FirstRetryInterval: 1 * time.Second,
126+
BackoffCoefficient: 1,
127+
}, func(ctx workflow.Context, attempt int) workflow.Future[int] {
128+
f := sync.NewFuture[int]()
129+
130+
// Cancel after first attempt starts
131+
if attempt == 0 {
132+
cancel()
133+
}
134+
135+
// Simulate async work with an error
136+
f.Set(attempt, errors.New("operation failed"))
137+
138+
return f
139+
})
140+
141+
_, err := f.Get(ctx)
142+
return err
143+
}
144+
145+
tester := NewWorkflowTester[any](wf)
146+
147+
tester.Execute(context.Background())
148+
require.True(t, tester.WorkflowFinished())
149+
150+
_, err := tester.WorkflowResult()
151+
require.EqualError(t, err, "context canceled")
152+
tester.AssertExpectations(t)
153+
}
154+
155+
// Test_withRetries_timerCancellation tests that the retry backoff timer is canceled
156+
// when the context is canceled during the wait
157+
func Test_withRetries_timerCancellation(t *testing.T) {
158+
wf := func(ctx workflow.Context) error {
159+
tctx, cancel := workflow.WithCancel(ctx)
160+
161+
f := workflow.WithRetries(tctx, workflow.RetryOptions{
162+
MaxAttempts: 5,
163+
FirstRetryInterval: 10 * time.Second,
164+
BackoffCoefficient: 1,
165+
}, func(ctx workflow.Context, attempt int) workflow.Future[int] {
166+
f := sync.NewFuture[int]()
167+
168+
// Always fail to trigger retries
169+
f.Set(attempt, errors.New("operation failed"))
170+
171+
return f
172+
})
173+
174+
// Start a timer to cancel the context after a short delay
175+
workflow.Go(ctx, func(ctx workflow.Context) {
176+
workflow.Sleep(ctx, 2*time.Second)
177+
cancel()
178+
})
179+
180+
_, err := f.Get(ctx)
181+
return err
182+
}
183+
184+
tester := NewWorkflowTester[any](wf)
185+
186+
tester.Execute(context.Background())
187+
require.True(t, tester.WorkflowFinished())
188+
189+
_, err := tester.WorkflowResult()
190+
require.EqualError(t, err, "context canceled")
191+
tester.AssertExpectations(t)
192+
}

workflow/retries.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,13 @@ var DefaultRetryOptions = RetryOptions{
3232

3333
// WithRetries executes the given function with retries.
3434
func WithRetries[T any](ctx Context, retryOptions RetryOptions, fn func(ctx Context, attempt int) Future[T]) Future[T] {
35+
// If the context is already canceled, return immediately.
36+
if ctx.Err() != nil {
37+
f := sync.NewFuture[T]()
38+
f.Set(*new(T), ctx.Err())
39+
return f
40+
}
41+
3542
attempt := 0
3643
firstAttempt := Now(ctx)
3744

workflow/retries_test.go

Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
package workflow
2+
3+
import (
4+
"errors"
5+
"log/slog"
6+
"testing"
7+
"time"
8+
9+
"github.com/benbjohnson/clock"
10+
"github.com/cschleiden/go-workflows/backend/converter"
11+
"github.com/cschleiden/go-workflows/core"
12+
"github.com/cschleiden/go-workflows/internal/contextvalue"
13+
"github.com/cschleiden/go-workflows/internal/sync"
14+
"github.com/cschleiden/go-workflows/internal/workflowerrors"
15+
"github.com/cschleiden/go-workflows/internal/workflowstate"
16+
"github.com/stretchr/testify/require"
17+
"go.opentelemetry.io/otel/trace/noop"
18+
)
19+
20+
func TestWithRetries(t *testing.T) {
21+
tests := []struct {
22+
name string
23+
setupCtx func(Context) (Context, func()) // returns context and cleanup function
24+
retryOptions RetryOptions
25+
fn func(t *testing.T, attemptCount *int) func(Context, int) Future[string]
26+
expectedErr error
27+
expectedResult string
28+
expectedCalls int
29+
}{
30+
{
31+
name: "context canceled before execution",
32+
setupCtx: func(ctx Context) (Context, func()) {
33+
ctx, cancel := WithCancel(ctx)
34+
cancel() // Cancel immediately
35+
return ctx, func() {}
36+
},
37+
retryOptions: DefaultRetryOptions,
38+
fn: func(t *testing.T, attemptCount *int) func(Context, int) Future[string] {
39+
return func(ctx Context, attempt int) Future[string] {
40+
require.FailNow(t, "function should not be called when context is already canceled")
41+
return sync.NewFuture[string]()
42+
}
43+
},
44+
expectedErr: Canceled,
45+
expectedCalls: 0,
46+
},
47+
{
48+
name: "no retry needed - success",
49+
setupCtx: func(ctx Context) (Context, func()) {
50+
return ctx, func() {}
51+
},
52+
retryOptions: DefaultRetryOptions,
53+
fn: func(t *testing.T, attemptCount *int) func(Context, int) Future[string] {
54+
return func(ctx Context, attempt int) Future[string] {
55+
f := sync.NewFuture[string]()
56+
*attemptCount++
57+
f.Set("success", nil)
58+
return f
59+
}
60+
},
61+
expectedResult: "success",
62+
expectedCalls: 1,
63+
},
64+
{
65+
name: "max attempts one - no retries",
66+
setupCtx: func(ctx Context) (Context, func()) {
67+
return ctx, func() {}
68+
},
69+
retryOptions: RetryOptions{
70+
MaxAttempts: 1,
71+
FirstRetryInterval: 1 * time.Second,
72+
BackoffCoefficient: 1,
73+
},
74+
fn: func(t *testing.T, attemptCount *int) func(Context, int) Future[string] {
75+
return func(ctx Context, attempt int) Future[string] {
76+
f := sync.NewFuture[string]()
77+
*attemptCount++
78+
f.Set("", errors.New("error"))
79+
return f
80+
}
81+
},
82+
expectedErr: errors.New("error"), // Only compare error message in assertion
83+
expectedCalls: 1,
84+
},
85+
}
86+
87+
for _, tt := range tests {
88+
t.Run(tt.name, func(t *testing.T) {
89+
state := workflowstate.NewWorkflowState(
90+
core.NewWorkflowInstance("a", ""), slog.Default(), noop.NewTracerProvider().Tracer("test"), clock.New())
91+
92+
ctx := sync.Background()
93+
ctx = contextvalue.WithConverter(ctx, converter.DefaultConverter)
94+
ctx = workflowstate.WithWorkflowState(ctx, state)
95+
96+
ctx, cleanup := tt.setupCtx(ctx)
97+
defer cleanup()
98+
99+
attemptCount := 0
100+
c := sync.NewCoroutine(ctx, func(ctx Context) error {
101+
f := WithRetries(ctx, tt.retryOptions, tt.fn(t, &attemptCount))
102+
103+
result, err := f.Get(ctx)
104+
105+
if tt.expectedErr != nil {
106+
if tt.expectedErr == Canceled {
107+
require.Equal(t, Canceled, err)
108+
} else {
109+
require.EqualError(t, err, tt.expectedErr.Error())
110+
}
111+
} else {
112+
require.NoError(t, err)
113+
require.Equal(t, tt.expectedResult, result)
114+
}
115+
116+
return nil
117+
})
118+
119+
c.Execute()
120+
require.True(t, c.Finished())
121+
require.Equal(t, tt.expectedCalls, attemptCount)
122+
})
123+
}
124+
}
125+
126+
// Test_WithRetries_PermanentError is kept separate as it has different async behavior
127+
func Test_WithRetries_PermanentError(t *testing.T) {
128+
state := workflowstate.NewWorkflowState(
129+
core.NewWorkflowInstance("a", ""), slog.Default(), noop.NewTracerProvider().Tracer("test"), clock.New())
130+
131+
ctx := sync.Background()
132+
ctx = contextvalue.WithConverter(ctx, converter.DefaultConverter)
133+
ctx = workflowstate.WithWorkflowState(ctx, state)
134+
135+
attemptCount := 0
136+
c := sync.NewCoroutine(ctx, func(ctx Context) error {
137+
_ = WithRetries(ctx, DefaultRetryOptions, func(ctx Context, attempt int) Future[int] {
138+
f := sync.NewFuture[int]()
139+
attemptCount++
140+
141+
if attempt > 0 {
142+
// Return permanent error on retry
143+
f.Set(attempt, workflowerrors.NewPermanentError(errors.New("permanent error")))
144+
} else {
145+
// Return regular error on first attempt
146+
f.Set(attempt, errors.New("generic error"))
147+
}
148+
149+
return f
150+
})
151+
152+
// The future won't be ready immediately since WithRetries spawns a goroutine
153+
return nil
154+
})
155+
156+
c.Execute()
157+
158+
// Workflow finishes immediately as it just calls WithRetries, which starts an async goroutine
159+
require.True(t, c.Finished())
160+
require.Equal(t, 1, attemptCount) // Only first attempt happens synchronously
161+
}

0 commit comments

Comments
 (0)