Skip to content

Commit 2ca3576

Browse files
authored
Merge pull request #316 from cschleiden/continue-as-new-expiration
Support custom expiration for continue-as-new workflows
2 parents 97a13f2 + 7d59d6d commit 2ca3576

File tree

3 files changed

+92
-3
lines changed

3 files changed

+92
-3
lines changed

backend/redis/expire_test.go

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,3 +121,77 @@ func Test_AutoExpiration_SubWorkflow(t *testing.T) {
121121
_, err = b.GetWorkflowInstanceState(ctx, wfi)
122122
require.ErrorIs(t, err, backend.ErrInstanceNotFound)
123123
}
124+
125+
func Test_AutoExpiration_ContinueAsNew_SubWorkflow(t *testing.T) {
126+
if testing.Short() {
127+
t.Skip()
128+
}
129+
130+
autoExpirationTime := time.Second * 2
131+
132+
redisClient := getClient()
133+
setup := getCreateBackend(redisClient, WithAutoExpiration(0), WithAutoExpirationContinueAsNew(autoExpirationTime))
134+
b := setup()
135+
136+
c := client.New(b)
137+
w := worker.New(b, nil)
138+
139+
ctx, cancel := context.WithCancel(context.Background())
140+
141+
require.NoError(t, w.Start(ctx))
142+
defer func() {
143+
cancel()
144+
145+
require.NoError(t, w.WaitForCompletion())
146+
}()
147+
148+
var swfInstances []*workflow.Instance
149+
150+
swf := func(ctx workflow.Context, iteration int) (int, error) {
151+
if iteration > 3 {
152+
return 42, nil
153+
}
154+
155+
// Keep track of continuedasnew instances
156+
swfInstances = append(swfInstances, workflow.WorkflowInstance(ctx))
157+
158+
return 0, workflow.ContinueAsNew(ctx, iteration+1)
159+
}
160+
161+
swfInstanceID := uuid.NewString()
162+
163+
wf := func(ctx workflow.Context) (int, error) {
164+
l := workflow.Logger(ctx)
165+
l.Debug("Starting sub workflow", "instanceID", swfInstanceID)
166+
167+
r, err := workflow.CreateSubWorkflowInstance[int](ctx, workflow.SubWorkflowOptions{
168+
InstanceID: swfInstanceID,
169+
}, swf, 0).Get(ctx)
170+
171+
workflow.ScheduleTimer(ctx, time.Second*2).Get(ctx)
172+
173+
return r, err
174+
}
175+
176+
w.RegisterWorkflow(wf)
177+
w.RegisterWorkflow(swf)
178+
179+
wfi, err := c.CreateWorkflowInstance(ctx, client.WorkflowInstanceOptions{
180+
InstanceID: uuid.NewString(),
181+
}, wf)
182+
require.NoError(t, err)
183+
184+
// Wait for redis to expire the keys
185+
time.Sleep(autoExpirationTime * 2)
186+
187+
// Main workflow should still be there
188+
r, err := client.GetWorkflowResult[int](ctx, c, wfi, time.Second*10)
189+
require.NoError(t, err)
190+
require.Equal(t, 42, r)
191+
192+
// All continued-as-new sub-workflow instances should be expired
193+
for _, swfInstance := range swfInstances {
194+
_, err = b.GetWorkflowInstanceState(ctx, swfInstance)
195+
require.ErrorIs(t, err, backend.ErrInstanceNotFound)
196+
}
197+
}

backend/redis/options.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,8 @@ type RedisOptions struct {
1111

1212
BlockTimeout time.Duration
1313

14-
AutoExpiration time.Duration
14+
AutoExpiration time.Duration
15+
AutoExpirationContinueAsNew time.Duration
1516
}
1617

1718
type RedisBackendOption func(*RedisOptions)
@@ -37,3 +38,12 @@ func WithAutoExpiration(expireFinishedRunsAfter time.Duration) RedisBackendOptio
3738
o.AutoExpiration = expireFinishedRunsAfter
3839
}
3940
}
41+
42+
// WithAutoExpirationContinueAsNew sets the duration after which runs that were completed with `ContinueAsNew`
43+
// automatically expire.
44+
// If set to 0 (default), the overall expiration setting set with `WithAutoExpiration` will be used.
45+
func WithAutoExpirationContinueAsNew(expireContinuedAsNewRunsAfter time.Duration) RedisBackendOption {
46+
return func(o *RedisOptions) {
47+
o.AutoExpirationContinueAsNew = expireContinuedAsNewRunsAfter
48+
}
49+
}

backend/redis/workflow.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -279,8 +279,13 @@ func (rb *redisBackend) CompleteWorkflowTask(
279279
span.End()
280280

281281
// Auto expiration
282-
if rb.options.AutoExpiration > 0 {
283-
if err := setWorkflowInstanceExpiration(ctx, rb.rdb, instance, rb.options.AutoExpiration); err != nil {
282+
expiration := rb.options.AutoExpiration
283+
if state == core.WorkflowInstanceStateContinuedAsNew && rb.options.AutoExpirationContinueAsNew > 0 {
284+
expiration = rb.options.AutoExpirationContinueAsNew
285+
}
286+
287+
if expiration > 0 {
288+
if err := setWorkflowInstanceExpiration(ctx, rb.rdb, instance, expiration); err != nil {
284289
return fmt.Errorf("setting workflow instance expiration: %w", err)
285290
}
286291
}

0 commit comments

Comments
 (0)