Skip to content

Commit bcd44f2

Browse files
authored
Merge pull request #211 from cschleiden/continue-as-new
Implement `ContinueAsNew`
2 parents 3d34b15 + 12a14f0 commit bcd44f2

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

77 files changed

+1185
-498
lines changed

README.md

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -423,7 +423,24 @@ func SubWorkflow(ctx workflow.Context, msg string) (int, error) {
423423

424424
Similar to timer cancellation, you can pass a cancelable context to `CreateSubWorkflowInstance` and cancel the sub-workflow that way. Reacting to the cancellation is the same as canceling a workflow via the `Client`. See [Canceling workflows](#canceling-workflows) for more details.
425425

426+
### `ContinueAsNew`
427+
428+
```ContinueAsNew` allows you to restart workflow execution with different inputs. The purpose is to keep the history size small enough to avoid hitting size limits, running out of memory and impacting performance. It works by returning a special `error` from your workflow that contains the new inputs:
429+
430+
```go
431+
wf := func(ctx workflow.Context, run int) (int, error) {
432+
run = run + 1
433+
if run < 3 {
434+
return run, workflow.ContinueAsNew(ctx, run)
435+
}
436+
437+
return run, nil
438+
}
439+
```
440+
441+
Here the workflow is going to be restarted when `workflow.ContinueAsNew` is returned. Internally the new execution starts with a fresh history. It uses the same `InstanceID` but a different `ExecutionID`.
426442

443+
If a sub-workflow is restarted, the caller doesn't notice this, only once it ends without being restarted the caller will get the result and control will be passed back.
427444

428445
### `select`
429446

@@ -741,4 +758,4 @@ This kind of check is understandable for simple changes, but it becomes hard and
741758
742759
### `ContinueAsNew`
743760
744-
Both Temporal/Cadence and DTFx support `ContinueAsNew`. This essentially re-starts a running workflow as a new workflow with a new event history. This is needed for long running workflows where the history can become very large, negatively affecting performance. This feature is not implemented in go-workflows.
761+
Both Temporal/Cadence and DTFx support `ContinueAsNew`. This essentially re-starts a running workflow as a new workflow with a new event history. This is needed for long running workflows where the history can become very large, negatively affecting performance. While `WorkflowInstance` supports an `InstanceID` and an `ExecutionID`, this feature is not yet implemented (and might not be).

activitytester/activitytester.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,5 +14,5 @@ func WithActivityTestState(ctx context.Context, activityID, instanceID string, l
1414
logger = dlogger.NewDefaultLogger()
1515
}
1616

17-
return activity.WithActivityState(ctx, activity.NewActivityState(activityID, core.NewWorkflowInstance(instanceID), logger))
17+
return activity.WithActivityState(ctx, activity.NewActivityState(activityID, core.NewWorkflowInstance(instanceID, ""), logger))
1818
}

backend/mysql/diagnostics.go

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import (
1111

1212
var _ diag.Backend = (*mysqlBackend)(nil)
1313

14-
func (mb *mysqlBackend) GetWorkflowInstances(ctx context.Context, afterInstanceID string, count int) ([]*diag.WorkflowInstanceRef, error) {
14+
func (mb *mysqlBackend) GetWorkflowInstances(ctx context.Context, afterInstanceID, afterExecutionID string, count int) ([]*diag.WorkflowInstanceRef, error) {
1515
var err error
1616
tx, err := mb.db.BeginTx(ctx, nil)
1717
if err != nil {
@@ -23,19 +23,20 @@ func (mb *mysqlBackend) GetWorkflowInstances(ctx context.Context, afterInstanceI
2323
if afterInstanceID != "" {
2424
rows, err = tx.QueryContext(
2525
ctx,
26-
`SELECT i.instance_id, i.created_at, i.completed_at
26+
`SELECT i.instance_id, i.execution_id, i.created_at, i.completed_at
2727
FROM instances i
28-
INNER JOIN (SELECT instance_id, created_at FROM instances WHERE id = ?) ii
28+
INNER JOIN (SELECT instance_id, created_at FROM instances WHERE id = ? AND execution_id = ?) ii
2929
ON i.created_at < ii.created_at OR (i.created_at = ii.created_at AND i.instance_id < ii.instance_id)
3030
ORDER BY i.created_at DESC, i.instance_id DESC
3131
LIMIT ?`,
3232
afterInstanceID,
33+
afterExecutionID,
3334
count,
3435
)
3536
} else {
3637
rows, err = tx.QueryContext(
3738
ctx,
38-
`SELECT i.instance_id, i.created_at, i.completed_at
39+
`SELECT i.instance_id, i.execution_id, i.created_at, i.completed_at
3940
FROM instances i
4041
ORDER BY i.created_at DESC, i.instance_id DESC
4142
LIMIT ?`,
@@ -49,10 +50,10 @@ func (mb *mysqlBackend) GetWorkflowInstances(ctx context.Context, afterInstanceI
4950
var instances []*diag.WorkflowInstanceRef
5051

5152
for rows.Next() {
52-
var id string
53+
var id, executionID string
5354
var createdAt time.Time
5455
var completedAt *time.Time
55-
err = rows.Scan(&id, &createdAt, &completedAt)
56+
err = rows.Scan(&id, &executionID, &createdAt, &completedAt)
5657
if err != nil {
5758
return nil, err
5859
}
@@ -63,7 +64,7 @@ func (mb *mysqlBackend) GetWorkflowInstances(ctx context.Context, afterInstanceI
6364
}
6465

6566
instances = append(instances, &diag.WorkflowInstanceRef{
66-
Instance: core.NewWorkflowInstance(id),
67+
Instance: core.NewWorkflowInstance(id, executionID),
6768
CreatedAt: createdAt,
6869
CompletedAt: completedAt,
6970
State: state,
@@ -73,20 +74,22 @@ func (mb *mysqlBackend) GetWorkflowInstances(ctx context.Context, afterInstanceI
7374
return instances, nil
7475
}
7576

76-
func (mb *mysqlBackend) GetWorkflowInstance(ctx context.Context, instanceID string) (*diag.WorkflowInstanceRef, error) {
77+
func (mb *mysqlBackend) GetWorkflowInstance(ctx context.Context, instance *core.WorkflowInstance) (*diag.WorkflowInstanceRef, error) {
7778
tx, err := mb.db.BeginTx(ctx, nil)
7879
if err != nil {
7980
return nil, err
8081
}
8182
defer tx.Rollback()
8283

83-
res := tx.QueryRowContext(ctx, "SELECT instance_id, created_at, completed_at FROM instances WHERE instance_id = ?", instanceID)
84+
res := tx.QueryRowContext(
85+
ctx,
86+
"SELECT instance_id, execution_id, created_at, completed_at FROM instances WHERE instance_id = ? AND execution_id = ?", instance.InstanceID, instance.ExecutionID)
8487

85-
var id string
88+
var id, executionID string
8689
var createdAt time.Time
8790
var completedAt *time.Time
8891

89-
err = res.Scan(&id, &createdAt, &completedAt)
92+
err = res.Scan(&id, &executionID, &createdAt, &completedAt)
9093
if err != nil {
9194
if err == sql.ErrNoRows {
9295
return nil, nil
@@ -101,14 +104,14 @@ func (mb *mysqlBackend) GetWorkflowInstance(ctx context.Context, instanceID stri
101104
}
102105

103106
return &diag.WorkflowInstanceRef{
104-
Instance: core.NewWorkflowInstance(id),
107+
Instance: core.NewWorkflowInstance(id, executionID),
105108
CreatedAt: createdAt,
106109
CompletedAt: completedAt,
107110
State: state,
108111
}, nil
109112
}
110113

111-
func (mb *mysqlBackend) GetWorkflowTree(ctx context.Context, instanceID string) (*diag.WorkflowInstanceTree, error) {
114+
func (mb *mysqlBackend) GetWorkflowTree(ctx context.Context, instance *core.WorkflowInstance) (*diag.WorkflowInstanceTree, error) {
112115
itb := diag.NewInstanceTreeBuilder(mb)
113-
return itb.BuildWorkflowInstanceTree(ctx, instanceID)
116+
return itb.BuildWorkflowInstanceTree(ctx, instance)
114117
}

backend/mysql/events.go

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,18 +5,19 @@ import (
55
"database/sql"
66
"strings"
77

8+
"github.com/cschleiden/go-workflows/internal/core"
89
"github.com/cschleiden/go-workflows/internal/history"
910
)
1011

11-
func insertPendingEvents(ctx context.Context, tx *sql.Tx, instanceID string, newEvents []*history.Event) error {
12-
return insertEvents(ctx, tx, "pending_events", instanceID, newEvents)
12+
func insertPendingEvents(ctx context.Context, tx *sql.Tx, instance *core.WorkflowInstance, newEvents []*history.Event) error {
13+
return insertEvents(ctx, tx, "pending_events", instance, newEvents)
1314
}
1415

15-
func insertHistoryEvents(ctx context.Context, tx *sql.Tx, instanceID string, historyEvents []*history.Event) error {
16-
return insertEvents(ctx, tx, "history", instanceID, historyEvents)
16+
func insertHistoryEvents(ctx context.Context, tx *sql.Tx, instance *core.WorkflowInstance, historyEvents []*history.Event) error {
17+
return insertEvents(ctx, tx, "history", instance, historyEvents)
1718
}
1819

19-
func insertEvents(ctx context.Context, tx *sql.Tx, tableName string, instanceID string, events []*history.Event) error {
20+
func insertEvents(ctx context.Context, tx *sql.Tx, tableName string, instance *core.WorkflowInstance, events []*history.Event) error {
2021
const batchSize = 20
2122
for batchStart := 0; batchStart < len(events); batchStart += batchSize {
2223
batchEnd := batchStart + batchSize
@@ -25,8 +26,9 @@ func insertEvents(ctx context.Context, tx *sql.Tx, tableName string, instanceID
2526
}
2627
batchEvents := events[batchStart:batchEnd]
2728

28-
query := "INSERT INTO `" + tableName + "` (event_id, sequence_id, instance_id, event_type, timestamp, schedule_event_id, attributes, visible_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?)" +
29-
strings.Repeat(", (?, ?, ?, ?, ?, ?, ?, ?)", len(batchEvents)-1)
29+
query := "INSERT INTO `" + tableName +
30+
"` (event_id, sequence_id, instance_id, execution_id, event_type, timestamp, schedule_event_id, attributes, visible_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)" +
31+
strings.Repeat(", (?, ?, ?, ?, ?, ?, ?, ?, ?)", len(batchEvents)-1)
3032

3133
args := make([]interface{}, 0, len(batchEvents)*7)
3234

@@ -36,7 +38,9 @@ func insertEvents(ctx context.Context, tx *sql.Tx, tableName string, instanceID
3638
return err
3739
}
3840

39-
args = append(args, newEvent.ID, newEvent.SequenceID, instanceID, newEvent.Type, newEvent.Timestamp, newEvent.ScheduleEventID, a, newEvent.VisibleAt)
41+
args = append(
42+
args,
43+
newEvent.ID, newEvent.SequenceID, instance.InstanceID, instance.ExecutionID, newEvent.Type, newEvent.Timestamp, newEvent.ScheduleEventID, a, newEvent.VisibleAt)
4044
}
4145

4246
_, err := tx.ExecContext(
@@ -52,11 +56,12 @@ func insertEvents(ctx context.Context, tx *sql.Tx, tableName string, instanceID
5256
return nil
5357
}
5458

55-
func removeFutureEvent(ctx context.Context, tx *sql.Tx, instanceID string, scheduleEventID int64) error {
59+
func removeFutureEvent(ctx context.Context, tx *sql.Tx, instance *core.WorkflowInstance, scheduleEventID int64) error {
5660
_, err := tx.ExecContext(
5761
ctx,
58-
"DELETE FROM `pending_events` WHERE instance_id = ? AND schedule_event_id = ? AND visible_at IS NOT NULL",
59-
instanceID,
62+
"DELETE FROM `pending_events` WHERE instance_id = ? AND execution_id = ? AND schedule_event_id = ? AND visible_at IS NOT NULL",
63+
instance.InstanceID,
64+
instance.ExecutionID,
6065
scheduleEventID,
6166
)
6267

0 commit comments

Comments
 (0)