Skip to content

Commit 6f944d1

Browse files
authored
Merge pull request #40 from cschleiden/sequence-id
Add executor generated sequence ids
2 parents 72abcbe + 823b2af commit 6f944d1

File tree

24 files changed

+208
-149
lines changed

24 files changed

+208
-149
lines changed

backend/backend.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ type Backend interface {
5555
// GetActivityTask returns a pending activity task or nil if there are no pending activities
5656
GetActivityTask(ctx context.Context) (*task.Activity, error)
5757

58-
// CompleteActivityTask completes a activity task retrieved using GetActivityTask
58+
// CompleteActivityTask completes an activity task retrieved using GetActivityTask
5959
CompleteActivityTask(ctx context.Context, instance *workflow.Instance, activityID string, event history.Event) error
6060

6161
// ExtendActivityTask extends the lock of an activity task

backend/mysql/events.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@ func insertEvents(ctx context.Context, tx *sql.Tx, tableName string, instanceID
2525
}
2626
batchEvents := events[batchStart:batchEnd]
2727

28-
query := "INSERT INTO `" + tableName + "` (event_id, instance_id, event_type, timestamp, schedule_event_id, attributes, visible_at) VALUES (?, ?, ?, ?, ?, ?, ?)" +
29-
strings.Repeat(", (?, ?, ?, ?, ?, ?, ?)", len(batchEvents)-1)
28+
query := "INSERT INTO `" + tableName + "` (event_id, sequence_id, instance_id, event_type, timestamp, schedule_event_id, attributes, visible_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?)" +
29+
strings.Repeat(", (?, ?, ?, ?, ?, ?, ?, ?)", len(batchEvents)-1)
3030

3131
args := make([]interface{}, 0, len(batchEvents)*7)
3232

@@ -36,7 +36,7 @@ func insertEvents(ctx context.Context, tx *sql.Tx, tableName string, instanceID
3636
return err
3737
}
3838

39-
args = append(args, newEvent.ID, instanceID, newEvent.Type, newEvent.Timestamp, newEvent.ScheduleEventID, a, newEvent.VisibleAt)
39+
args = append(args, newEvent.ID, newEvent.SequenceID, instanceID, newEvent.Type, newEvent.Timestamp, newEvent.ScheduleEventID, a, newEvent.VisibleAt)
4040
}
4141

4242
_, err := tx.ExecContext(

backend/mysql/mysql.go

Lines changed: 24 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ func (b *mysqlBackend) GetWorkflowInstanceHistory(ctx context.Context, instance
136136

137137
historyEvents, err := tx.QueryContext(
138138
ctx,
139-
"SELECT event_id, instance_id, event_type, timestamp, schedule_event_id, attributes, visible_at FROM `history` WHERE instance_id = ? ORDER BY id",
139+
"SELECT event_id, sequence_id, instance_id, event_type, timestamp, schedule_event_id, attributes, visible_at FROM `history` WHERE instance_id = ? ORDER BY id",
140140
instance.InstanceID,
141141
)
142142
if err != nil {
@@ -153,6 +153,7 @@ func (b *mysqlBackend) GetWorkflowInstanceHistory(ctx context.Context, instance
153153

154154
if err := historyEvents.Scan(
155155
&historyEvent.ID,
156+
&historyEvent.SequenceID,
156157
&instanceID,
157158
&historyEvent.Type,
158159
&historyEvent.Timestamp,
@@ -200,7 +201,7 @@ func (b *mysqlBackend) GetWorkflowInstanceState(ctx context.Context, instance *w
200201

201202
func createInstance(ctx context.Context, tx *sql.Tx, wfi *workflow.Instance, ignoreDuplicate bool) error {
202203
var parentInstanceID *string
203-
var parentEventID *int
204+
var parentEventID *int64
204205
if wfi.SubWorkflow() {
205206
i := wfi.ParentInstanceID
206207
parentInstanceID = &i
@@ -291,7 +292,7 @@ func (b *mysqlBackend) GetWorkflowTask(ctx context.Context) (*task.Workflow, err
291292
var id int
292293
var instanceID, executionID string
293294
var parentInstanceID *string
294-
var parentEventID *int
295+
var parentEventID *int64
295296
var stickyUntil *time.Time
296297
if err := row.Scan(&id, &instanceID, &executionID, &parentInstanceID, &parentEventID, &stickyUntil); err != nil {
297298
if err == sql.ErrNoRows {
@@ -347,7 +348,7 @@ func (b *mysqlBackend) GetWorkflowTask(ctx context.Context) (*task.Workflow, err
347348
// Get new events
348349
events, err := tx.QueryContext(
349350
ctx,
350-
"SELECT event_id, instance_id, event_type, timestamp, schedule_event_id, attributes, visible_at FROM `pending_events` WHERE instance_id = ? AND (`visible_at` IS NULL OR `visible_at` <= ?) ORDER BY id",
351+
"SELECT event_id, sequence_id, instance_id, event_type, timestamp, schedule_event_id, attributes, visible_at FROM `pending_events` WHERE instance_id = ? AND (`visible_at` IS NULL OR `visible_at` <= ?) ORDER BY id",
351352
instanceID,
352353
now,
353354
)
@@ -361,7 +362,16 @@ func (b *mysqlBackend) GetWorkflowTask(ctx context.Context) (*task.Workflow, err
361362

362363
historyEvent := history.Event{}
363364

364-
if err := events.Scan(&historyEvent.ID, &instanceID, &historyEvent.Type, &historyEvent.Timestamp, &historyEvent.ScheduleEventID, &attributes, &historyEvent.VisibleAt); err != nil {
365+
if err := events.Scan(
366+
&historyEvent.ID,
367+
&historyEvent.SequenceID,
368+
&instanceID,
369+
&historyEvent.Type,
370+
&historyEvent.Timestamp,
371+
&historyEvent.ScheduleEventID,
372+
&attributes,
373+
&historyEvent.VisibleAt,
374+
); err != nil {
365375
return nil, errors.Wrap(err, "could not scan event")
366376
}
367377

@@ -384,7 +394,7 @@ func (b *mysqlBackend) GetWorkflowTask(ctx context.Context) (*task.Workflow, err
384394
if kind != task.Continuation {
385395
historyEvents, err := tx.QueryContext(
386396
ctx,
387-
"SELECT event_id, instance_id, event_type, timestamp, schedule_event_id, attributes, visible_at FROM `history` WHERE instance_id = ? ORDER BY id",
397+
"SELECT event_id, sequence_id, instance_id, event_type, timestamp, schedule_event_id, attributes, visible_at FROM `history` WHERE instance_id = ? ORDER BY id",
388398
instanceID,
389399
)
390400
if err != nil {
@@ -399,6 +409,7 @@ func (b *mysqlBackend) GetWorkflowTask(ctx context.Context) (*task.Workflow, err
399409

400410
if err := historyEvents.Scan(
401411
&historyEvent.ID,
412+
&historyEvent.SequenceID,
402413
&instanceID,
403414
&historyEvent.Type,
404415
&historyEvent.Timestamp,
@@ -420,7 +431,7 @@ func (b *mysqlBackend) GetWorkflowTask(ctx context.Context) (*task.Workflow, err
420431
}
421432
} else {
422433
// Get only most recent history event
423-
row := tx.QueryRowContext(ctx, "SELECT event_id, instance_id, event_type, timestamp, schedule_event_id, attributes, visible_at FROM `history` WHERE instance_id = ? ORDER BY id DESC LIMIT 1", instanceID)
434+
row := tx.QueryRowContext(ctx, "SELECT event_id, sequence_id, instance_id, event_type, timestamp, schedule_event_id, attributes, visible_at FROM `history` WHERE instance_id = ? ORDER BY id DESC LIMIT 1", instanceID)
424435

425436
var instanceID string
426437
var attributes []byte
@@ -429,6 +440,7 @@ func (b *mysqlBackend) GetWorkflowTask(ctx context.Context) (*task.Workflow, err
429440

430441
if err := row.Scan(
431442
&lastHistoryEvent.ID,
443+
&lastHistoryEvent.SequenceID,
432444
&instanceID,
433445
&lastHistoryEvent.Type,
434446
&lastHistoryEvent.Timestamp,
@@ -529,7 +541,7 @@ func (b *mysqlBackend) CompleteWorkflowTask(
529541

530542
// Schedule activities
531543
for _, e := range activityEvents {
532-
if err := scheduleActivity(ctx, tx, instance.InstanceID, instance.ExecutionID, e); err != nil {
544+
if err := scheduleActivity(ctx, tx, instance, e); err != nil {
533545
return errors.Wrap(err, "could not schedule activity")
534546
}
535547
}
@@ -615,7 +627,7 @@ func (b *mysqlBackend) GetActivityTask(ctx context.Context) (*task.Activity, err
615627
now,
616628
)
617629

618-
var id int
630+
var id int64
619631
var instanceID, executionID string
620632
var attributes []byte
621633
event := history.Event{}
@@ -729,7 +741,7 @@ func (b *mysqlBackend) ExtendActivityTask(ctx context.Context, activityID string
729741
return tx.Commit()
730742
}
731743

732-
func scheduleActivity(ctx context.Context, tx *sql.Tx, instanceID, executionID string, event history.Event) error {
744+
func scheduleActivity(ctx context.Context, tx *sql.Tx, instance *core.WorkflowInstance, event history.Event) error {
733745
a, err := history.SerializeAttributes(event.Attributes)
734746
if err != nil {
735747
return err
@@ -740,8 +752,8 @@ func scheduleActivity(ctx context.Context, tx *sql.Tx, instanceID, executionID s
740752
`INSERT INTO activities
741753
(activity_id, instance_id, execution_id, event_type, timestamp, schedule_event_id, attributes, visible_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?)`,
742754
event.ID,
743-
instanceID,
744-
executionID,
755+
instance.InstanceID,
756+
instance.ExecutionID,
745757
event.Type,
746758
event.Timestamp,
747759
event.ScheduleEventID,

backend/mysql/schema.sql

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
CREATE TABLE IF NOT EXISTS `instances` (
2-
`id` int(11) NOT NULL AUTO_INCREMENT PRIMARY KEY,
2+
`id` BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY,
33
`instance_id` NVARCHAR(128) NOT NULL,
44
`execution_id` NVARCHAR(128) NOT NULL,
55
`parent_instance_id` NVARCHAR(128) NULL,
6-
`parent_schedule_event_id` INT NULL,
6+
`parent_schedule_event_id` BIGINT NULL,
77
`created_at` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
88
`completed_at` DATETIME NULL,
99
`locked_until` DATETIME NULL,
@@ -17,12 +17,13 @@ CREATE TABLE IF NOT EXISTS `instances` (
1717

1818

1919
CREATE TABLE IF NOT EXISTS `pending_events` (
20-
`id` int(11) NOT NULL AUTO_INCREMENT PRIMARY KEY,
20+
`id` BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY,
2121
`event_id` NVARCHAR(128) NOT NULL,
22+
`sequence_id` BIGINT NOT NULL, -- Not used, but keep for now for query compat
2223
`instance_id` NVARCHAR(128) NOT NULL,
2324
`event_type` INT NOT NULL,
2425
`timestamp` DATETIME NOT NULL,
25-
`schedule_event_id` INT NOT NULL,
26+
`schedule_event_id` BIGINT NOT NULL,
2627
`attributes` BLOB NOT NULL,
2728
`visible_at` DATETIME NULL,
2829

@@ -32,12 +33,13 @@ CREATE TABLE IF NOT EXISTS `pending_events` (
3233

3334

3435
CREATE TABLE IF NOT EXISTS `history` (
35-
`id` int(11) NOT NULL AUTO_INCREMENT PRIMARY KEY,
36+
`id` BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY,
3637
`event_id` NVARCHAR(64) NOT NULL,
38+
`sequence_id` BIGINT NOT NULL,
3739
`instance_id` NVARCHAR(128) NOT NULL,
3840
`event_type` INT NOT NULL,
3941
`timestamp` DATETIME NOT NULL,
40-
`schedule_event_id` INT NOT NULL,
42+
`schedule_event_id` BIGINT NOT NULL,
4143
`attributes` BLOB NOT NULL,
4244
`visible_at` DATETIME NULL, -- Is this required?
4345

@@ -46,13 +48,13 @@ CREATE TABLE IF NOT EXISTS `history` (
4648

4749

4850
CREATE TABLE IF NOT EXISTS `activities` (
49-
`id` int(11) NOT NULL AUTO_INCREMENT PRIMARY KEY,
51+
`id` BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY,
5052
`activity_id` NVARCHAR(64) NOT NULL,
5153
`instance_id` NVARCHAR(128) NOT NULL,
5254
`execution_id` NVARCHAR(128) NOT NULL,
5355
`event_type` INT NOT NULL,
5456
`timestamp` DATETIME NOT NULL,
55-
`schedule_event_id` INT NOT NULL,
57+
`schedule_event_id` BIGINT NOT NULL,
5658
`attributes` BLOB NOT NULL,
5759
`visible_at` DATETIME NULL,
5860
`locked_until` DATETIME NULL,

backend/redis/redis.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ func NewRedisBackend(address, username, password string, db int, opts ...RedisBa
4141
DB: db,
4242
})
4343

44-
// // TODO: Only for dev
44+
// // // TODO: Only for dev
4545
// if err := client.FlushDB(context.Background()).Err(); err != nil {
4646
// panic(err)
4747
// }

backend/redis/taskqueue/queue.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,9 @@ func (q *taskQueue[T]) Extend(ctx context.Context, taskID string) error {
152152
// We have to XACK _and_ XDEL here. See https://github.com/redis/redis/issues/5754
153153
var completeCmd = redis.NewScript(`
154154
local task = redis.call("XRANGE", KEYS[2], ARGV[1], ARGV[1])
155+
if task == nil then
156+
return nil
157+
end
155158
local id = task[1][2][2]
156159
redis.call("SREM", KEYS[1], id)
157160
redis.call("XACK", KEYS[2], ARGV[2], ARGV[1])

backend/redis/taskqueue/queue_test.go

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ func Test_TaskQueue(t *testing.T) {
2323
})
2424

2525
lockTimeout := time.Millisecond * 10
26+
blockTimeout := time.Millisecond * 10
2627

2728
tests := []struct {
2829
name string
@@ -45,7 +46,7 @@ func Test_TaskQueue(t *testing.T) {
4546
_, err = q.Enqueue(context.Background(), "t1", nil)
4647
require.NoError(t, err)
4748

48-
task, err := q.Dequeue(context.Background(), lockTimeout, time.Millisecond*10)
49+
task, err := q.Dequeue(context.Background(), lockTimeout, blockTimeout)
4950
require.NoError(t, err)
5051
require.NotNil(t, task)
5152
require.Equal(t, "t1", task.ID)
@@ -63,7 +64,7 @@ func Test_TaskQueue(t *testing.T) {
6364
_, err = q.Enqueue(context.Background(), "t1", nil)
6465
require.Error(t, ErrTaskAlreadyInQueue, err)
6566

66-
task, err := q.Dequeue(context.Background(), lockTimeout, time.Millisecond*10)
67+
task, err := q.Dequeue(context.Background(), lockTimeout, blockTimeout)
6768
require.NoError(t, err)
6869
require.NotNil(t, task)
6970

@@ -91,7 +92,7 @@ func Test_TaskQueue(t *testing.T) {
9192
})
9293
require.NoError(t, err)
9394

94-
task, err := q.Dequeue(context.Background(), lockTimeout, time.Millisecond*10)
95+
task, err := q.Dequeue(context.Background(), lockTimeout, blockTimeout)
9596
require.NoError(t, err)
9697
require.NotNil(t, task)
9798
require.Equal(t, "t1", task.ID)
@@ -111,7 +112,7 @@ func Test_TaskQueue(t *testing.T) {
111112
require.NoError(t, err)
112113

113114
// Dequeue using second worker
114-
task, err := q2.Dequeue(context.Background(), lockTimeout, time.Millisecond*10)
115+
task, err := q2.Dequeue(context.Background(), lockTimeout, blockTimeout)
115116
require.NoError(t, err)
116117
require.NotNil(t, task)
117118
require.Equal(t, "t1", task.ID)
@@ -126,7 +127,7 @@ func Test_TaskQueue(t *testing.T) {
126127
_, err := q.Enqueue(context.Background(), "t1", nil)
127128
require.NoError(t, err)
128129

129-
task, err := q.Dequeue(context.Background(), lockTimeout, time.Millisecond*10)
130+
task, err := q.Dequeue(context.Background(), lockTimeout, blockTimeout)
130131
require.NoError(t, err)
131132
require.NotNil(t, task)
132133

@@ -137,7 +138,7 @@ func Test_TaskQueue(t *testing.T) {
137138
time.Sleep(time.Millisecond * 10)
138139

139140
// Try to recover using second worker
140-
task2, err := q2.Dequeue(context.Background(), lockTimeout, time.Millisecond*10)
141+
task2, err := q2.Dequeue(context.Background(), lockTimeout, blockTimeout)
141142
require.NoError(t, err)
142143
require.Nil(t, task2)
143144
},
@@ -153,15 +154,15 @@ func Test_TaskQueue(t *testing.T) {
153154
q2, _ := New[any](client, "test")
154155
require.NoError(t, err)
155156

156-
task, err := q2.Dequeue(context.Background(), lockTimeout, time.Millisecond*10)
157+
task, err := q2.Dequeue(context.Background(), lockTimeout, blockTimeout)
157158
require.NoError(t, err)
158159
require.NotNil(t, task)
159160
require.Equal(t, "t1", task.ID)
160161

161162
time.Sleep(time.Millisecond * 10)
162163

163164
// Assume q2 crashed, recover from other worker
164-
recoveredTask, err := q.Dequeue(context.Background(), time.Millisecond*1, time.Millisecond*10)
165+
recoveredTask, err := q.Dequeue(context.Background(), time.Millisecond*1, blockTimeout)
165166
require.NoError(t, err)
166167
require.NotNil(t, task)
167168
require.Equal(t, task, recoveredTask)
@@ -178,17 +179,17 @@ func Test_TaskQueue(t *testing.T) {
178179
q2, _ := New[any](client, "test")
179180
require.NoError(t, err)
180181

181-
task, err := q2.Dequeue(context.Background(), lockTimeout, time.Millisecond*10)
182+
task, err := q2.Dequeue(context.Background(), lockTimeout, blockTimeout)
182183
require.NoError(t, err)
183184
require.NotNil(t, task)
184185
require.Equal(t, "t1", task.ID)
185186

186-
time.Sleep(time.Millisecond * 10)
187+
time.Sleep(time.Millisecond * 5)
187188

188189
err = q2.Extend(context.Background(), task.TaskID)
189190
require.NoError(t, err)
190191

191-
recoveredTask, err := q.Dequeue(context.Background(), lockTimeout, time.Millisecond*10)
192+
recoveredTask, err := q.Dequeue(context.Background(), lockTimeout*2, blockTimeout)
192193
require.NoError(t, err)
193194
require.Nil(t, recoveredTask)
194195
},

backend/sqlite/events.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ func scanEvent(row Scanner) (history.Event, error) {
6161

6262
historyEvent := history.Event{}
6363

64-
if err := row.Scan(&historyEvent.ID, &instanceID, &historyEvent.Type, &historyEvent.Timestamp, &historyEvent.ScheduleEventID, &attributes, &historyEvent.VisibleAt); err != nil {
64+
if err := row.Scan(&historyEvent.ID, &historyEvent.SequenceID, &instanceID, &historyEvent.Type, &historyEvent.Timestamp, &historyEvent.ScheduleEventID, &attributes, &historyEvent.VisibleAt); err != nil {
6565
return historyEvent, errors.Wrap(err, "could not scan event")
6666
}
6767

@@ -92,8 +92,8 @@ func insertEvents(ctx context.Context, tx *sql.Tx, tableName string, instanceID
9292
}
9393
batchEvents := events[batchStart:batchEnd]
9494

95-
query := "INSERT INTO `" + tableName + "` (id, instance_id, event_type, timestamp, schedule_event_id, attributes, visible_at) VALUES (?, ?, ?, ?, ?, ?, ?)" +
96-
strings.Repeat(", (?, ?, ?, ?, ?, ?, ?)", len(batchEvents)-1)
95+
query := "INSERT INTO `" + tableName + "` (id, sequence_id, instance_id, event_type, timestamp, schedule_event_id, attributes, visible_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?)" +
96+
strings.Repeat(", (?, ?, ?, ?, ?, ?, ?, ?)", len(batchEvents)-1)
9797

9898
args := make([]interface{}, 0, len(batchEvents)*7)
9999

@@ -103,7 +103,7 @@ func insertEvents(ctx context.Context, tx *sql.Tx, tableName string, instanceID
103103
return err
104104
}
105105

106-
args = append(args, newEvent.ID, instanceID, newEvent.Type, newEvent.Timestamp, newEvent.ScheduleEventID, a, newEvent.VisibleAt)
106+
args = append(args, newEvent.ID, newEvent.SequenceID, instanceID, newEvent.Type, newEvent.Timestamp, newEvent.ScheduleEventID, a, newEvent.VisibleAt)
107107
}
108108

109109
_, err := tx.ExecContext(

0 commit comments

Comments
 (0)