Skip to content

Commit 4fee3f7

Browse files
authored
Add metadata logic to mysql and sqlite backends
1 parent 644f78e commit 4fee3f7

File tree

8 files changed

+146
-77
lines changed

8 files changed

+146
-77
lines changed

backend/mysql/mysql.go

Lines changed: 43 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"database/sql"
66
_ "embed"
7+
"encoding/json"
78
"errors"
89
"fmt"
910
"strings"
@@ -69,7 +70,7 @@ func (b *mysqlBackend) CreateWorkflowInstance(ctx context.Context, instance *wor
6970
defer tx.Rollback()
7071

7172
// Create workflow instance
72-
if err := createInstance(ctx, tx, instance, false); err != nil {
73+
if err := createInstance(ctx, tx, instance, metadata, false); err != nil {
7374
return err
7475
}
7576

@@ -204,7 +205,7 @@ func (b *mysqlBackend) GetWorkflowInstanceState(ctx context.Context, instance *w
204205
return backend.WorkflowStateActive, nil
205206
}
206207

207-
func createInstance(ctx context.Context, tx *sql.Tx, wfi *workflow.Instance, ignoreDuplicate bool) error {
208+
func createInstance(ctx context.Context, tx *sql.Tx, wfi *workflow.Instance, metadata *workflow.Metadata, ignoreDuplicate bool) error {
208209
var parentInstanceID *string
209210
var parentEventID *int64
210211
if wfi.SubWorkflow() {
@@ -215,13 +216,19 @@ func createInstance(ctx context.Context, tx *sql.Tx, wfi *workflow.Instance, ign
215216
parentEventID = &n
216217
}
217218

219+
metadataJson, err := json.Marshal(metadata)
220+
if err != nil {
221+
return fmt.Errorf("marshaling metadata: %w", err)
222+
}
223+
218224
res, err := tx.ExecContext(
219225
ctx,
220-
"INSERT IGNORE INTO `instances` (instance_id, execution_id, parent_instance_id, parent_schedule_event_id) VALUES (?, ?, ?, ?)",
226+
"INSERT IGNORE INTO `instances` (instance_id, execution_id, parent_instance_id, parent_schedule_event_id, metadata) VALUES (?, ?, ?, ?, ?)",
221227
wfi.InstanceID,
222228
wfi.ExecutionID,
223229
parentInstanceID,
224230
parentEventID,
231+
string(metadataJson),
225232
)
226233
if err != nil {
227234
return fmt.Errorf("inserting workflow instance: %w", err)
@@ -278,7 +285,7 @@ func (b *mysqlBackend) GetWorkflowTask(ctx context.Context) (*task.Workflow, err
278285
now := time.Now()
279286
row := tx.QueryRowContext(
280287
ctx,
281-
`SELECT i.id, i.instance_id, i.execution_id, i.parent_instance_id, i.parent_schedule_event_id, i.sticky_until
288+
`SELECT i.id, i.instance_id, i.execution_id, i.parent_instance_id, i.parent_schedule_event_id, i.metadata, i.sticky_until
282289
FROM instances i
283290
INNER JOIN pending_events pe ON i.instance_id = pe.instance_id
284291
WHERE
@@ -298,8 +305,9 @@ func (b *mysqlBackend) GetWorkflowTask(ctx context.Context) (*task.Workflow, err
298305
var instanceID, executionID string
299306
var parentInstanceID *string
300307
var parentEventID *int64
308+
var metadataJson sql.NullString
301309
var stickyUntil *time.Time
302-
if err := row.Scan(&id, &instanceID, &executionID, &parentInstanceID, &parentEventID, &stickyUntil); err != nil {
310+
if err := row.Scan(&id, &instanceID, &executionID, &parentInstanceID, &parentEventID, &metadataJson, &stickyUntil); err != nil {
303311
if err == sql.ErrNoRows {
304312
return nil, nil
305313
}
@@ -334,9 +342,17 @@ func (b *mysqlBackend) GetWorkflowTask(ctx context.Context) (*task.Workflow, err
334342
wfi = core.NewWorkflowInstance(instanceID, executionID)
335343
}
336344

345+
var metadata *core.WorkflowInstanceMetadata
346+
if metadataJson.Valid {
347+
if err := json.Unmarshal([]byte(metadataJson.String), &metadata); err != nil {
348+
return nil, fmt.Errorf("parsing workflow metadata: %w", err)
349+
}
350+
}
351+
337352
t := &task.Workflow{
338353
ID: wfi.InstanceID,
339354
WorkflowInstance: wfi,
355+
Metadata: metadata,
340356
NewEvents: []history.Event{},
341357
}
342358

@@ -494,20 +510,18 @@ func (b *mysqlBackend) CompleteWorkflowTask(
494510
}
495511

496512
// Insert new workflow events
497-
groupedEvents := make(map[*workflow.Instance][]history.Event)
498-
for _, m := range workflowEvents {
499-
if _, ok := groupedEvents[m.WorkflowInstance]; !ok {
500-
groupedEvents[m.WorkflowInstance] = []history.Event{}
501-
}
502-
503-
groupedEvents[m.WorkflowInstance] = append(groupedEvents[m.WorkflowInstance], m.HistoryEvent)
504-
}
513+
groupedEvents := history.EventsByWorkflowInstance(workflowEvents)
505514

506515
for targetInstance, events := range groupedEvents {
507-
if targetInstance.InstanceID != instance.InstanceID {
508-
// Create new instance
509-
if err := createInstance(ctx, tx, targetInstance, true); err != nil {
510-
return err
516+
for _, event := range events {
517+
if event.Type == history.EventType_WorkflowExecutionStarted {
518+
a := event.Attributes.(*history.ExecutionStartedAttributes)
519+
// Create new instance
520+
if err := createInstance(ctx, tx, targetInstance, a.Metadata, true); err != nil {
521+
return err
522+
}
523+
524+
break
511525
}
512526
}
513527

@@ -566,8 +580,9 @@ func (b *mysqlBackend) GetActivityTask(ctx context.Context) (*task.Activity, err
566580
now := time.Now()
567581
res := tx.QueryRowContext(
568582
ctx,
569-
`SELECT id, activity_id, instance_id, execution_id, event_type, timestamp, schedule_event_id, attributes, visible_at
583+
`SELECT id, activity_id, instance_id, execution_id, instances.metadata, event_type, timestamp, schedule_event_id, attributes, visible_at
570584
FROM activities
585+
INNER JOIN instances ON activities.instance_id = instances.instance_id
571586
WHERE locked_until IS NULL OR locked_until < ?
572587
LIMIT 1
573588
FOR UPDATE SKIP LOCKED`,
@@ -577,16 +592,24 @@ func (b *mysqlBackend) GetActivityTask(ctx context.Context) (*task.Activity, err
577592
var id int64
578593
var instanceID, executionID string
579594
var attributes []byte
595+
var metadataJson sql.NullString
580596
event := history.Event{}
581597

582-
if err := res.Scan(&id, &event.ID, &instanceID, &executionID, &event.Type, &event.Timestamp, &event.ScheduleEventID, &attributes, &event.VisibleAt); err != nil {
598+
if err := res.Scan(
599+
&id, &event.ID, &instanceID, &executionID, &metadataJson, &event.Type,
600+
&event.Timestamp, &event.ScheduleEventID, &attributes, &event.VisibleAt); err != nil {
583601
if err == sql.ErrNoRows {
584602
return nil, nil
585603
}
586604

587605
return nil, fmt.Errorf("finding activity task to lock: %w", err)
588606
}
589607

608+
var metadata *workflow.Metadata
609+
if err := json.Unmarshal([]byte(metadataJson.String), &metadata); err != nil {
610+
return nil, fmt.Errorf("unmarshaling metadata: %w", err)
611+
}
612+
590613
a, err := history.DeserializeAttributes(event.Type, attributes)
591614
if err != nil {
592615
return nil, fmt.Errorf("deserializing attributes: %w", err)
@@ -607,6 +630,7 @@ func (b *mysqlBackend) GetActivityTask(ctx context.Context) (*task.Activity, err
607630
t := &task.Activity{
608631
ID: event.ID,
609632
WorkflowInstance: core.NewWorkflowInstance(instanceID, executionID),
633+
WorkflowMetadata: metadata,
610634
Event: event,
611635
}
612636

backend/mysql/schema.sql

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ CREATE TABLE IF NOT EXISTS `instances` (
44
`execution_id` NVARCHAR(128) NOT NULL,
55
`parent_instance_id` NVARCHAR(128) NULL,
66
`parent_schedule_event_id` BIGINT NULL,
7+
`metadata` BLOB NULL,
78
`created_at` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
89
`completed_at` DATETIME NULL,
910
`locked_until` DATETIME NULL,

backend/redis/workflow.go

Lines changed: 1 addition & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@ func (rb *redisBackend) CompleteWorkflowTask(
199199
}
200200

201201
// Send new workflow events to the respective streams
202-
groupedEvents := eventsByWorkflowInstance(workflowEvents)
202+
groupedEvents := history.EventsByWorkflowInstance(workflowEvents)
203203
for targetInstance, events := range groupedEvents {
204204
// Insert pending events for target instance
205205
for _, event := range events {
@@ -315,17 +315,3 @@ func (rb *redisBackend) addWorkflowInstanceEventP(ctx context.Context, p redis.P
315315

316316
return nil
317317
}
318-
319-
func eventsByWorkflowInstance(events []history.WorkflowEvent) map[*core.WorkflowInstance][]history.Event {
320-
groupedEvents := make(map[*core.WorkflowInstance][]history.Event)
321-
322-
for _, m := range events {
323-
if _, ok := groupedEvents[m.WorkflowInstance]; !ok {
324-
groupedEvents[m.WorkflowInstance] = []history.Event{}
325-
}
326-
327-
groupedEvents[m.WorkflowInstance] = append(groupedEvents[m.WorkflowInstance], m.HistoryEvent)
328-
}
329-
330-
return groupedEvents
331-
}

backend/sqlite/schema.sql

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ CREATE TABLE IF NOT EXISTS `instances` (
33
`execution_id` TEXT NO NULL,
44
`parent_instance_id` TEXT NULL,
55
`parent_schedule_event_id` INTEGER NULL,
6+
`metadata` TEXT NULL,
67
`created_at` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
78
`completed_at` DATETIME NULL,
89
`locked_until` DATETIME NULL,

backend/sqlite/sqlite.go

Lines changed: 42 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"database/sql"
66
_ "embed"
7+
"encoding/json"
78
"errors"
89
"fmt"
910
"strings"
@@ -76,7 +77,7 @@ func (sb *sqliteBackend) CreateWorkflowInstance(ctx context.Context, instance *w
7677
defer tx.Rollback()
7778

7879
// Create workflow instance
79-
if err := createInstance(ctx, tx, instance, false); err != nil {
80+
if err := createInstance(ctx, tx, instance, metadata, false); err != nil {
8081
return err
8182
}
8283

@@ -92,7 +93,7 @@ func (sb *sqliteBackend) CreateWorkflowInstance(ctx context.Context, instance *w
9293
return nil
9394
}
9495

95-
func createInstance(ctx context.Context, tx *sql.Tx, wfi *workflow.Instance, ignoreDuplicate bool) error {
96+
func createInstance(ctx context.Context, tx *sql.Tx, wfi *workflow.Instance, metadata *workflow.Metadata, ignoreDuplicate bool) error {
9697
var parentInstanceID *string
9798
var parentEventID *int64
9899
if wfi.SubWorkflow() {
@@ -103,13 +104,19 @@ func createInstance(ctx context.Context, tx *sql.Tx, wfi *workflow.Instance, ign
103104
parentEventID = &n
104105
}
105106

107+
metadataJson, err := json.Marshal(metadata)
108+
if err != nil {
109+
return fmt.Errorf("marshaling metadata: %w", err)
110+
}
111+
106112
res, err := tx.ExecContext(
107113
ctx,
108-
"INSERT OR IGNORE INTO `instances` (id, execution_id, parent_instance_id, parent_schedule_event_id) VALUES (?, ?, ?, ?)",
114+
"INSERT OR IGNORE INTO `instances` (id, execution_id, parent_instance_id, parent_schedule_event_id, metadata) VALUES (?, ?, ?, ?, ?)",
109115
wfi.InstanceID,
110116
wfi.ExecutionID,
111117
parentInstanceID,
112118
parentEventID,
119+
string(metadataJson),
113120
)
114121
if err != nil {
115122
return fmt.Errorf("inserting workflow instance: %w", err)
@@ -238,7 +245,7 @@ func (sb *sqliteBackend) GetWorkflowTask(ctx context.Context) (*task.Workflow, e
238245
WHERE instance_id = i.id AND execution_id = i.execution_id AND (visible_at IS NULL OR visible_at <= ?)
239246
)
240247
LIMIT 1
241-
) RETURNING id, execution_id, parent_instance_id, parent_schedule_event_id, sticky_until`,
248+
) RETURNING id, execution_id, parent_instance_id, parent_schedule_event_id, metadata, sticky_until`,
242249
now.Add(sb.options.WorkflowLockTimeout), // new locked_until
243250
sb.workerName,
244251
now, // locked_until
@@ -250,8 +257,9 @@ func (sb *sqliteBackend) GetWorkflowTask(ctx context.Context) (*task.Workflow, e
250257
var instanceID, executionID string
251258
var parentInstanceID *string
252259
var parentEventID *int64
260+
var metadataJson sql.NullString
253261
var stickyUntil *time.Time
254-
if err := row.Scan(&instanceID, &executionID, &parentInstanceID, &parentEventID, &stickyUntil); err != nil {
262+
if err := row.Scan(&instanceID, &executionID, &parentInstanceID, &parentEventID, &metadataJson, &stickyUntil); err != nil {
255263
if err == sql.ErrNoRows {
256264
return nil, nil
257265
}
@@ -266,9 +274,17 @@ func (sb *sqliteBackend) GetWorkflowTask(ctx context.Context) (*task.Workflow, e
266274
wfi = core.NewWorkflowInstance(instanceID, executionID)
267275
}
268276

277+
var metadata *core.WorkflowInstanceMetadata
278+
if metadataJson.Valid {
279+
if err := json.Unmarshal([]byte(metadataJson.String), &metadata); err != nil {
280+
return nil, fmt.Errorf("parsing workflow metadata: %w", err)
281+
}
282+
}
283+
269284
t := &task.Workflow{
270285
ID: wfi.InstanceID,
271286
WorkflowInstance: wfi,
287+
Metadata: metadata,
272288
NewEvents: []history.Event{},
273289
}
274290

@@ -384,20 +400,18 @@ func (sb *sqliteBackend) CompleteWorkflowTask(
384400
}
385401

386402
// Insert new workflow events
387-
groupedEvents := make(map[*workflow.Instance][]history.Event)
388-
for _, m := range workflowEvents {
389-
if _, ok := groupedEvents[m.WorkflowInstance]; !ok {
390-
groupedEvents[m.WorkflowInstance] = []history.Event{}
391-
}
392-
393-
groupedEvents[m.WorkflowInstance] = append(groupedEvents[m.WorkflowInstance], m.HistoryEvent)
394-
}
403+
groupedEvents := history.EventsByWorkflowInstance(workflowEvents)
395404

396405
for targetInstance, events := range groupedEvents {
397-
if instance.InstanceID != targetInstance.InstanceID {
398-
// Create new instance
399-
if err := createInstance(ctx, tx, targetInstance, true); err != nil {
400-
return err
406+
for _, event := range events {
407+
if event.Type == history.EventType_WorkflowExecutionStarted {
408+
a := event.Attributes.(*history.ExecutionStartedAttributes)
409+
// Create new instance
410+
if err := createInstance(ctx, tx, targetInstance, a.Metadata, true); err != nil {
411+
return err
412+
}
413+
414+
break
401415
}
402416
}
403417

@@ -484,9 +498,20 @@ func (sb *sqliteBackend) GetActivityTask(ctx context.Context) (*task.Activity, e
484498

485499
event.Attributes = a
486500

501+
var metadataJson sql.NullString
502+
if err := tx.QueryRowContext(ctx, "SELECT metadata FROM instances WHERE id = ?", instanceID).Scan(&metadataJson); err != nil {
503+
return nil, fmt.Errorf("scanning metadata: %w", err)
504+
}
505+
506+
var metadata *workflow.Metadata
507+
if err := json.Unmarshal([]byte(metadataJson.String), &metadata); err != nil {
508+
return nil, fmt.Errorf("unmarshaling metadata: %w", err)
509+
}
510+
487511
t := &task.Activity{
488512
ID: event.ID,
489513
WorkflowInstance: core.NewWorkflowInstance(instanceID, executionID),
514+
WorkflowMetadata: metadata,
490515
Event: event,
491516
}
492517

backend/test/backendtest.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,8 +89,9 @@ func BackendTest(t *testing.T, setup func() TestBackend, teardown func(b TestBac
8989
ctx, cancel := context.WithTimeout(ctx, time.Millisecond)
9090
defer cancel()
9191

92-
task, err := b.GetWorkflowTask(ctx)
93-
require.ErrorIs(t, err, context.DeadlineExceeded)
92+
time.Sleep(1 * time.Millisecond)
93+
94+
task, _ := b.GetWorkflowTask(ctx)
9495
require.Nil(t, task)
9596
},
9697
},

internal/history/grouping.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package history
2+
3+
import "github.com/cschleiden/go-workflows/internal/core"
4+
5+
func EventsByWorkflowInstance(events []WorkflowEvent) map[*core.WorkflowInstance][]Event {
6+
groupedEvents := make(map[*core.WorkflowInstance][]Event)
7+
8+
for _, m := range events {
9+
if _, ok := groupedEvents[m.WorkflowInstance]; !ok {
10+
groupedEvents[m.WorkflowInstance] = []Event{}
11+
}
12+
13+
groupedEvents[m.WorkflowInstance] = append(groupedEvents[m.WorkflowInstance], m.HistoryEvent)
14+
}
15+
16+
return groupedEvents
17+
}

0 commit comments

Comments
 (0)