Skip to content

Commit e75fb6a

Browse files
committed
Add ExecutionID back
This reverts commit d29eb06.
1 parent 3d34b15 commit e75fb6a

33 files changed

+149
-119
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -741,4 +741,4 @@ This kind of check is understandable for simple changes, but it becomes hard and
741741
742742
### `ContinueAsNew`
743743
744-
Both Temporal/Cadence and DTFx support `ContinueAsNew`. This essentially re-starts a running workflow as a new workflow with a new event history. This is needed for long running workflows where the history can become very large, negatively affecting performance. This feature is not implemented in go-workflows.
744+
Both Temporal/Cadence and DTFx support `ContinueAsNew`. This essentially re-starts a running workflow as a new workflow with a new event history. This is needed for long running workflows where the history can become very large, negatively affecting performance. While `WorkflowInstance` supports an `InstanceID` and an `ExecutionID`, this feature is not yet implemented (and might not be).

activitytester/activitytester.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,5 +14,5 @@ func WithActivityTestState(ctx context.Context, activityID, instanceID string, l
1414
logger = dlogger.NewDefaultLogger()
1515
}
1616

17-
return activity.WithActivityState(ctx, activity.NewActivityState(activityID, core.NewWorkflowInstance(instanceID), logger))
17+
return activity.WithActivityState(ctx, activity.NewActivityState(activityID, core.NewWorkflowInstance(instanceID, ""), logger))
1818
}

backend/mysql/diagnostics.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ func (mb *mysqlBackend) GetWorkflowInstances(ctx context.Context, afterInstanceI
2323
if afterInstanceID != "" {
2424
rows, err = tx.QueryContext(
2525
ctx,
26-
`SELECT i.instance_id, i.created_at, i.completed_at
26+
`SELECT i.instance_id, i.execution_id, i.created_at, i.completed_at
2727
FROM instances i
2828
INNER JOIN (SELECT instance_id, created_at FROM instances WHERE id = ?) ii
2929
ON i.created_at < ii.created_at OR (i.created_at = ii.created_at AND i.instance_id < ii.instance_id)
@@ -35,7 +35,7 @@ func (mb *mysqlBackend) GetWorkflowInstances(ctx context.Context, afterInstanceI
3535
} else {
3636
rows, err = tx.QueryContext(
3737
ctx,
38-
`SELECT i.instance_id, i.created_at, i.completed_at
38+
`SELECT i.instance_id, i.execution_id, i.created_at, i.completed_at
3939
FROM instances i
4040
ORDER BY i.created_at DESC, i.instance_id DESC
4141
LIMIT ?`,
@@ -49,10 +49,10 @@ func (mb *mysqlBackend) GetWorkflowInstances(ctx context.Context, afterInstanceI
4949
var instances []*diag.WorkflowInstanceRef
5050

5151
for rows.Next() {
52-
var id string
52+
var id, executionID string
5353
var createdAt time.Time
5454
var completedAt *time.Time
55-
err = rows.Scan(&id, &createdAt, &completedAt)
55+
err = rows.Scan(&id, &executionID, &createdAt, &completedAt)
5656
if err != nil {
5757
return nil, err
5858
}
@@ -63,7 +63,7 @@ func (mb *mysqlBackend) GetWorkflowInstances(ctx context.Context, afterInstanceI
6363
}
6464

6565
instances = append(instances, &diag.WorkflowInstanceRef{
66-
Instance: core.NewWorkflowInstance(id),
66+
Instance: core.NewWorkflowInstance(id, executionID),
6767
CreatedAt: createdAt,
6868
CompletedAt: completedAt,
6969
State: state,
@@ -80,13 +80,13 @@ func (mb *mysqlBackend) GetWorkflowInstance(ctx context.Context, instanceID stri
8080
}
8181
defer tx.Rollback()
8282

83-
res := tx.QueryRowContext(ctx, "SELECT instance_id, created_at, completed_at FROM instances WHERE instance_id = ?", instanceID)
83+
res := tx.QueryRowContext(ctx, "SELECT instance_id, execution_id, created_at, completed_at FROM instances WHERE instance_id = ?", instanceID)
8484

85-
var id string
85+
var id, executionID string
8686
var createdAt time.Time
8787
var completedAt *time.Time
8888

89-
err = res.Scan(&id, &createdAt, &completedAt)
89+
err = res.Scan(&id, &executionID, &createdAt, &completedAt)
9090
if err != nil {
9191
if err == sql.ErrNoRows {
9292
return nil, nil
@@ -101,7 +101,7 @@ func (mb *mysqlBackend) GetWorkflowInstance(ctx context.Context, instanceID stri
101101
}
102102

103103
return &diag.WorkflowInstanceRef{
104-
Instance: core.NewWorkflowInstance(id),
104+
Instance: core.NewWorkflowInstance(id, executionID),
105105
CreatedAt: createdAt,
106106
CompletedAt: completedAt,
107107
State: state,

backend/mysql/mysql.go

Lines changed: 21 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -234,8 +234,9 @@ func (b *mysqlBackend) GetWorkflowInstanceHistory(ctx context.Context, instance
234234
func (b *mysqlBackend) GetWorkflowInstanceState(ctx context.Context, instance *workflow.Instance) (core.WorkflowInstanceState, error) {
235235
row := b.db.QueryRowContext(
236236
ctx,
237-
"SELECT completed_at FROM instances WHERE instance_id = ?",
237+
"SELECT completed_at FROM instances WHERE instance_id = ? AND execution_id = ?",
238238
instance.InstanceID,
239+
instance.ExecutionID,
239240
)
240241

241242
var completedAt sql.NullTime
@@ -270,8 +271,9 @@ func createInstance(ctx context.Context, tx *sql.Tx, wfi *workflow.Instance, met
270271

271272
res, err := tx.ExecContext(
272273
ctx,
273-
"INSERT IGNORE INTO `instances` (instance_id, parent_instance_id, parent_schedule_event_id, metadata) VALUES (?, ?, ?, ?)",
274+
"INSERT IGNORE INTO `instances` (instance_id, execution_id, parent_instance_id, parent_schedule_event_id, metadata) VALUES (?, ?, ?, ?, ?)",
274275
wfi.InstanceID,
276+
wfi.ExecutionID,
275277
parentInstanceID,
276278
parentEventID,
277279
string(metadataJson),
@@ -331,7 +333,7 @@ func (b *mysqlBackend) GetWorkflowTask(ctx context.Context) (*task.Workflow, err
331333
now := time.Now()
332334
row := tx.QueryRowContext(
333335
ctx,
334-
`SELECT i.id, i.instance_id, i.parent_instance_id, i.parent_schedule_event_id, i.metadata, i.sticky_until
336+
`SELECT i.id, i.instance_id, i.execution_id, i.parent_instance_id, i.parent_schedule_event_id, i.metadata, i.sticky_until
335337
FROM instances i
336338
INNER JOIN pending_events pe ON i.instance_id = pe.instance_id
337339
WHERE
@@ -348,12 +350,12 @@ func (b *mysqlBackend) GetWorkflowTask(ctx context.Context) (*task.Workflow, err
348350
)
349351

350352
var id int
351-
var instanceID string
353+
var instanceID, executionID string
352354
var parentInstanceID *string
353355
var parentEventID *int64
354356
var metadataJson sql.NullString
355357
var stickyUntil *time.Time
356-
if err := row.Scan(&id, &instanceID, &parentInstanceID, &parentEventID, &metadataJson, &stickyUntil); err != nil {
358+
if err := row.Scan(&id, &instanceID, &executionID, &parentInstanceID, &parentEventID, &metadataJson, &stickyUntil); err != nil {
357359
if err == sql.ErrNoRows {
358360
return nil, nil
359361
}
@@ -383,9 +385,9 @@ func (b *mysqlBackend) GetWorkflowTask(ctx context.Context) (*task.Workflow, err
383385

384386
var wfi *workflow.Instance
385387
if parentInstanceID != nil {
386-
wfi = core.NewSubWorkflowInstance(instanceID, *parentInstanceID, *parentEventID)
388+
wfi = core.NewSubWorkflowInstance(instanceID, executionID, *parentInstanceID, *parentEventID)
387389
} else {
388-
wfi = core.NewWorkflowInstance(instanceID)
390+
wfi = core.NewWorkflowInstance(instanceID, executionID)
389391
}
390392

391393
var metadata *core.WorkflowMetadata
@@ -495,10 +497,11 @@ func (b *mysqlBackend) CompleteWorkflowTask(
495497

496498
res, err := tx.ExecContext(
497499
ctx,
498-
`UPDATE instances SET locked_until = NULL, sticky_until = ?, completed_at = ? WHERE instance_id = ? AND worker = ?`,
500+
`UPDATE instances SET locked_until = NULL, sticky_until = ?, completed_at = ? WHERE instance_id = ? AND execution_id = ? AND worker = ?`,
499501
time.Now().Add(b.options.StickyTimeout),
500502
completedAt,
501503
instance.InstanceID,
504+
instance.ExecutionID,
502505
b.workerName,
503506
)
504507
if err != nil {
@@ -598,9 +601,10 @@ func (b *mysqlBackend) ExtendWorkflowTask(ctx context.Context, taskID string, in
598601
until := time.Now().Add(b.options.WorkflowLockTimeout)
599602
res, err := tx.ExecContext(
600603
ctx,
601-
`UPDATE instances SET locked_until = ? WHERE instance_id = ? AND worker = ?`,
604+
`UPDATE instances SET locked_until = ? WHERE instance_id = ? AND execution_id = ? AND worker = ?`,
602605
until,
603606
instance.InstanceID,
607+
instance.ExecutionID,
604608
b.workerName,
605609
)
606610
if err != nil {
@@ -630,7 +634,7 @@ func (b *mysqlBackend) GetActivityTask(ctx context.Context) (*task.Activity, err
630634
now := time.Now()
631635
res := tx.QueryRowContext(
632636
ctx,
633-
`SELECT activities.id, activity_id, activities.instance_id,
637+
`SELECT activities.id, activity_id, activities.instance_id, activities.execution_id,
634638
event_type, timestamp, schedule_event_id, attributes, visible_at
635639
FROM activities
636640
WHERE activities.locked_until IS NULL OR activities.locked_until < ?
@@ -640,12 +644,12 @@ func (b *mysqlBackend) GetActivityTask(ctx context.Context) (*task.Activity, err
640644
)
641645

642646
var id int64
643-
var instanceID string
647+
var instanceID, executionID string
644648
var attributes []byte
645649
event := &history.Event{}
646650

647651
if err := res.Scan(
648-
&id, &event.ID, &instanceID, &event.Type,
652+
&id, &event.ID, &instanceID, &executionID, &event.Type,
649653
&event.Timestamp, &event.ScheduleEventID, &attributes, &event.VisibleAt); err != nil {
650654
if err == sql.ErrNoRows {
651655
return nil, nil
@@ -673,7 +677,7 @@ func (b *mysqlBackend) GetActivityTask(ctx context.Context) (*task.Activity, err
673677

674678
t := &task.Activity{
675679
ID: event.ID,
676-
WorkflowInstance: core.NewWorkflowInstance(instanceID),
680+
WorkflowInstance: core.NewWorkflowInstance(instanceID, executionID),
677681
Event: event,
678682
}
679683

@@ -697,9 +701,10 @@ func (b *mysqlBackend) CompleteActivityTask(ctx context.Context, instance *workf
697701
// Remove activity
698702
if res, err := tx.ExecContext(
699703
ctx,
700-
`DELETE FROM activities WHERE activity_id = ? AND instance_id = ? AND worker = ?`,
704+
`DELETE FROM activities WHERE activity_id = ? AND instance_id = ? AND execution_id = ? AND worker = ?`,
701705
id,
702706
instance.InstanceID,
707+
instance.ExecutionID,
703708
b.workerName,
704709
); err != nil {
705710
return fmt.Errorf("completing activity: %w", err)
@@ -763,9 +768,10 @@ func scheduleActivity(ctx context.Context, tx *sql.Tx, instance *core.WorkflowIn
763768
_, err = tx.ExecContext(
764769
ctx,
765770
`INSERT INTO activities
766-
(activity_id, instance_id, event_type, timestamp, schedule_event_id, attributes, visible_at) VALUES (?, ?, ?, ?, ?, ?, ?)`,
771+
(activity_id, instance_id, execution_id, event_type, timestamp, schedule_event_id, attributes, visible_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?)`,
767772
event.ID,
768773
instance.InstanceID,
774+
instance.ExecutionID,
769775
event.Type,
770776
event.Timestamp,
771777
event.ScheduleEventID,

backend/mysql/schema.sql

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
CREATE TABLE IF NOT EXISTS `instances` (
22
`id` BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY,
33
`instance_id` NVARCHAR(128) NOT NULL,
4+
`execution_id` NVARCHAR(128) NOT NULL,
45
`parent_instance_id` NVARCHAR(128) NULL,
56
`parent_schedule_event_id` BIGINT NULL,
67
`metadata` BLOB NULL,
@@ -52,6 +53,7 @@ CREATE TABLE IF NOT EXISTS `activities` (
5253
`id` BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY,
5354
`activity_id` NVARCHAR(64) NOT NULL,
5455
`instance_id` NVARCHAR(128) NOT NULL,
56+
`execution_id` NVARCHAR(128) NOT NULL,
5557
`event_type` INT NOT NULL,
5658
`timestamp` DATETIME NOT NULL,
5759
`schedule_event_id` BIGINT NOT NULL,
@@ -60,6 +62,6 @@ CREATE TABLE IF NOT EXISTS `activities` (
6062
`locked_until` DATETIME NULL,
6163
`worker` NVARCHAR(64) NULL,
6264

63-
UNIQUE INDEX `idx_activities_instance_id` (`instance_id`, `activity_id`, `worker`),
65+
UNIQUE INDEX `idx_activities_instance_id` (`instance_id`, `activity_id`, `execution_id`, `worker`),
6466
INDEX `idx_activities_locked_until` (`locked_until`)
6567
);

backend/sqlite/activities.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import (
77
"github.com/cschleiden/go-workflows/internal/history"
88
)
99

10-
func scheduleActivity(ctx context.Context, tx *sql.Tx, instanceID string, event *history.Event) error {
10+
func scheduleActivity(ctx context.Context, tx *sql.Tx, instanceID, executionID string, event *history.Event) error {
1111
attributes, err := history.SerializeAttributes(event.Attributes)
1212
if err != nil {
1313
return err
@@ -16,9 +16,10 @@ func scheduleActivity(ctx context.Context, tx *sql.Tx, instanceID string, event
1616
_, err = tx.ExecContext(
1717
ctx,
1818
`INSERT INTO activities
19-
(id, instance_id, event_type, timestamp, schedule_event_id, attributes, visible_at) VALUES (?, ?, ?, ?, ?, ?, ?)`,
19+
(id, instance_id, execution_id, event_type, timestamp, schedule_event_id, attributes, visible_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?)`,
2020
event.ID,
2121
instanceID,
22+
executionID,
2223
event.Type,
2324
event.Timestamp,
2425
event.ScheduleEventID,

backend/sqlite/diagnostics.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ func (sb *sqliteBackend) GetWorkflowInstances(ctx context.Context, afterInstance
2323
if afterInstanceID != "" {
2424
rows, err = tx.QueryContext(
2525
ctx,
26-
`SELECT i.id, i.created_at, i.completed_at
26+
`SELECT i.id, i.execution_id, i.created_at, i.completed_at
2727
FROM instances i
2828
INNER JOIN (SELECT id, created_at FROM instances WHERE id = ?) ii
2929
ON i.created_at < ii.created_at OR (i.created_at = ii.created_at AND i.id < ii.id)
@@ -35,7 +35,7 @@ func (sb *sqliteBackend) GetWorkflowInstances(ctx context.Context, afterInstance
3535
} else {
3636
rows, err = tx.QueryContext(
3737
ctx,
38-
`SELECT i.id, i.created_at, i.completed_at
38+
`SELECT i.id, i.execution_id, i.created_at, i.completed_at
3939
FROM instances i
4040
ORDER BY i.created_at DESC, i.id DESC
4141
LIMIT ?`,
@@ -49,10 +49,10 @@ func (sb *sqliteBackend) GetWorkflowInstances(ctx context.Context, afterInstance
4949
var instances []*diag.WorkflowInstanceRef
5050

5151
for rows.Next() {
52-
var id string
52+
var id, executionID string
5353
var createdAt time.Time
5454
var completedAt *time.Time
55-
err = rows.Scan(&id, &createdAt, &completedAt)
55+
err = rows.Scan(&id, &executionID, &createdAt, &completedAt)
5656
if err != nil {
5757
return nil, err
5858
}
@@ -63,7 +63,7 @@ func (sb *sqliteBackend) GetWorkflowInstances(ctx context.Context, afterInstance
6363
}
6464

6565
instances = append(instances, &diag.WorkflowInstanceRef{
66-
Instance: core.NewWorkflowInstance(id),
66+
Instance: core.NewWorkflowInstance(id, executionID),
6767
CreatedAt: createdAt,
6868
CompletedAt: completedAt,
6969
State: state,
@@ -80,13 +80,13 @@ func (sb *sqliteBackend) GetWorkflowInstance(ctx context.Context, instanceID str
8080
}
8181
defer tx.Rollback()
8282

83-
res := tx.QueryRowContext(ctx, "SELECT id, created_at, completed_at FROM instances WHERE id = ?", instanceID)
83+
res := tx.QueryRowContext(ctx, "SELECT id, execution_id, created_at, completed_at FROM instances WHERE id = ?", instanceID)
8484

85-
var id string
85+
var id, executionID string
8686
var createdAt time.Time
8787
var completedAt *time.Time
8888

89-
err = res.Scan(&id, &createdAt, &completedAt)
89+
err = res.Scan(&id, &executionID, &createdAt, &completedAt)
9090
if err != nil {
9191
if err == sql.ErrNoRows {
9292
return nil, nil
@@ -101,7 +101,7 @@ func (sb *sqliteBackend) GetWorkflowInstance(ctx context.Context, instanceID str
101101
}
102102

103103
return &diag.WorkflowInstanceRef{
104-
Instance: core.NewWorkflowInstance(id),
104+
Instance: core.NewWorkflowInstance(id, executionID),
105105
CreatedAt: createdAt,
106106
CompletedAt: completedAt,
107107
State: state,

backend/sqlite/schema.sql

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
CREATE TABLE IF NOT EXISTS `instances` (
22
`id` TEXT PRIMARY KEY,
3+
`execution_id` TEXT NO NULL,
34
`parent_instance_id` TEXT NULL,
45
`parent_schedule_event_id` INTEGER NULL,
56
`metadata` TEXT NULL,
@@ -44,6 +45,7 @@ CREATE INDEX IF NOT EXISTS `idx_history_instance_sequence_id` ON `history` (`ins
4445
CREATE TABLE IF NOT EXISTS `activities` (
4546
`id` TEXT PRIMARY KEY,
4647
`instance_id` TEXT NOT NULL,
48+
`execution_id` TEXT NOT NULL,
4749
`event_type` INTEGER NOT NULL,
4850
`timestamp` DATETIME NOT NULL,
4951
`schedule_event_id` INT NOT NULL,

0 commit comments

Comments
 (0)