Skip to content

Commit d29eb06

Browse files
committed
Remove executionID
1 parent 3468994 commit d29eb06

32 files changed

+117
-146
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -702,4 +702,4 @@ This kind of check is understandable for simple changes, but it becomes hard and
702702
703703
### `ContinueAsNew`
704704
705-
Both Temporal/Cadence and DTFx support `ContinueAsNew`. This essentially re-starts a running workflow as a new workflow with a new event history. This is needed for long running workflows where the history can become very large, negatively affecting performance. While `WorkflowInstance` supports an `InstanceID` and an `ExecutionID`, this feature is not yet implemented (and might not be).
705+
Both Temporal/Cadence and DTFx support `ContinueAsNew`. This essentially re-starts a running workflow as a new workflow with a new event history. This is needed for long running workflows where the history can become very large, negatively affecting performance. This feature is not implemented in go-workflows.

activitytester/activitytester.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,5 +14,5 @@ func WithActivityTestState(ctx context.Context, activityID, instanceID string, l
1414
logger = dlogger.NewDefaultLogger()
1515
}
1616

17-
return activity.WithActivityState(ctx, activity.NewActivityState(activityID, core.NewWorkflowInstance(instanceID, ""), logger))
17+
return activity.WithActivityState(ctx, activity.NewActivityState(activityID, core.NewWorkflowInstance(instanceID), logger))
1818
}

backend/mysql/diagnostics.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ func (mb *mysqlBackend) GetWorkflowInstances(ctx context.Context, afterInstanceI
2323
if afterInstanceID != "" {
2424
rows, err = tx.QueryContext(
2525
ctx,
26-
`SELECT i.instance_id, i.execution_id, i.created_at, i.completed_at
26+
`SELECT i.instance_id, i.created_at, i.completed_at
2727
FROM instances i
2828
INNER JOIN (SELECT instance_id, created_at FROM instances WHERE id = ?) ii
2929
ON i.created_at < ii.created_at OR (i.created_at = ii.created_at AND i.instance_id < ii.instance_id)
@@ -35,7 +35,7 @@ func (mb *mysqlBackend) GetWorkflowInstances(ctx context.Context, afterInstanceI
3535
} else {
3636
rows, err = tx.QueryContext(
3737
ctx,
38-
`SELECT i.instance_id, i.execution_id, i.created_at, i.completed_at
38+
`SELECT i.instance_id, i.created_at, i.completed_at
3939
FROM instances i
4040
ORDER BY i.created_at DESC, i.instance_id DESC
4141
LIMIT ?`,
@@ -49,10 +49,10 @@ func (mb *mysqlBackend) GetWorkflowInstances(ctx context.Context, afterInstanceI
4949
var instances []*diag.WorkflowInstanceRef
5050

5151
for rows.Next() {
52-
var id, executionID string
52+
var id string
5353
var createdAt time.Time
5454
var completedAt *time.Time
55-
err = rows.Scan(&id, &executionID, &createdAt, &completedAt)
55+
err = rows.Scan(&id, &createdAt, &completedAt)
5656
if err != nil {
5757
return nil, err
5858
}
@@ -63,7 +63,7 @@ func (mb *mysqlBackend) GetWorkflowInstances(ctx context.Context, afterInstanceI
6363
}
6464

6565
instances = append(instances, &diag.WorkflowInstanceRef{
66-
Instance: core.NewWorkflowInstance(id, executionID),
66+
Instance: core.NewWorkflowInstance(id),
6767
CreatedAt: createdAt,
6868
CompletedAt: completedAt,
6969
State: state,
@@ -80,13 +80,13 @@ func (mb *mysqlBackend) GetWorkflowInstance(ctx context.Context, instanceID stri
8080
}
8181
defer tx.Rollback()
8282

83-
res := tx.QueryRowContext(ctx, "SELECT instance_id, execution_id, created_at, completed_at FROM instances WHERE instance_id = ?", instanceID)
83+
res := tx.QueryRowContext(ctx, "SELECT instance_id, created_at, completed_at FROM instances WHERE instance_id = ?", instanceID)
8484

85-
var id, executionID string
85+
var id string
8686
var createdAt time.Time
8787
var completedAt *time.Time
8888

89-
err = res.Scan(&id, &executionID, &createdAt, &completedAt)
89+
err = res.Scan(&id, &createdAt, &completedAt)
9090
if err != nil {
9191
if err == sql.ErrNoRows {
9292
return nil, nil
@@ -101,7 +101,7 @@ func (mb *mysqlBackend) GetWorkflowInstance(ctx context.Context, instanceID stri
101101
}
102102

103103
return &diag.WorkflowInstanceRef{
104-
Instance: core.NewWorkflowInstance(id, executionID),
104+
Instance: core.NewWorkflowInstance(id),
105105
CreatedAt: createdAt,
106106
CompletedAt: completedAt,
107107
State: state,

backend/mysql/mysql.go

Lines changed: 15 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -197,9 +197,8 @@ func (b *mysqlBackend) GetWorkflowInstanceHistory(ctx context.Context, instance
197197
func (b *mysqlBackend) GetWorkflowInstanceState(ctx context.Context, instance *workflow.Instance) (core.WorkflowInstanceState, error) {
198198
row := b.db.QueryRowContext(
199199
ctx,
200-
"SELECT completed_at FROM instances WHERE instance_id = ? AND execution_id = ?",
200+
"SELECT completed_at FROM instances WHERE instance_id = ?",
201201
instance.InstanceID,
202-
instance.ExecutionID,
203202
)
204203

205204
var completedAt sql.NullTime
@@ -234,9 +233,8 @@ func createInstance(ctx context.Context, tx *sql.Tx, wfi *workflow.Instance, met
234233

235234
res, err := tx.ExecContext(
236235
ctx,
237-
"INSERT IGNORE INTO `instances` (instance_id, execution_id, parent_instance_id, parent_schedule_event_id, metadata) VALUES (?, ?, ?, ?, ?)",
236+
"INSERT IGNORE INTO `instances` (instance_id, parent_instance_id, parent_schedule_event_id, metadata) VALUES (?, ?, ?, ?)",
238237
wfi.InstanceID,
239-
wfi.ExecutionID,
240238
parentInstanceID,
241239
parentEventID,
242240
string(metadataJson),
@@ -296,7 +294,7 @@ func (b *mysqlBackend) GetWorkflowTask(ctx context.Context) (*task.Workflow, err
296294
now := time.Now()
297295
row := tx.QueryRowContext(
298296
ctx,
299-
`SELECT i.id, i.instance_id, i.execution_id, i.parent_instance_id, i.parent_schedule_event_id, i.metadata, i.sticky_until
297+
`SELECT i.id, i.instance_id, i.parent_instance_id, i.parent_schedule_event_id, i.metadata, i.sticky_until
300298
FROM instances i
301299
INNER JOIN pending_events pe ON i.instance_id = pe.instance_id
302300
WHERE
@@ -313,12 +311,12 @@ func (b *mysqlBackend) GetWorkflowTask(ctx context.Context) (*task.Workflow, err
313311
)
314312

315313
var id int
316-
var instanceID, executionID string
314+
var instanceID string
317315
var parentInstanceID *string
318316
var parentEventID *int64
319317
var metadataJson sql.NullString
320318
var stickyUntil *time.Time
321-
if err := row.Scan(&id, &instanceID, &executionID, &parentInstanceID, &parentEventID, &metadataJson, &stickyUntil); err != nil {
319+
if err := row.Scan(&id, &instanceID, &parentInstanceID, &parentEventID, &metadataJson, &stickyUntil); err != nil {
322320
if err == sql.ErrNoRows {
323321
return nil, nil
324322
}
@@ -348,9 +346,9 @@ func (b *mysqlBackend) GetWorkflowTask(ctx context.Context) (*task.Workflow, err
348346

349347
var wfi *workflow.Instance
350348
if parentInstanceID != nil {
351-
wfi = core.NewSubWorkflowInstance(instanceID, executionID, *parentInstanceID, *parentEventID)
349+
wfi = core.NewSubWorkflowInstance(instanceID, *parentInstanceID, *parentEventID)
352350
} else {
353-
wfi = core.NewWorkflowInstance(instanceID, executionID)
351+
wfi = core.NewWorkflowInstance(instanceID)
354352
}
355353

356354
var metadata *core.WorkflowMetadata
@@ -460,11 +458,10 @@ func (b *mysqlBackend) CompleteWorkflowTask(
460458

461459
res, err := tx.ExecContext(
462460
ctx,
463-
`UPDATE instances SET locked_until = NULL, sticky_until = ?, completed_at = ? WHERE instance_id = ? AND execution_id = ? AND worker = ?`,
461+
`UPDATE instances SET locked_until = NULL, sticky_until = ?, completed_at = ? WHERE instance_id = ? AND worker = ?`,
464462
time.Now().Add(b.options.StickyTimeout),
465463
completedAt,
466464
instance.InstanceID,
467-
instance.ExecutionID,
468465
b.workerName,
469466
)
470467
if err != nil {
@@ -564,10 +561,9 @@ func (b *mysqlBackend) ExtendWorkflowTask(ctx context.Context, taskID string, in
564561
until := time.Now().Add(b.options.WorkflowLockTimeout)
565562
res, err := tx.ExecContext(
566563
ctx,
567-
`UPDATE instances SET locked_until = ? WHERE instance_id = ? AND execution_id = ? AND worker = ?`,
564+
`UPDATE instances SET locked_until = ? WHERE instance_id = ? AND worker = ?`,
568565
until,
569566
instance.InstanceID,
570-
instance.ExecutionID,
571567
b.workerName,
572568
)
573569
if err != nil {
@@ -597,7 +593,7 @@ func (b *mysqlBackend) GetActivityTask(ctx context.Context) (*task.Activity, err
597593
now := time.Now()
598594
res := tx.QueryRowContext(
599595
ctx,
600-
`SELECT activities.id, activity_id, activities.instance_id, activities.execution_id,
596+
`SELECT activities.id, activity_id, activities.instance_id,
601597
instances.metadata, event_type, timestamp, schedule_event_id, attributes, visible_at
602598
FROM activities
603599
INNER JOIN instances ON activities.instance_id = instances.instance_id
@@ -608,13 +604,13 @@ func (b *mysqlBackend) GetActivityTask(ctx context.Context) (*task.Activity, err
608604
)
609605

610606
var id int64
611-
var instanceID, executionID string
607+
var instanceID string
612608
var attributes []byte
613609
var metadataJson sql.NullString
614610
event := &history.Event{}
615611

616612
if err := res.Scan(
617-
&id, &event.ID, &instanceID, &executionID, &metadataJson, &event.Type,
613+
&id, &event.ID, &instanceID, &metadataJson, &event.Type,
618614
&event.Timestamp, &event.ScheduleEventID, &attributes, &event.VisibleAt); err != nil {
619615
if err == sql.ErrNoRows {
620616
return nil, nil
@@ -647,7 +643,7 @@ func (b *mysqlBackend) GetActivityTask(ctx context.Context) (*task.Activity, err
647643

648644
t := &task.Activity{
649645
ID: event.ID,
650-
WorkflowInstance: core.NewWorkflowInstance(instanceID, executionID),
646+
WorkflowInstance: core.NewWorkflowInstance(instanceID),
651647
Metadata: metadata,
652648
Event: event,
653649
}
@@ -672,10 +668,9 @@ func (b *mysqlBackend) CompleteActivityTask(ctx context.Context, instance *workf
672668
// Remove activity
673669
if res, err := tx.ExecContext(
674670
ctx,
675-
`DELETE FROM activities WHERE activity_id = ? AND instance_id = ? AND execution_id = ? AND worker = ?`,
671+
`DELETE FROM activities WHERE activity_id = ? AND instance_id = ? AND worker = ?`,
676672
id,
677673
instance.InstanceID,
678-
instance.ExecutionID,
679674
b.workerName,
680675
); err != nil {
681676
return fmt.Errorf("completing activity: %w", err)
@@ -739,10 +734,9 @@ func scheduleActivity(ctx context.Context, tx *sql.Tx, instance *core.WorkflowIn
739734
_, err = tx.ExecContext(
740735
ctx,
741736
`INSERT INTO activities
742-
(activity_id, instance_id, execution_id, event_type, timestamp, schedule_event_id, attributes, visible_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?)`,
737+
(activity_id, instance_id, event_type, timestamp, schedule_event_id, attributes, visible_at) VALUES (?, ?, ?, ?, ?, ?, ?)`,
743738
event.ID,
744739
instance.InstanceID,
745-
instance.ExecutionID,
746740
event.Type,
747741
event.Timestamp,
748742
event.ScheduleEventID,

backend/mysql/schema.sql

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
CREATE TABLE IF NOT EXISTS `instances` (
22
`id` BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY,
33
`instance_id` NVARCHAR(128) NOT NULL,
4-
`execution_id` NVARCHAR(128) NOT NULL,
54
`parent_instance_id` NVARCHAR(128) NULL,
65
`parent_schedule_event_id` BIGINT NULL,
76
`metadata` BLOB NULL,
@@ -53,7 +52,6 @@ CREATE TABLE IF NOT EXISTS `activities` (
5352
`id` BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY,
5453
`activity_id` NVARCHAR(64) NOT NULL,
5554
`instance_id` NVARCHAR(128) NOT NULL,
56-
`execution_id` NVARCHAR(128) NOT NULL,
5755
`event_type` INT NOT NULL,
5856
`timestamp` DATETIME NOT NULL,
5957
`schedule_event_id` BIGINT NOT NULL,
@@ -62,6 +60,6 @@ CREATE TABLE IF NOT EXISTS `activities` (
6260
`locked_until` DATETIME NULL,
6361
`worker` NVARCHAR(64) NULL,
6462

65-
UNIQUE INDEX `idx_activities_instance_id` (`instance_id`, `activity_id`, `execution_id`, `worker`),
63+
UNIQUE INDEX `idx_activities_instance_id` (`instance_id`, `activity_id`, `worker`),
6664
INDEX `idx_activities_locked_until` (`locked_until`)
6765
);

backend/sqlite/activities.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import (
77
"github.com/cschleiden/go-workflows/internal/history"
88
)
99

10-
func scheduleActivity(ctx context.Context, tx *sql.Tx, instanceID, executionID string, event *history.Event) error {
10+
func scheduleActivity(ctx context.Context, tx *sql.Tx, instanceID string, event *history.Event) error {
1111
attributes, err := history.SerializeAttributes(event.Attributes)
1212
if err != nil {
1313
return err
@@ -16,10 +16,9 @@ func scheduleActivity(ctx context.Context, tx *sql.Tx, instanceID, executionID s
1616
_, err = tx.ExecContext(
1717
ctx,
1818
`INSERT INTO activities
19-
(id, instance_id, execution_id, event_type, timestamp, schedule_event_id, attributes, visible_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?)`,
19+
(id, instance_id, event_type, timestamp, schedule_event_id, attributes, visible_at) VALUES (?, ?, ?, ?, ?, ?, ?)`,
2020
event.ID,
2121
instanceID,
22-
executionID,
2322
event.Type,
2423
event.Timestamp,
2524
event.ScheduleEventID,

backend/sqlite/diagnostics.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ func (sb *sqliteBackend) GetWorkflowInstances(ctx context.Context, afterInstance
2323
if afterInstanceID != "" {
2424
rows, err = tx.QueryContext(
2525
ctx,
26-
`SELECT i.id, i.execution_id, i.created_at, i.completed_at
26+
`SELECT i.id, i.created_at, i.completed_at
2727
FROM instances i
2828
INNER JOIN (SELECT id, created_at FROM instances WHERE id = ?) ii
2929
ON i.created_at < ii.created_at OR (i.created_at = ii.created_at AND i.id < ii.id)
@@ -35,7 +35,7 @@ func (sb *sqliteBackend) GetWorkflowInstances(ctx context.Context, afterInstance
3535
} else {
3636
rows, err = tx.QueryContext(
3737
ctx,
38-
`SELECT i.id, i.execution_id, i.created_at, i.completed_at
38+
`SELECT i.id, i.created_at, i.completed_at
3939
FROM instances i
4040
ORDER BY i.created_at DESC, i.id DESC
4141
LIMIT ?`,
@@ -49,10 +49,10 @@ func (sb *sqliteBackend) GetWorkflowInstances(ctx context.Context, afterInstance
4949
var instances []*diag.WorkflowInstanceRef
5050

5151
for rows.Next() {
52-
var id, executionID string
52+
var id string
5353
var createdAt time.Time
5454
var completedAt *time.Time
55-
err = rows.Scan(&id, &executionID, &createdAt, &completedAt)
55+
err = rows.Scan(&id, &createdAt, &completedAt)
5656
if err != nil {
5757
return nil, err
5858
}
@@ -63,7 +63,7 @@ func (sb *sqliteBackend) GetWorkflowInstances(ctx context.Context, afterInstance
6363
}
6464

6565
instances = append(instances, &diag.WorkflowInstanceRef{
66-
Instance: core.NewWorkflowInstance(id, executionID),
66+
Instance: core.NewWorkflowInstance(id),
6767
CreatedAt: createdAt,
6868
CompletedAt: completedAt,
6969
State: state,
@@ -80,13 +80,13 @@ func (sb *sqliteBackend) GetWorkflowInstance(ctx context.Context, instanceID str
8080
}
8181
defer tx.Rollback()
8282

83-
res := tx.QueryRowContext(ctx, "SELECT id, execution_id, created_at, completed_at FROM instances WHERE id = ?", instanceID)
83+
res := tx.QueryRowContext(ctx, "SELECT id, created_at, completed_at FROM instances WHERE id = ?", instanceID)
8484

85-
var id, executionID string
85+
var id string
8686
var createdAt time.Time
8787
var completedAt *time.Time
8888

89-
err = res.Scan(&id, &executionID, &createdAt, &completedAt)
89+
err = res.Scan(&id, &createdAt, &completedAt)
9090
if err != nil {
9191
if err == sql.ErrNoRows {
9292
return nil, nil
@@ -101,7 +101,7 @@ func (sb *sqliteBackend) GetWorkflowInstance(ctx context.Context, instanceID str
101101
}
102102

103103
return &diag.WorkflowInstanceRef{
104-
Instance: core.NewWorkflowInstance(id, executionID),
104+
Instance: core.NewWorkflowInstance(id),
105105
CreatedAt: createdAt,
106106
CompletedAt: completedAt,
107107
State: state,

backend/sqlite/schema.sql

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
CREATE TABLE IF NOT EXISTS `instances` (
22
`id` TEXT PRIMARY KEY,
3-
`execution_id` TEXT NO NULL,
43
`parent_instance_id` TEXT NULL,
54
`parent_schedule_event_id` INTEGER NULL,
65
`metadata` TEXT NULL,
@@ -45,7 +44,6 @@ CREATE INDEX IF NOT EXISTS `idx_history_instance_sequence_id` ON `history` (`ins
4544
CREATE TABLE IF NOT EXISTS `activities` (
4645
`id` TEXT PRIMARY KEY,
4746
`instance_id` TEXT NOT NULL,
48-
`execution_id` TEXT NOT NULL,
4947
`event_type` INTEGER NOT NULL,
5048
`timestamp` DATETIME NOT NULL,
5149
`schedule_event_id` INT NOT NULL,

0 commit comments

Comments
 (0)