Skip to content

Commit 9a07ce8

Browse files
authored
Add executor generated sequence id
1 parent 5e6d1db commit 9a07ce8

File tree

20 files changed

+176
-124
lines changed

20 files changed

+176
-124
lines changed

backend/backend.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ type Backend interface {
5555
// GetActivityTask returns a pending activity task or nil if there are no pending activities
5656
GetActivityTask(ctx context.Context) (*task.Activity, error)
5757

58-
// CompleteActivityTask completes a activity task retrieved using GetActivityTask
58+
// CompleteActivityTask completes an activity task retrieved using GetActivityTask
5959
CompleteActivityTask(ctx context.Context, instance *workflow.Instance, activityID string, event history.Event) error
6060

6161
// ExtendActivityTask extends the lock of an activity task

backend/mysql/events.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@ func insertEvents(ctx context.Context, tx *sql.Tx, tableName string, instanceID
2525
}
2626
batchEvents := events[batchStart:batchEnd]
2727

28-
query := "INSERT INTO `" + tableName + "` (event_id, instance_id, event_type, timestamp, schedule_event_id, attributes, visible_at) VALUES (?, ?, ?, ?, ?, ?, ?)" +
29-
strings.Repeat(", (?, ?, ?, ?, ?, ?, ?)", len(batchEvents)-1)
28+
query := "INSERT INTO `" + tableName + "` (event_id, sequence_id, instance_id, event_type, timestamp, schedule_event_id, attributes, visible_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?)" +
29+
strings.Repeat(", (?, ?, ?, ?, ?, ?, ?, ?)", len(batchEvents)-1)
3030

3131
args := make([]interface{}, 0, len(batchEvents)*7)
3232

@@ -36,7 +36,7 @@ func insertEvents(ctx context.Context, tx *sql.Tx, tableName string, instanceID
3636
return err
3737
}
3838

39-
args = append(args, newEvent.ID, instanceID, newEvent.Type, newEvent.Timestamp, newEvent.ScheduleEventID, a, newEvent.VisibleAt)
39+
args = append(args, newEvent.ID, newEvent.SequenceID, instanceID, newEvent.Type, newEvent.Timestamp, newEvent.ScheduleEventID, a, newEvent.VisibleAt)
4040
}
4141

4242
_, err := tx.ExecContext(

backend/mysql/mysql.go

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ func (b *mysqlBackend) GetWorkflowInstanceHistory(ctx context.Context, instance
136136

137137
historyEvents, err := tx.QueryContext(
138138
ctx,
139-
"SELECT event_id, instance_id, event_type, timestamp, schedule_event_id, attributes, visible_at FROM `history` WHERE instance_id = ? ORDER BY id",
139+
"SELECT event_id, sequence_id, instance_id, event_type, timestamp, schedule_event_id, attributes, visible_at FROM `history` WHERE instance_id = ? ORDER BY id",
140140
instance.InstanceID,
141141
)
142142
if err != nil {
@@ -153,6 +153,7 @@ func (b *mysqlBackend) GetWorkflowInstanceHistory(ctx context.Context, instance
153153

154154
if err := historyEvents.Scan(
155155
&historyEvent.ID,
156+
&historyEvent.SequenceID,
156157
&instanceID,
157158
&historyEvent.Type,
158159
&historyEvent.Timestamp,
@@ -347,7 +348,7 @@ func (b *mysqlBackend) GetWorkflowTask(ctx context.Context) (*task.Workflow, err
347348
// Get new events
348349
events, err := tx.QueryContext(
349350
ctx,
350-
"SELECT event_id, instance_id, event_type, timestamp, schedule_event_id, attributes, visible_at FROM `pending_events` WHERE instance_id = ? AND (`visible_at` IS NULL OR `visible_at` <= ?) ORDER BY id",
351+
"SELECT event_id, sequence_id, instance_id, event_type, timestamp, schedule_event_id, attributes, visible_at FROM `pending_events` WHERE instance_id = ? AND (`visible_at` IS NULL OR `visible_at` <= ?) ORDER BY id",
351352
instanceID,
352353
now,
353354
)
@@ -361,7 +362,16 @@ func (b *mysqlBackend) GetWorkflowTask(ctx context.Context) (*task.Workflow, err
361362

362363
historyEvent := history.Event{}
363364

364-
if err := events.Scan(&historyEvent.ID, &instanceID, &historyEvent.Type, &historyEvent.Timestamp, &historyEvent.ScheduleEventID, &attributes, &historyEvent.VisibleAt); err != nil {
365+
if err := events.Scan(
366+
&historyEvent.ID,
367+
&historyEvent.SequenceID,
368+
&instanceID,
369+
&historyEvent.Type,
370+
&historyEvent.Timestamp,
371+
&historyEvent.ScheduleEventID,
372+
&attributes,
373+
&historyEvent.VisibleAt,
374+
); err != nil {
365375
return nil, errors.Wrap(err, "could not scan event")
366376
}
367377

@@ -384,7 +394,7 @@ func (b *mysqlBackend) GetWorkflowTask(ctx context.Context) (*task.Workflow, err
384394
if kind != task.Continuation {
385395
historyEvents, err := tx.QueryContext(
386396
ctx,
387-
"SELECT event_id, instance_id, event_type, timestamp, schedule_event_id, attributes, visible_at FROM `history` WHERE instance_id = ? ORDER BY id",
397+
"SELECT event_id, sequence_id, instance_id, event_type, timestamp, schedule_event_id, attributes, visible_at FROM `history` WHERE instance_id = ? ORDER BY id",
388398
instanceID,
389399
)
390400
if err != nil {
@@ -399,6 +409,7 @@ func (b *mysqlBackend) GetWorkflowTask(ctx context.Context) (*task.Workflow, err
399409

400410
if err := historyEvents.Scan(
401411
&historyEvent.ID,
412+
&historyEvent.SequenceID,
402413
&instanceID,
403414
&historyEvent.Type,
404415
&historyEvent.Timestamp,
@@ -420,7 +431,7 @@ func (b *mysqlBackend) GetWorkflowTask(ctx context.Context) (*task.Workflow, err
420431
}
421432
} else {
422433
// Get only most recent history event
423-
row := tx.QueryRowContext(ctx, "SELECT event_id, instance_id, event_type, timestamp, schedule_event_id, attributes, visible_at FROM `history` WHERE instance_id = ? ORDER BY id DESC LIMIT 1", instanceID)
434+
row := tx.QueryRowContext(ctx, "SELECT event_id, sequence_id, instance_id, event_type, timestamp, schedule_event_id, attributes, visible_at FROM `history` WHERE instance_id = ? ORDER BY id DESC LIMIT 1", instanceID)
424435

425436
var instanceID string
426437
var attributes []byte
@@ -429,6 +440,7 @@ func (b *mysqlBackend) GetWorkflowTask(ctx context.Context) (*task.Workflow, err
429440

430441
if err := row.Scan(
431442
&lastHistoryEvent.ID,
443+
&lastHistoryEvent.SequenceID,
432444
&instanceID,
433445
&lastHistoryEvent.Type,
434446
&lastHistoryEvent.Timestamp,
@@ -529,7 +541,7 @@ func (b *mysqlBackend) CompleteWorkflowTask(
529541

530542
// Schedule activities
531543
for _, e := range activityEvents {
532-
if err := scheduleActivity(ctx, tx, instance.InstanceID, instance.ExecutionID, e); err != nil {
544+
if err := scheduleActivity(ctx, tx, instance, e); err != nil {
533545
return errors.Wrap(err, "could not schedule activity")
534546
}
535547
}
@@ -615,7 +627,7 @@ func (b *mysqlBackend) GetActivityTask(ctx context.Context) (*task.Activity, err
615627
now,
616628
)
617629

618-
var id int
630+
var id int64
619631
var instanceID, executionID string
620632
var attributes []byte
621633
event := history.Event{}
@@ -729,7 +741,7 @@ func (b *mysqlBackend) ExtendActivityTask(ctx context.Context, activityID string
729741
return tx.Commit()
730742
}
731743

732-
func scheduleActivity(ctx context.Context, tx *sql.Tx, instanceID, executionID string, event history.Event) error {
744+
func scheduleActivity(ctx context.Context, tx *sql.Tx, instance *core.WorkflowInstance, event history.Event) error {
733745
a, err := history.SerializeAttributes(event.Attributes)
734746
if err != nil {
735747
return err
@@ -740,8 +752,8 @@ func scheduleActivity(ctx context.Context, tx *sql.Tx, instanceID, executionID s
740752
`INSERT INTO activities
741753
(activity_id, instance_id, execution_id, event_type, timestamp, schedule_event_id, attributes, visible_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?)`,
742754
event.ID,
743-
instanceID,
744-
executionID,
755+
instance.InstanceID,
756+
instance.ExecutionID,
745757
event.Type,
746758
event.Timestamp,
747759
event.ScheduleEventID,

backend/mysql/schema.sql

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
CREATE TABLE IF NOT EXISTS `instances` (
2-
`id` BIGBIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY,
2+
`id` BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY,
33
`instance_id` NVARCHAR(128) NOT NULL,
44
`execution_id` NVARCHAR(128) NOT NULL,
55
`parent_instance_id` NVARCHAR(128) NULL,
6-
`parent_schedule_event_id` BIGBIGINT NULL,
6+
`parent_schedule_event_id` BIGINT NULL,
77
`created_at` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
88
`completed_at` DATETIME NULL,
99
`locked_until` DATETIME NULL,
@@ -19,6 +19,7 @@ CREATE TABLE IF NOT EXISTS `instances` (
1919
CREATE TABLE IF NOT EXISTS `pending_events` (
2020
`id` BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY,
2121
`event_id` NVARCHAR(128) NOT NULL,
22+
`sequence_id` BIGINT NOT NULL, -- Not used, but keep for now for query compat
2223
`instance_id` NVARCHAR(128) NOT NULL,
2324
`event_type` INT NOT NULL,
2425
`timestamp` DATETIME NOT NULL,
@@ -34,6 +35,7 @@ CREATE TABLE IF NOT EXISTS `pending_events` (
3435
CREATE TABLE IF NOT EXISTS `history` (
3536
`id` BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY,
3637
`event_id` NVARCHAR(64) NOT NULL,
38+
`sequence_id` BIGINT NOT NULL,
3739
`instance_id` NVARCHAR(128) NOT NULL,
3840
`event_type` INT NOT NULL,
3941
`timestamp` DATETIME NOT NULL,

backend/redis/redis.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package redis
22

33
import (
4+
"context"
45
"time"
56

67
"github.com/cschleiden/go-workflows/backend"
@@ -42,9 +43,9 @@ func NewRedisBackend(address, username, password string, db int, opts ...RedisBa
4243
})
4344

4445
// // TODO: Only for dev
45-
// if err := client.FlushDB(context.Background()).Err(); err != nil {
46-
// panic(err)
47-
// }
46+
if err := client.FlushDB(context.Background()).Err(); err != nil {
47+
panic(err)
48+
}
4849

4950
workflowQueue, err := taskqueue.New[workflowTaskData](client, "workflows")
5051
if err != nil {

backend/redis/taskqueue/queue.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,9 @@ func (q *taskQueue[T]) Extend(ctx context.Context, taskID string) error {
152152
// We have to XACK _and_ XDEL here. See https://github.com/redis/redis/issues/5754
153153
var completeCmd = redis.NewScript(`
154154
local task = redis.call("XRANGE", KEYS[2], ARGV[1], ARGV[1])
155+
if task == nil then
156+
return nil
157+
end
155158
local id = task[1][2][2]
156159
redis.call("SREM", KEYS[1], id)
157160
redis.call("XACK", KEYS[2], ARGV[2], ARGV[1])

backend/redis/taskqueue/queue_test.go

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ func Test_TaskQueue(t *testing.T) {
2323
})
2424

2525
lockTimeout := time.Millisecond * 10
26+
blockTimeout := time.Millisecond * 10
2627

2728
tests := []struct {
2829
name string
@@ -45,7 +46,7 @@ func Test_TaskQueue(t *testing.T) {
4546
_, err = q.Enqueue(context.Background(), "t1", nil)
4647
require.NoError(t, err)
4748

48-
task, err := q.Dequeue(context.Background(), lockTimeout, time.Millisecond*10)
49+
task, err := q.Dequeue(context.Background(), lockTimeout, blockTimeout)
4950
require.NoError(t, err)
5051
require.NotNil(t, task)
5152
require.Equal(t, "t1", task.ID)
@@ -63,7 +64,7 @@ func Test_TaskQueue(t *testing.T) {
6364
_, err = q.Enqueue(context.Background(), "t1", nil)
6465
require.Error(t, ErrTaskAlreadyInQueue, err)
6566

66-
task, err := q.Dequeue(context.Background(), lockTimeout, time.Millisecond*10)
67+
task, err := q.Dequeue(context.Background(), lockTimeout, blockTimeout)
6768
require.NoError(t, err)
6869
require.NotNil(t, task)
6970

@@ -91,7 +92,7 @@ func Test_TaskQueue(t *testing.T) {
9192
})
9293
require.NoError(t, err)
9394

94-
task, err := q.Dequeue(context.Background(), lockTimeout, time.Millisecond*10)
95+
task, err := q.Dequeue(context.Background(), lockTimeout, blockTimeout)
9596
require.NoError(t, err)
9697
require.NotNil(t, task)
9798
require.Equal(t, "t1", task.ID)
@@ -111,7 +112,7 @@ func Test_TaskQueue(t *testing.T) {
111112
require.NoError(t, err)
112113

113114
// Dequeue using second worker
114-
task, err := q2.Dequeue(context.Background(), lockTimeout, time.Millisecond*10)
115+
task, err := q2.Dequeue(context.Background(), lockTimeout, blockTimeout)
115116
require.NoError(t, err)
116117
require.NotNil(t, task)
117118
require.Equal(t, "t1", task.ID)
@@ -126,7 +127,7 @@ func Test_TaskQueue(t *testing.T) {
126127
_, err := q.Enqueue(context.Background(), "t1", nil)
127128
require.NoError(t, err)
128129

129-
task, err := q.Dequeue(context.Background(), lockTimeout, time.Millisecond*10)
130+
task, err := q.Dequeue(context.Background(), lockTimeout, blockTimeout)
130131
require.NoError(t, err)
131132
require.NotNil(t, task)
132133

@@ -137,7 +138,7 @@ func Test_TaskQueue(t *testing.T) {
137138
time.Sleep(time.Millisecond * 10)
138139

139140
// Try to recover using second worker
140-
task2, err := q2.Dequeue(context.Background(), lockTimeout, time.Millisecond*10)
141+
task2, err := q2.Dequeue(context.Background(), lockTimeout, blockTimeout)
141142
require.NoError(t, err)
142143
require.Nil(t, task2)
143144
},
@@ -153,15 +154,15 @@ func Test_TaskQueue(t *testing.T) {
153154
q2, _ := New[any](client, "test")
154155
require.NoError(t, err)
155156

156-
task, err := q2.Dequeue(context.Background(), lockTimeout, time.Millisecond*10)
157+
task, err := q2.Dequeue(context.Background(), lockTimeout, blockTimeout)
157158
require.NoError(t, err)
158159
require.NotNil(t, task)
159160
require.Equal(t, "t1", task.ID)
160161

161162
time.Sleep(time.Millisecond * 10)
162163

163164
// Assume q2 crashed, recover from other worker
164-
recoveredTask, err := q.Dequeue(context.Background(), time.Millisecond*1, time.Millisecond*10)
165+
recoveredTask, err := q.Dequeue(context.Background(), time.Millisecond*1, blockTimeout)
165166
require.NoError(t, err)
166167
require.NotNil(t, task)
167168
require.Equal(t, task, recoveredTask)
@@ -178,17 +179,17 @@ func Test_TaskQueue(t *testing.T) {
178179
q2, _ := New[any](client, "test")
179180
require.NoError(t, err)
180181

181-
task, err := q2.Dequeue(context.Background(), lockTimeout, time.Millisecond*10)
182+
task, err := q2.Dequeue(context.Background(), lockTimeout, blockTimeout)
182183
require.NoError(t, err)
183184
require.NotNil(t, task)
184185
require.Equal(t, "t1", task.ID)
185186

186-
time.Sleep(time.Millisecond * 10)
187+
time.Sleep(time.Millisecond * 5)
187188

188189
err = q2.Extend(context.Background(), task.TaskID)
189190
require.NoError(t, err)
190191

191-
recoveredTask, err := q.Dequeue(context.Background(), lockTimeout, time.Millisecond*10)
192+
recoveredTask, err := q.Dequeue(context.Background(), lockTimeout*2, blockTimeout)
192193
require.NoError(t, err)
193194
require.Nil(t, recoveredTask)
194195
},

backend/sqlite/events.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ func scanEvent(row Scanner) (history.Event, error) {
6161

6262
historyEvent := history.Event{}
6363

64-
if err := row.Scan(&historyEvent.ID, &instanceID, &historyEvent.Type, &historyEvent.Timestamp, &historyEvent.ScheduleEventID, &attributes, &historyEvent.VisibleAt); err != nil {
64+
if err := row.Scan(&historyEvent.ID, &historyEvent.SequenceID, &instanceID, &historyEvent.Type, &historyEvent.Timestamp, &historyEvent.ScheduleEventID, &attributes, &historyEvent.VisibleAt); err != nil {
6565
return historyEvent, errors.Wrap(err, "could not scan event")
6666
}
6767

@@ -92,8 +92,8 @@ func insertEvents(ctx context.Context, tx *sql.Tx, tableName string, instanceID
9292
}
9393
batchEvents := events[batchStart:batchEnd]
9494

95-
query := "INSERT INTO `" + tableName + "` (id, instance_id, event_type, timestamp, schedule_event_id, attributes, visible_at) VALUES (?, ?, ?, ?, ?, ?, ?)" +
96-
strings.Repeat(", (?, ?, ?, ?, ?, ?, ?)", len(batchEvents)-1)
95+
query := "INSERT INTO `" + tableName + "` (id, sequence_id, instance_id, event_type, timestamp, schedule_event_id, attributes, visible_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?)" +
96+
strings.Repeat(", (?, ?, ?, ?, ?, ?, ?, ?)", len(batchEvents)-1)
9797

9898
args := make([]interface{}, 0, len(batchEvents)*7)
9999

@@ -103,7 +103,7 @@ func insertEvents(ctx context.Context, tx *sql.Tx, tableName string, instanceID
103103
return err
104104
}
105105

106-
args = append(args, newEvent.ID, instanceID, newEvent.Type, newEvent.Timestamp, newEvent.ScheduleEventID, a, newEvent.VisibleAt)
106+
args = append(args, newEvent.ID, newEvent.SequenceID, instanceID, newEvent.Type, newEvent.Timestamp, newEvent.ScheduleEventID, a, newEvent.VisibleAt)
107107
}
108108

109109
_, err := tx.ExecContext(

backend/sqlite/schema.sql

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ CREATE INDEX IF NOT EXISTS `idx_instances_parent_instance_id` ON `instances` (`p
1515

1616
CREATE TABLE IF NOT EXISTS `pending_events` (
1717
`id` TEXT PRIMARY KEY,
18+
`sequence_id` INTEGER NOT NULL, -- not used but keep for now for query compat
1819
`instance_id` TEXT NOT NULL,
1920
`event_type` INTEGER NOT NULL,
2021
`timestamp` DATETIME NOT NULL,
@@ -23,8 +24,11 @@ CREATE TABLE IF NOT EXISTS `pending_events` (
2324
`visible_at` DATETIME NULL
2425
);
2526

27+
CREATE INDEX IF NOT EXISTS `idx_pending_events_instance_id_visible_at` ON `pending_events` (`instance_id`, `visible_at`);
28+
2629
CREATE TABLE IF NOT EXISTS `history` (
2730
`id` TEXT PRIMARY KEY,
31+
`sequence_id` INTEGER NOT NULL,
2832
`instance_id` TEXT NOT NULL,
2933
`event_type` INTEGER NOT NULL,
3034
`timestamp` DATETIME NOT NULL,
@@ -33,6 +37,8 @@ CREATE TABLE IF NOT EXISTS `history` (
3337
`visible_at` DATETIME NULL
3438
);
3539

40+
CREATE INDEX IF NOT EXISTS `idx_history_instance_sequence_id` ON `history` (`instance_id`, `sequence_id`);
41+
3642
CREATE TABLE IF NOT EXISTS `activities` (
3743
`id` TEXT PRIMARY KEY,
3844
`instance_id` TEXT NOT NULL,

0 commit comments

Comments
 (0)