Skip to content

Commit e57502c

Browse files
authored
Merge pull request #275 from cschleiden/mysql-attributes-table
Move mysql attributes to separate table
2 parents 12503c2 + e137aec commit e57502c

File tree

7 files changed

+84
-30
lines changed

7 files changed

+84
-30
lines changed
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
ALTER TABLE `activities` ADD COLUMN `attributes` MEDIUMBLOB NULL;
2+
UPDATE `activities` SET `attributes` = `attributes`.`data` FROM `attributes` WHERE `activities`.`event_id` = `attributes`.`event_id` AND `activities`.`instance_id` = `attributes`.`instance_id` AND `activities`.`execution_id` = `attributes`.`execution_id`;
3+
ALTER TABLE `activities` MODIFY COLUMN `attributes` MEDIUMBLOB NOT NULL;
4+
5+
ALTER TABLE `history` ADD COLUMN `attributes` MEDIUMBLOB NULL;
6+
UPDATE `history` SET `attributes` = `attributes`.`data` FROM `attributes` WHERE `history`.`event_id` = `attributes`.`event_id` AND `history`.`instance_id` = `attributes`.`instance_id` AND `history`.`execution_id` = `attributes`.`execution_id`;
7+
ALTER TABLE `history` MODIFY COLUMN `attributes` MEDIUMBLOB NOT NULL;
8+
9+
ALTER TABLE `pending_events` ADD COLUMN `attributes` MEDIUMBLOB NULL;
10+
UPDATE `pending_events` SET `attributes` = `attributes`.`data` FROM `attributes` WHERE `pending_events`.`event_id` = `attributes`.`event_id` AND `pending_events`.`instance_id` = `attributes`.`instance_id` AND `pending_events`.`execution_id` = `attributes`.`execution_id`;
11+
ALTER TABLE `pending_events` MODIFY COLUMN `attributes` MEDIUMBLOB NOT NULL;
12+
13+
14+
-- Drop attributes table
15+
DROP TABLE `attributes`;
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
CREATE TABLE IF NOT EXISTS `attributes` (
2+
`id` BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY,
3+
`event_id` NVARCHAR(128) NOT NULL,
4+
`instance_id` NVARCHAR(128) NOT NULL,
5+
`execution_id` NVARCHAR(128) NOT NULL,
6+
`data` MEDIUMBLOB NOT NULL,
7+
8+
UNIQUE INDEX `idx_attributes_instance_id_execution_id_event_id` (`instance_id`, `execution_id`, `event_id`),
9+
INDEX `idx_attributes_event_id` (`event_id`)
10+
);
11+
12+
-- Move activity attributes to attributes table
13+
INSERT IGNORE INTO `attributes` (`event_id`, `instance_id`, `execution_id`, `data`) SELECT `activity_id`, `instance_id`, `execution_id`, `attributes` FROM `activities`;
14+
ALTER TABLE `activities` DROP COLUMN `attributes`;
15+
16+
-- Move history attributes to attributes table
17+
INSERT IGNORE INTO `attributes` (`event_id`, `instance_id`, `execution_id`, `data`) SELECT `event_id`, `instance_id`, `execution_id`, `attributes` FROM `history`;
18+
ALTER TABLE `history` DROP COLUMN `attributes`;
19+
20+
-- Move pending_events attributes to attributes table
21+
INSERT IGNORE INTO `attributes` (`event_id`, `instance_id`, `execution_id`, `data`) SELECT `event_id`, `instance_id`, `execution_id`, `attributes` FROM `pending_events`;
22+
ALTER TABLE `pending_events` DROP COLUMN `attributes`;

backend/mysql/events.go

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package mysql
33
import (
44
"context"
55
"database/sql"
6+
"fmt"
67
"strings"
78

89
"github.com/cschleiden/go-workflows/backend/history"
@@ -26,21 +27,34 @@ func insertEvents(ctx context.Context, tx *sql.Tx, tableName string, instance *c
2627
}
2728
batchEvents := events[batchStart:batchEnd]
2829

30+
aquery := "INSERT IGNORE INTO `attributes` (event_id, instance_id, execution_id, data) VALUES (?, ?, ?, ?)" + strings.Repeat(", (?, ?, ?, ?)", len(batchEvents)-1)
31+
aargs := make([]interface{}, 0, len(batchEvents)*4)
32+
2933
query := "INSERT INTO `" + tableName +
30-
"` (event_id, sequence_id, instance_id, execution_id, event_type, timestamp, schedule_event_id, attributes, visible_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)" +
31-
strings.Repeat(", (?, ?, ?, ?, ?, ?, ?, ?, ?)", len(batchEvents)-1)
34+
"` (event_id, sequence_id, instance_id, execution_id, event_type, timestamp, schedule_event_id, visible_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?)" +
35+
strings.Repeat(", (?, ?, ?, ?, ?, ?, ?, ?)", len(batchEvents)-1)
3236

33-
args := make([]interface{}, 0, len(batchEvents)*7)
37+
args := make([]interface{}, 0, len(batchEvents)*8)
3438

3539
for _, newEvent := range batchEvents {
3640
a, err := history.SerializeAttributes(newEvent.Attributes)
3741
if err != nil {
3842
return err
3943
}
4044

45+
aargs = append(aargs, newEvent.ID, instance.InstanceID, instance.ExecutionID, a)
46+
4147
args = append(
4248
args,
43-
newEvent.ID, newEvent.SequenceID, instance.InstanceID, instance.ExecutionID, newEvent.Type, newEvent.Timestamp, newEvent.ScheduleEventID, a, newEvent.VisibleAt)
49+
newEvent.ID, newEvent.SequenceID, instance.InstanceID, instance.ExecutionID, newEvent.Type, newEvent.Timestamp, newEvent.ScheduleEventID, newEvent.VisibleAt)
50+
}
51+
52+
if _, err := tx.ExecContext(
53+
ctx,
54+
aquery,
55+
aargs...,
56+
); err != nil {
57+
return fmt.Errorf("inserting attributes: %w", err)
4458
}
4559

4660
_, err := tx.ExecContext(
@@ -49,7 +63,7 @@ func insertEvents(ctx context.Context, tx *sql.Tx, tableName string, instance *c
4963
args...,
5064
)
5165
if err != nil {
52-
return err
66+
return fmt.Errorf("inserting events: %w", err)
5367
}
5468
}
5569

@@ -59,7 +73,7 @@ func insertEvents(ctx context.Context, tx *sql.Tx, tableName string, instance *c
5973
func removeFutureEvent(ctx context.Context, tx *sql.Tx, instance *core.WorkflowInstance, scheduleEventID int64) error {
6074
_, err := tx.ExecContext(
6175
ctx,
62-
"DELETE FROM `pending_events` WHERE instance_id = ? AND execution_id = ? AND schedule_event_id = ? AND visible_at IS NOT NULL",
76+
"DELETE `pending_events`, `attributes` FROM `pending_events` INNER JOIN `attributes` ON `pending_events`.event_id = `attributes`.event_id WHERE `pending_events`.instance_id = ? AND `pending_events`.execution_id = ? AND `pending_events`.schedule_event_id = ? AND `pending_events`.visible_at IS NOT NULL",
6377
instance.InstanceID,
6478
instance.ExecutionID,
6579
scheduleEventID,

backend/mysql/mysql.go

Lines changed: 22 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,10 @@ func (b *mysqlBackend) RemoveWorkflowInstance(ctx context.Context, instance *cor
190190
return err
191191
}
192192

193+
if _, err := tx.ExecContext(ctx, "DELETE FROM `attributes` WHERE instance_id = ? AND execution_id = ?", instance.InstanceID, instance.ExecutionID); err != nil {
194+
return err
195+
}
196+
193197
return tx.Commit()
194198
}
195199

@@ -231,15 +235,15 @@ func (b *mysqlBackend) GetWorkflowInstanceHistory(ctx context.Context, instance
231235
if lastSequenceID != nil {
232236
historyEvents, err = tx.QueryContext(
233237
ctx,
234-
"SELECT event_id, sequence_id, event_type, timestamp, schedule_event_id, attributes, visible_at FROM `history` WHERE instance_id = ? AND execution_id = ? AND sequence_id > ? ORDER BY sequence_id",
238+
"SELECT h.event_id, h.sequence_id, h.event_type, h.timestamp, h.schedule_event_id, a.data, h.visible_at FROM `history` h JOIN `attributes` a ON h.event_id = a.event_id AND a.instance_id = h.instance_id AND a.execution_id = h.execution_id WHERE h.instance_id = ? AND h.execution_id = ? AND h.sequence_id > ? ORDER BY h.sequence_id",
235239
instance.InstanceID,
236240
instance.ExecutionID,
237241
*lastSequenceID,
238242
)
239243
} else {
240244
historyEvents, err = tx.QueryContext(
241245
ctx,
242-
"SELECT event_id, sequence_id, event_type, timestamp, schedule_event_id, attributes, visible_at FROM `history` WHERE instance_id = ? AND execution_id = ? ORDER BY sequence_id",
246+
"SELECT h.event_id, h.sequence_id, h.event_type, h.timestamp, h.schedule_event_id, a.data, h.visible_at FROM `history` h JOIN `attributes` a ON h.event_id = a.event_id AND a.instance_id = h.instance_id AND a.execution_id = h.execution_id WHERE h.instance_id = ? AND h.execution_id = ? ORDER BY h.sequence_id",
243247
instance.InstanceID,
244248
instance.ExecutionID,
245249
)
@@ -459,7 +463,7 @@ func (b *mysqlBackend) GetWorkflowTask(ctx context.Context) (*backend.WorkflowTa
459463
// Get new events
460464
events, err := tx.QueryContext(
461465
ctx,
462-
"SELECT event_id, sequence_id, event_type, timestamp, schedule_event_id, attributes, visible_at FROM `pending_events` WHERE instance_id = ? AND execution_id = ? AND (`visible_at` IS NULL OR `visible_at` <= ?) ORDER BY id",
466+
"SELECT pe.event_id, pe.sequence_id, pe.event_type, pe.timestamp, pe.schedule_event_id, a.data, pe.visible_at FROM `pending_events` pe LEFT JOIN `attributes` a ON pe.instance_id = a.instance_id AND pe.execution_id = a.execution_id AND pe.event_id = a.event_id WHERE pe.instance_id = ? AND pe.execution_id = ? AND (pe.visible_at IS NULL OR pe.visible_at <= ?) ORDER BY pe.id",
463467
instanceID,
464468
executionID,
465469
now,
@@ -503,15 +507,20 @@ func (b *mysqlBackend) GetWorkflowTask(ctx context.Context) (*backend.WorkflowTa
503507
}
504508

505509
// Get most recent sequence id
506-
row = tx.QueryRowContext(ctx, "SELECT sequence_id FROM `history` WHERE instance_id = ? AND execution_id = ? ORDER BY id DESC LIMIT 1", instanceID, executionID)
510+
var lastSequenceID sql.NullInt64
511+
row = tx.QueryRowContext(ctx, "SELECT MAX(sequence_id) FROM `history` WHERE instance_id = ? AND execution_id = ?", instanceID, executionID)
507512
if err := row.Scan(
508-
&t.LastSequenceID,
513+
&lastSequenceID,
509514
); err != nil {
510515
if err != sql.ErrNoRows {
511516
return nil, fmt.Errorf("getting most recent sequence id: %w", err)
512517
}
513518
}
514519

520+
if lastSequenceID.Valid {
521+
t.LastSequenceID = lastSequenceID.Int64
522+
}
523+
515524
if err := tx.Commit(); err != nil {
516525
return nil, err
517526
}
@@ -687,10 +696,11 @@ func (b *mysqlBackend) GetActivityTask(ctx context.Context) (*backend.ActivityTa
687696
now := time.Now()
688697
res := tx.QueryRowContext(
689698
ctx,
690-
`SELECT activities.id, activity_id, activities.instance_id, activities.execution_id,
691-
event_type, timestamp, schedule_event_id, attributes, visible_at
692-
FROM activities
693-
WHERE activities.locked_until IS NULL OR activities.locked_until < ?
699+
`SELECT a.id, a.activity_id, a.instance_id, a.execution_id,
700+
a.event_type, a.timestamp, a.schedule_event_id, at.data, a.visible_at
701+
FROM activities a
702+
JOIN attributes at ON at.event_id = a.activity_id AND at.instance_id = a.instance_id AND at.execution_id = a.execution_id
703+
WHERE a.locked_until IS NULL OR a.locked_until < ?
694704
LIMIT 1
695705
FOR UPDATE SKIP LOCKED`,
696706
now,
@@ -815,22 +825,18 @@ func (b *mysqlBackend) ExtendActivityTask(ctx context.Context, activityID string
815825
}
816826

817827
func scheduleActivity(ctx context.Context, tx *sql.Tx, instance *core.WorkflowInstance, event *history.Event) error {
818-
a, err := history.SerializeAttributes(event.Attributes)
819-
if err != nil {
820-
return err
821-
}
828+
// Attributes are already persisted via the history, we do not need to add them again.
822829

823-
_, err = tx.ExecContext(
830+
_, err := tx.ExecContext(
824831
ctx,
825832
`INSERT INTO activities
826-
(activity_id, instance_id, execution_id, event_type, timestamp, schedule_event_id, attributes, visible_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?)`,
833+
(activity_id, instance_id, execution_id, event_type, timestamp, schedule_event_id, visible_at) VALUES (?, ?, ?, ?, ?, ?, ?)`,
827834
event.ID,
828835
instance.InstanceID,
829836
instance.ExecutionID,
830837
event.Type,
831838
event.Timestamp,
832839
event.ScheduleEventID,
833-
a,
834840
event.VisibleAt,
835841
)
836842

backend/mysql/mysql_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ func (mb *mysqlBackend) GetFutureEvents(ctx context.Context) ([]*history.Event,
121121
// There is no index on `visible_at`, but this is okay for test only usage.
122122
futureEvents, err := tx.QueryContext(
123123
ctx,
124-
"SELECT event_id, sequence_id, instance_id, event_type, timestamp, schedule_event_id, attributes, visible_at FROM `pending_events` WHERE visible_at IS NOT NULL",
124+
"SELECT pe.id, pe.sequence_id, pe.instance_id, pe.execution_id, pe.event_type, pe.timestamp, pe.schedule_event_id, pe.visible_at, a.data FROM `pending_events` pe JOIN `attributes` a ON a.id = pe.id AND a.instance_id = pe.instance_id AND a.execution_id = pe.execution_id WHERE pe.visible_at IS NOT NULL",
125125
)
126126
if err != nil {
127127
return nil, fmt.Errorf("getting history: %w", err)

backend/sqlite/db/migrations/000002_add_attributes_table.down.sql

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,9 @@
1-
-- Move activity attributes to attributes table
21
ALTER TABLE `activities` ADD COLUMN `attributes` BLOB NULL;
32
UPDATE `activities` SET `attributes` = `attributes`.`data` FROM `attributes` WHERE `activities`.`id` = `attributes`.`id` AND `activities`.`instance_id` = `attributes`.`instance_id` AND `activities`.`execution_id` = `attributes`.`execution_id`;
43

5-
-- Move history attributes to attributes table
64
ALTER TABLE `history` ADD COLUMN `attributes` BLOB NULL;
75
UPDATE `history` SET `attributes` = `attributes`.`data` FROM `attributes` WHERE `history`.`id` = `attributes`.`id` AND `history`.`instance_id` = `attributes`.`instance_id` AND `history`.`execution_id` = `attributes`.`execution_id`;
86

9-
-- Move pending_events attributes to attributes table
107
ALTER TABLE `pending_events` ADD COLUMN `attributes` BLOB NULL;
118
UPDATE `pending_events` SET `attributes` = `attributes`.`data` FROM `attributes` WHERE `pending_events`.`id` = `attributes`.`id` AND `pending_events`.`instance_id` = `attributes`.`instance_id` AND `pending_events`.`execution_id` = `attributes`.`execution_id`;
129

backend/sqlite/db/migrations/000002_add_attributes_table.up.sql

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,17 @@ CREATE TABLE IF NOT EXISTS `attributes` (
33
`instance_id` TEXT NOT NULL,
44
`execution_id` TEXT NOT NULL,
55
`data` BLOB NOT NULL,
6-
PRIMARY KEY(`id`, `instance_id`)
6+
PRIMARY KEY(`id`, `instance_id`, `execution_id`)
77
);
88

9-
-- Move activity attributes to payloads table
9+
-- Move activity attributes to attributes table
1010
INSERT OR IGNORE INTO `attributes` (`id`, `instance_id`, `execution_id`, `data`) SELECT `id`, `instance_id`, `execution_id`, `attributes` FROM `activities`;
1111
ALTER TABLE `activities` DROP COLUMN `attributes`;
1212

13-
-- Move history attributes to payloads table
13+
-- Move history attributes to attributes table
1414
INSERT OR IGNORE INTO `attributes` (`id`, `instance_id`, `execution_id`, `data`) SELECT `id`, `instance_id`, `execution_id`, `attributes` FROM `history`;
1515
ALTER TABLE `history` DROP COLUMN `attributes`;
1616

17-
-- Move pending_events attributes to payloads table
17+
-- Move pending_events attributes to attributes table
1818
INSERT OR IGNORE INTO `attributes` (`id`, `instance_id`, `execution_id`, `data`) SELECT `id`, `instance_id`, `execution_id`, `attributes` FROM `pending_events`;
1919
ALTER TABLE `pending_events` DROP COLUMN `attributes`;

0 commit comments

Comments
 (0)