Skip to content

Commit 2cebb34

Browse files
committed
Allow removing continued as new instances immediately
1 parent acd9736 commit 2cebb34

File tree

8 files changed

+173
-7
lines changed

8 files changed

+173
-7
lines changed

backend/mysql/mysql.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,14 @@ func (b *mysqlBackend) RemoveWorkflowInstance(ctx context.Context, instance *cor
166166
}
167167
defer tx.Rollback()
168168

169+
if err := b.removeWorkflowInstance(ctx, instance, tx); err != nil {
170+
return err
171+
}
172+
173+
return tx.Commit()
174+
}
175+
176+
func (b *mysqlBackend) removeWorkflowInstance(ctx context.Context, instance *core.WorkflowInstance, tx *sql.Tx) error {
169177
row := tx.QueryRowContext(ctx, "SELECT state FROM `instances` WHERE instance_id = ? AND execution_id = ? LIMIT 1", instance.InstanceID, instance.ExecutionID)
170178
var state core.WorkflowInstanceState
171179
if err := row.Scan(&state); err != nil {
@@ -191,7 +199,7 @@ func (b *mysqlBackend) RemoveWorkflowInstance(ctx context.Context, instance *cor
191199
return err
192200
}
193201

194-
return tx.Commit()
202+
return nil
195203
}
196204

197205
func (b *mysqlBackend) RemoveWorkflowInstances(ctx context.Context, options ...backend.RemovalOption) error {
@@ -752,6 +760,12 @@ func (b *mysqlBackend) CompleteWorkflowTask(
752760
}
753761
}
754762

763+
if b.options.RemoveContinuedAsNewInstances && state == core.WorkflowInstanceStateContinuedAsNew {
764+
if err := b.removeWorkflowInstance(ctx, instance, tx); err != nil {
765+
return fmt.Errorf("removing old instance: %w", err)
766+
}
767+
}
768+
755769
if err := tx.Commit(); err != nil {
756770
return fmt.Errorf("committing complete workflow transaction: %w", err)
757771
}

backend/options.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,11 @@ type Options struct {
3737
// ActivityLockTimeout determines how long an activity task can be locked for. If the activity task is not completed
3838
// by that timeframe, it's considered abandoned and another worker might pick it up
3939
ActivityLockTimeout time.Duration
40+
41+
// RemoveContinuedAsNewInstances determines whether instances that were completed using ContinueAsNew should be
42+
// removed immediately, including their history. If set to false, the instance will be removed after the configured
43+
// retention period or never.
44+
RemoveContinuedAsNewInstances bool
4045
}
4146

4247
var DefaultOptions Options = Options{
@@ -50,6 +55,8 @@ var DefaultOptions Options = Options{
5055
Converter: converter.DefaultConverter,
5156

5257
ContextPropagators: []workflow.ContextPropagator{&tracing.TracingContextPropagator{}},
58+
59+
RemoveContinuedAsNewInstances: false,
5360
}
5461

5562
type BackendOption func(*Options)
@@ -90,6 +97,12 @@ func WithContextPropagator(prop workflow.ContextPropagator) BackendOption {
9097
}
9198
}
9299

100+
func WithRemoveContinuedAsNewInstances() BackendOption {
101+
return func(o *Options) {
102+
o.RemoveContinuedAsNewInstances = true
103+
}
104+
}
105+
93106
func ApplyOptions(opts ...BackendOption) *Options {
94107
options := DefaultOptions
95108

backend/redis/instance.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -149,8 +149,7 @@ func (rb *redisBackend) RemoveWorkflowInstance(ctx context.Context, instance *co
149149
return err
150150
}
151151

152-
// Check state
153-
if i.State != core.WorkflowInstanceStateFinished {
152+
if i.State != core.WorkflowInstanceStateFinished && i.State != core.WorkflowInstanceStateContinuedAsNew {
154153
return backend.ErrInstanceNotFinished
155154
}
156155

backend/redis/workflow.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -323,6 +323,12 @@ func (rb *redisBackend) CompleteWorkflowTask(
323323
return fmt.Errorf("setting workflow instance expiration: %w", err)
324324
}
325325
}
326+
327+
if rb.options.RemoveContinuedAsNewInstances && state == core.WorkflowInstanceStateContinuedAsNew {
328+
if err := rb.RemoveWorkflowInstance(ctx, instance); err != nil {
329+
return fmt.Errorf("removing workflow instance: %w", err)
330+
}
331+
}
326332
}
327333

328334
return nil

backend/sqlite/sqlite.go

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -227,10 +227,17 @@ func (sb *sqliteBackend) RemoveWorkflowInstance(ctx context.Context, instance *c
227227
}
228228
defer tx.Rollback()
229229

230+
if err := sb.removeWorkflowInstance(ctx, instance, tx); err != nil {
231+
return err
232+
}
233+
234+
return tx.Commit()
235+
}
236+
237+
func (sb *sqliteBackend) removeWorkflowInstance(ctx context.Context, instance *core.WorkflowInstance, tx *sql.Tx) error {
230238
instanceID := instance.InstanceID
231239
executionID := instance.ExecutionID
232240

233-
// Check status of the instance
234241
row := tx.QueryRowContext(ctx, "SELECT state FROM `instances` WHERE id = ? AND execution_id = ? LIMIT 1", instanceID, executionID)
235242
var state core.WorkflowInstanceState
236243
if err := row.Scan(&state); err != nil {
@@ -243,7 +250,6 @@ func (sb *sqliteBackend) RemoveWorkflowInstance(ctx context.Context, instance *c
243250
return backend.ErrInstanceNotFinished
244251
}
245252

246-
// Delete from instances and history tables
247253
if _, err := tx.ExecContext(ctx, "DELETE FROM `instances` WHERE id = ? AND execution_id = ?", instanceID, executionID); err != nil {
248254
return err
249255
}
@@ -256,7 +262,7 @@ func (sb *sqliteBackend) RemoveWorkflowInstance(ctx context.Context, instance *c
256262
return err
257263
}
258264

259-
return tx.Commit()
265+
return nil
260266
}
261267

262268
func (sb *sqliteBackend) RemoveWorkflowInstances(ctx context.Context, options ...backend.RemovalOption) error {
@@ -539,6 +545,7 @@ func (sb *sqliteBackend) GetWorkflowTask(ctx context.Context, queues []workflow.
539545
return t, nil
540546
}
541547

548+
// MARK: CompleteWorkflowTask
542549
func (sb *sqliteBackend) CompleteWorkflowTask(
543550
ctx context.Context,
544551
task *backend.WorkflowTask,
@@ -670,6 +677,12 @@ func (sb *sqliteBackend) CompleteWorkflowTask(
670677
}
671678
}
672679

680+
if sb.options.RemoveContinuedAsNewInstances && state == core.WorkflowInstanceStateContinuedAsNew {
681+
if err := sb.removeWorkflowInstance(ctx, instance, tx); err != nil {
682+
return fmt.Errorf("removing old instance: %w", err)
683+
}
684+
}
685+
673686
return tx.Commit()
674687
}
675688

backend/test/e2e.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -621,6 +621,7 @@ func EndToEndBackendTest(t *testing.T, setup func(options ...backend.BackendOpti
621621
tests = append(tests, e2eDiagTests...)
622622
tests = append(tests, e2eQueueTests...)
623623
tests = append(tests, e2eRemovalTests...)
624+
tests = append(tests, e2eContinueAsNewTests...)
624625

625626
run := func(suffix string, workerOptions worker.Options) {
626627
for _, tt := range tests {

backend/test/e2e_continueasnew.go

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
package test
2+
3+
import (
4+
"context"
5+
"testing"
6+
"time"
7+
8+
"github.com/cschleiden/go-workflows/backend"
9+
"github.com/cschleiden/go-workflows/client"
10+
"github.com/cschleiden/go-workflows/core"
11+
"github.com/cschleiden/go-workflows/worker"
12+
"github.com/cschleiden/go-workflows/workflow"
13+
"github.com/google/uuid"
14+
"github.com/stretchr/testify/require"
15+
)
16+
17+
var e2eContinueAsNewTests = []backendTest{
18+
{
19+
name: "ContinueAsNew/RestartsWorkflowInstance",
20+
f: func(t *testing.T, ctx context.Context, c *client.Client, w *worker.Worker, b TestBackend) {
21+
wf := func(ctx workflow.Context, run int) (int, error) {
22+
run = run + 1
23+
if run < 3 {
24+
return run, workflow.ContinueAsNew(ctx, run)
25+
}
26+
27+
return run, nil
28+
}
29+
register(t, ctx, w, []interface{}{wf}, nil)
30+
31+
instance := runWorkflow(t, ctx, c, wf, 0)
32+
33+
r, err := client.GetWorkflowResult[int](ctx, c, instance, time.Second*10)
34+
require.NoError(t, err)
35+
require.Equal(t, 1, r)
36+
37+
state, err := b.GetWorkflowInstanceState(ctx, instance)
38+
require.NoError(t, err)
39+
require.Equal(t, core.WorkflowInstanceStateContinuedAsNew, state)
40+
},
41+
},
42+
{
43+
name: "ContinueAsNew/Subworkflow",
44+
f: func(t *testing.T, ctx context.Context, c *client.Client, w *worker.Worker, b TestBackend) {
45+
swf := func(ctx workflow.Context, run int) (int, error) {
46+
l := workflow.Logger(ctx)
47+
48+
run = run + 1
49+
if run < 3 {
50+
l.Debug("continue as new", "run", run)
51+
return run, workflow.ContinueAsNew(ctx, run)
52+
}
53+
54+
return run, nil
55+
}
56+
57+
wf := func(ctx workflow.Context, run int) (int, error) {
58+
return workflow.CreateSubWorkflowInstance[int](ctx, workflow.DefaultSubWorkflowOptions, swf, run).Get(ctx)
59+
}
60+
register(t, ctx, w, []interface{}{wf, swf}, nil)
61+
62+
instance := runWorkflow(t, ctx, c, wf, 0)
63+
64+
r, err := client.GetWorkflowResult[int](ctx, c, instance, time.Second*20)
65+
require.NoError(t, err)
66+
require.Equal(t, 3, r)
67+
},
68+
},
69+
{
70+
name: "ContinueAsNew/OldInstancesAreRemoved",
71+
options: []backend.BackendOption{backend.WithRemoveContinuedAsNewInstances()},
72+
f: func(t *testing.T, ctx context.Context, c *client.Client, w *worker.Worker, b TestBackend) {
73+
var swfInstances []*workflow.Instance
74+
75+
swf := func(ctx workflow.Context, iteration int) (int, error) {
76+
if iteration > 3 {
77+
return 42, nil
78+
}
79+
80+
// Keep track of continuedasnew instances
81+
swfInstances = append(swfInstances, workflow.WorkflowInstance(ctx))
82+
83+
return 0, workflow.ContinueAsNew(ctx, iteration+1)
84+
}
85+
86+
swfInstanceID := uuid.NewString()
87+
88+
wf := func(ctx workflow.Context) (int, error) {
89+
l := workflow.Logger(ctx)
90+
l.Debug("Starting sub workflow", "instanceID", swfInstanceID)
91+
92+
r, err := workflow.CreateSubWorkflowInstance[int](ctx, workflow.SubWorkflowOptions{
93+
InstanceID: swfInstanceID,
94+
}, swf, 0).Get(ctx)
95+
96+
workflow.ScheduleTimer(ctx, time.Second*2).Get(ctx)
97+
98+
return r, err
99+
}
100+
101+
register(t, ctx, w, []interface{}{wf, swf}, nil)
102+
103+
wfi := runWorkflow(t, ctx, c, wf)
104+
105+
// // Wait for redis to expire the keys
106+
// time.Sleep(autoExpirationTime * 2)
107+
108+
// Main workflow should still be there
109+
r, err := client.GetWorkflowResult[int](ctx, c, wfi, time.Second*10)
110+
require.NoError(t, err)
111+
require.Equal(t, 42, r)
112+
113+
// All continued-as-new sub-workflow instances should be expired
114+
for _, swfInstance := range swfInstances {
115+
_, err = b.GetWorkflowInstanceState(ctx, swfInstance)
116+
require.ErrorIs(t, err, backend.ErrInstanceNotFound)
117+
}
118+
},
119+
},
120+
}

internal/command/continueasnew.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ func (c *ContinueAsNewCommand) Execute(clock clock.Clock) *CommandResult {
4444
var continuedInstance *core.WorkflowInstance
4545
if c.Instance.SubWorkflow() {
4646
// If the current workflow execution was a sub-workflow, ensure the new workflow execution is also a sub-workflow.
47-
// This will guarantee that finished event for the new execution will be delivered to the right parent instance
47+
// This will guarantee that the finished event for the new execution will be delivered to the right parent instance
4848
continuedInstance = core.NewSubWorkflowInstance(
4949
c.Instance.InstanceID, continuedExecutionID, c.Instance.Parent, c.Instance.ParentEventID)
5050
} else {

0 commit comments

Comments
 (0)