Skip to content

Commit f629580

Browse files
committed
Support ContinueAsNew
1 parent e75fb6a commit f629580

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

61 files changed

+1012
-432
lines changed

backend/mysql/diagnostics.go

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import (
1111

1212
var _ diag.Backend = (*mysqlBackend)(nil)
1313

14-
func (mb *mysqlBackend) GetWorkflowInstances(ctx context.Context, afterInstanceID string, count int) ([]*diag.WorkflowInstanceRef, error) {
14+
func (mb *mysqlBackend) GetWorkflowInstances(ctx context.Context, afterInstanceID, afterExecutionID string, count int) ([]*diag.WorkflowInstanceRef, error) {
1515
var err error
1616
tx, err := mb.db.BeginTx(ctx, nil)
1717
if err != nil {
@@ -25,11 +25,12 @@ func (mb *mysqlBackend) GetWorkflowInstances(ctx context.Context, afterInstanceI
2525
ctx,
2626
`SELECT i.instance_id, i.execution_id, i.created_at, i.completed_at
2727
FROM instances i
28-
INNER JOIN (SELECT instance_id, created_at FROM instances WHERE id = ?) ii
28+
INNER JOIN (SELECT instance_id, created_at FROM instances WHERE id = ? AND execution_id = ?) ii
2929
ON i.created_at < ii.created_at OR (i.created_at = ii.created_at AND i.instance_id < ii.instance_id)
3030
ORDER BY i.created_at DESC, i.instance_id DESC
3131
LIMIT ?`,
3232
afterInstanceID,
33+
afterExecutionID,
3334
count,
3435
)
3536
} else {
@@ -73,14 +74,16 @@ func (mb *mysqlBackend) GetWorkflowInstances(ctx context.Context, afterInstanceI
7374
return instances, nil
7475
}
7576

76-
func (mb *mysqlBackend) GetWorkflowInstance(ctx context.Context, instanceID string) (*diag.WorkflowInstanceRef, error) {
77+
func (mb *mysqlBackend) GetWorkflowInstance(ctx context.Context, instance *core.WorkflowInstance) (*diag.WorkflowInstanceRef, error) {
7778
tx, err := mb.db.BeginTx(ctx, nil)
7879
if err != nil {
7980
return nil, err
8081
}
8182
defer tx.Rollback()
8283

83-
res := tx.QueryRowContext(ctx, "SELECT instance_id, execution_id, created_at, completed_at FROM instances WHERE instance_id = ?", instanceID)
84+
res := tx.QueryRowContext(
85+
ctx,
86+
"SELECT instance_id, execution_id, created_at, completed_at FROM instances WHERE instance_id = ? AND execution_id = ?", instance.InstanceID, instance.ExecutionID)
8487

8588
var id, executionID string
8689
var createdAt time.Time
@@ -108,7 +111,7 @@ func (mb *mysqlBackend) GetWorkflowInstance(ctx context.Context, instanceID stri
108111
}, nil
109112
}
110113

111-
func (mb *mysqlBackend) GetWorkflowTree(ctx context.Context, instanceID string) (*diag.WorkflowInstanceTree, error) {
114+
func (mb *mysqlBackend) GetWorkflowTree(ctx context.Context, instance *core.WorkflowInstance) (*diag.WorkflowInstanceTree, error) {
112115
itb := diag.NewInstanceTreeBuilder(mb)
113-
return itb.BuildWorkflowInstanceTree(ctx, instanceID)
116+
return itb.BuildWorkflowInstanceTree(ctx, instance)
114117
}

backend/mysql/events.go

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,18 +5,19 @@ import (
55
"database/sql"
66
"strings"
77

8+
"github.com/cschleiden/go-workflows/internal/core"
89
"github.com/cschleiden/go-workflows/internal/history"
910
)
1011

11-
func insertPendingEvents(ctx context.Context, tx *sql.Tx, instanceID string, newEvents []*history.Event) error {
12-
return insertEvents(ctx, tx, "pending_events", instanceID, newEvents)
12+
func insertPendingEvents(ctx context.Context, tx *sql.Tx, instance *core.WorkflowInstance, newEvents []*history.Event) error {
13+
return insertEvents(ctx, tx, "pending_events", instance, newEvents)
1314
}
1415

15-
func insertHistoryEvents(ctx context.Context, tx *sql.Tx, instanceID string, historyEvents []*history.Event) error {
16-
return insertEvents(ctx, tx, "history", instanceID, historyEvents)
16+
func insertHistoryEvents(ctx context.Context, tx *sql.Tx, instance *core.WorkflowInstance, historyEvents []*history.Event) error {
17+
return insertEvents(ctx, tx, "history", instance, historyEvents)
1718
}
1819

19-
func insertEvents(ctx context.Context, tx *sql.Tx, tableName string, instanceID string, events []*history.Event) error {
20+
func insertEvents(ctx context.Context, tx *sql.Tx, tableName string, instance *core.WorkflowInstance, events []*history.Event) error {
2021
const batchSize = 20
2122
for batchStart := 0; batchStart < len(events); batchStart += batchSize {
2223
batchEnd := batchStart + batchSize
@@ -25,8 +26,9 @@ func insertEvents(ctx context.Context, tx *sql.Tx, tableName string, instanceID
2526
}
2627
batchEvents := events[batchStart:batchEnd]
2728

28-
query := "INSERT INTO `" + tableName + "` (event_id, sequence_id, instance_id, event_type, timestamp, schedule_event_id, attributes, visible_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?)" +
29-
strings.Repeat(", (?, ?, ?, ?, ?, ?, ?, ?)", len(batchEvents)-1)
29+
query := "INSERT INTO `" + tableName +
30+
"` (event_id, sequence_id, instance_id, execution_id, event_type, timestamp, schedule_event_id, attributes, visible_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)" +
31+
strings.Repeat(", (?, ?, ?, ?, ?, ?, ?, ?, ?)", len(batchEvents)-1)
3032

3133
args := make([]interface{}, 0, len(batchEvents)*7)
3234

@@ -36,7 +38,9 @@ func insertEvents(ctx context.Context, tx *sql.Tx, tableName string, instanceID
3638
return err
3739
}
3840

39-
args = append(args, newEvent.ID, newEvent.SequenceID, instanceID, newEvent.Type, newEvent.Timestamp, newEvent.ScheduleEventID, a, newEvent.VisibleAt)
41+
args = append(
42+
args,
43+
newEvent.ID, newEvent.SequenceID, instance.InstanceID, instance.ExecutionID, newEvent.Type, newEvent.Timestamp, newEvent.ScheduleEventID, a, newEvent.VisibleAt)
4044
}
4145

4246
_, err := tx.ExecContext(
@@ -52,11 +56,12 @@ func insertEvents(ctx context.Context, tx *sql.Tx, tableName string, instanceID
5256
return nil
5357
}
5458

55-
func removeFutureEvent(ctx context.Context, tx *sql.Tx, instanceID string, scheduleEventID int64) error {
59+
func removeFutureEvent(ctx context.Context, tx *sql.Tx, instance *core.WorkflowInstance, scheduleEventID int64) error {
5660
_, err := tx.ExecContext(
5761
ctx,
58-
"DELETE FROM `pending_events` WHERE instance_id = ? AND schedule_event_id = ? AND visible_at IS NOT NULL",
59-
instanceID,
62+
"DELETE FROM `pending_events` WHERE instance_id = ? AND execution_id = ? AND schedule_event_id = ? AND visible_at IS NOT NULL",
63+
instance.InstanceID,
64+
instance.ExecutionID,
6065
scheduleEventID,
6166
)
6267

0 commit comments

Comments
 (0)