Skip to content

Commit 8dee177

Browse files
authored
Merge pull request #364 from cschleiden/cleanup-continue-as-new
Support removing ContinuedAsNew instances immediately
2 parents 23a3052 + 2cebb34 commit 8dee177

File tree

13 files changed

+342
-226
lines changed

13 files changed

+342
-226
lines changed

backend/mysql/mysql.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,14 @@ func (b *mysqlBackend) RemoveWorkflowInstance(ctx context.Context, instance *cor
166166
}
167167
defer tx.Rollback()
168168

169+
if err := b.removeWorkflowInstance(ctx, instance, tx); err != nil {
170+
return err
171+
}
172+
173+
return tx.Commit()
174+
}
175+
176+
func (b *mysqlBackend) removeWorkflowInstance(ctx context.Context, instance *core.WorkflowInstance, tx *sql.Tx) error {
169177
row := tx.QueryRowContext(ctx, "SELECT state FROM `instances` WHERE instance_id = ? AND execution_id = ? LIMIT 1", instance.InstanceID, instance.ExecutionID)
170178
var state core.WorkflowInstanceState
171179
if err := row.Scan(&state); err != nil {
@@ -191,7 +199,7 @@ func (b *mysqlBackend) RemoveWorkflowInstance(ctx context.Context, instance *cor
191199
return err
192200
}
193201

194-
return tx.Commit()
202+
return nil
195203
}
196204

197205
func (b *mysqlBackend) RemoveWorkflowInstances(ctx context.Context, options ...backend.RemovalOption) error {
@@ -752,6 +760,12 @@ func (b *mysqlBackend) CompleteWorkflowTask(
752760
}
753761
}
754762

763+
if b.options.RemoveContinuedAsNewInstances && state == core.WorkflowInstanceStateContinuedAsNew {
764+
if err := b.removeWorkflowInstance(ctx, instance, tx); err != nil {
765+
return fmt.Errorf("removing old instance: %w", err)
766+
}
767+
}
768+
755769
if err := tx.Commit(); err != nil {
756770
return fmt.Errorf("committing complete workflow transaction: %w", err)
757771
}

backend/options.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,11 @@ type Options struct {
3737
// ActivityLockTimeout determines how long an activity task can be locked for. If the activity task is not completed
3838
// by that timeframe, it's considered abandoned and another worker might pick it up
3939
ActivityLockTimeout time.Duration
40+
41+
// RemoveContinuedAsNewInstances determines whether instances that were completed using ContinueAsNew should be
42+
// removed immediately, including their history. If set to false, the instance will be removed after the configured
43+
// retention period or never.
44+
RemoveContinuedAsNewInstances bool
4045
}
4146

4247
var DefaultOptions Options = Options{
@@ -50,6 +55,8 @@ var DefaultOptions Options = Options{
5055
Converter: converter.DefaultConverter,
5156

5257
ContextPropagators: []workflow.ContextPropagator{&tracing.TracingContextPropagator{}},
58+
59+
RemoveContinuedAsNewInstances: false,
5360
}
5461

5562
type BackendOption func(*Options)
@@ -90,6 +97,12 @@ func WithContextPropagator(prop workflow.ContextPropagator) BackendOption {
9097
}
9198
}
9299

100+
func WithRemoveContinuedAsNewInstances() BackendOption {
101+
return func(o *Options) {
102+
o.RemoveContinuedAsNewInstances = true
103+
}
104+
}
105+
93106
func ApplyOptions(opts ...BackendOption) *Options {
94107
options := DefaultOptions
95108

backend/redis/instance.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -149,8 +149,7 @@ func (rb *redisBackend) RemoveWorkflowInstance(ctx context.Context, instance *co
149149
return err
150150
}
151151

152-
// Check state
153-
if i.State != core.WorkflowInstanceStateFinished {
152+
if i.State != core.WorkflowInstanceStateFinished && i.State != core.WorkflowInstanceStateContinuedAsNew {
154153
return backend.ErrInstanceNotFinished
155154
}
156155

backend/redis/workflow.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -323,6 +323,12 @@ func (rb *redisBackend) CompleteWorkflowTask(
323323
return fmt.Errorf("setting workflow instance expiration: %w", err)
324324
}
325325
}
326+
327+
if rb.options.RemoveContinuedAsNewInstances && state == core.WorkflowInstanceStateContinuedAsNew {
328+
if err := rb.RemoveWorkflowInstance(ctx, instance); err != nil {
329+
return fmt.Errorf("removing workflow instance: %w", err)
330+
}
331+
}
326332
}
327333

328334
return nil

backend/sqlite/sqlite.go

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -227,10 +227,17 @@ func (sb *sqliteBackend) RemoveWorkflowInstance(ctx context.Context, instance *c
227227
}
228228
defer tx.Rollback()
229229

230+
if err := sb.removeWorkflowInstance(ctx, instance, tx); err != nil {
231+
return err
232+
}
233+
234+
return tx.Commit()
235+
}
236+
237+
func (sb *sqliteBackend) removeWorkflowInstance(ctx context.Context, instance *core.WorkflowInstance, tx *sql.Tx) error {
230238
instanceID := instance.InstanceID
231239
executionID := instance.ExecutionID
232240

233-
// Check status of the instance
234241
row := tx.QueryRowContext(ctx, "SELECT state FROM `instances` WHERE id = ? AND execution_id = ? LIMIT 1", instanceID, executionID)
235242
var state core.WorkflowInstanceState
236243
if err := row.Scan(&state); err != nil {
@@ -243,7 +250,6 @@ func (sb *sqliteBackend) RemoveWorkflowInstance(ctx context.Context, instance *c
243250
return backend.ErrInstanceNotFinished
244251
}
245252

246-
// Delete from instances and history tables
247253
if _, err := tx.ExecContext(ctx, "DELETE FROM `instances` WHERE id = ? AND execution_id = ?", instanceID, executionID); err != nil {
248254
return err
249255
}
@@ -256,7 +262,7 @@ func (sb *sqliteBackend) RemoveWorkflowInstance(ctx context.Context, instance *c
256262
return err
257263
}
258264

259-
return tx.Commit()
265+
return nil
260266
}
261267

262268
func (sb *sqliteBackend) RemoveWorkflowInstances(ctx context.Context, options ...backend.RemovalOption) error {
@@ -539,6 +545,7 @@ func (sb *sqliteBackend) GetWorkflowTask(ctx context.Context, queues []workflow.
539545
return t, nil
540546
}
541547

548+
// MARK: CompleteWorkflowTask
542549
func (sb *sqliteBackend) CompleteWorkflowTask(
543550
ctx context.Context,
544551
task *backend.WorkflowTask,
@@ -670,6 +677,12 @@ func (sb *sqliteBackend) CompleteWorkflowTask(
670677
}
671678
}
672679

680+
if sb.options.RemoveContinuedAsNewInstances && state == core.WorkflowInstanceStateContinuedAsNew {
681+
if err := sb.removeWorkflowInstance(ctx, instance, tx); err != nil {
682+
return fmt.Errorf("removing old instance: %w", err)
683+
}
684+
}
685+
673686
return tx.Commit()
674687
}
675688

0 commit comments

Comments
 (0)