4
4
"context"
5
5
"database/sql"
6
6
_ "embed"
7
+ "encoding/json"
7
8
"errors"
8
9
"fmt"
9
10
"strings"
@@ -17,6 +18,7 @@ import (
17
18
"github.com/cschleiden/go-workflows/workflow"
18
19
_ "github.com/go-sql-driver/mysql"
19
20
"github.com/google/uuid"
21
+ "go.opentelemetry.io/otel/trace"
20
22
)
21
23
22
24
//go:embed schema.sql
@@ -58,7 +60,7 @@ type mysqlBackend struct {
58
60
}
59
61
60
62
// CreateWorkflowInstance creates a new workflow instance
61
- func (b * mysqlBackend ) CreateWorkflowInstance (ctx context.Context , m history.WorkflowEvent ) error {
63
+ func (b * mysqlBackend ) CreateWorkflowInstance (ctx context.Context , instance * workflow. Instance , event history.Event ) error {
62
64
tx , err := b .db .BeginTx (ctx , & sql.TxOptions {
63
65
Isolation : sql .LevelReadCommitted ,
64
66
})
@@ -68,12 +70,12 @@ func (b *mysqlBackend) CreateWorkflowInstance(ctx context.Context, m history.Wor
68
70
defer tx .Rollback ()
69
71
70
72
// Create workflow instance
71
- if err := createInstance (ctx , tx , m . WorkflowInstance , false ); err != nil {
73
+ if err := createInstance (ctx , tx , instance , event . Attributes .( * history. ExecutionStartedAttributes ). Metadata , false ); err != nil {
72
74
return err
73
75
}
74
76
75
77
// Initial history is empty, store only new events
76
- if err := insertPendingEvents (ctx , tx , m . WorkflowInstance . InstanceID , []history.Event {m . HistoryEvent }); err != nil {
78
+ if err := insertPendingEvents (ctx , tx , instance . InstanceID , []history.Event {event }); err != nil {
77
79
return fmt .Errorf ("inserting new event: %w" , err )
78
80
}
79
81
@@ -88,6 +90,10 @@ func (b *mysqlBackend) Logger() log.Logger {
88
90
return b .options .Logger
89
91
}
90
92
93
+ func (b * mysqlBackend ) Tracer () trace.Tracer {
94
+ return b .options .TracerProvider .Tracer (backend .TracerName )
95
+ }
96
+
91
97
func (b * mysqlBackend ) CancelWorkflowInstance (ctx context.Context , instance * workflow.Instance , event * history.Event ) error {
92
98
tx , err := b .db .BeginTx (ctx , & sql.TxOptions {
93
99
Isolation : sql .LevelReadCommitted ,
@@ -199,7 +205,7 @@ func (b *mysqlBackend) GetWorkflowInstanceState(ctx context.Context, instance *w
199
205
return backend .WorkflowStateActive , nil
200
206
}
201
207
202
- func createInstance (ctx context.Context , tx * sql.Tx , wfi * workflow.Instance , ignoreDuplicate bool ) error {
208
+ func createInstance (ctx context.Context , tx * sql.Tx , wfi * workflow.Instance , metadata * workflow. Metadata , ignoreDuplicate bool ) error {
203
209
var parentInstanceID * string
204
210
var parentEventID * int64
205
211
if wfi .SubWorkflow () {
@@ -210,13 +216,19 @@ func createInstance(ctx context.Context, tx *sql.Tx, wfi *workflow.Instance, ign
210
216
parentEventID = & n
211
217
}
212
218
219
+ metadataJson , err := json .Marshal (metadata )
220
+ if err != nil {
221
+ return fmt .Errorf ("marshaling metadata: %w" , err )
222
+ }
223
+
213
224
res , err := tx .ExecContext (
214
225
ctx ,
215
- "INSERT IGNORE INTO `instances` (instance_id, execution_id, parent_instance_id, parent_schedule_event_id) VALUES (?, ?, ?, ?)" ,
226
+ "INSERT IGNORE INTO `instances` (instance_id, execution_id, parent_instance_id, parent_schedule_event_id, metadata ) VALUES (?, ?, ?, ?, ?)" ,
216
227
wfi .InstanceID ,
217
228
wfi .ExecutionID ,
218
229
parentInstanceID ,
219
230
parentEventID ,
231
+ string (metadataJson ),
220
232
)
221
233
if err != nil {
222
234
return fmt .Errorf ("inserting workflow instance: %w" , err )
@@ -273,7 +285,7 @@ func (b *mysqlBackend) GetWorkflowTask(ctx context.Context) (*task.Workflow, err
273
285
now := time .Now ()
274
286
row := tx .QueryRowContext (
275
287
ctx ,
276
- `SELECT i.id, i.instance_id, i.execution_id, i.parent_instance_id, i.parent_schedule_event_id, i.sticky_until
288
+ `SELECT i.id, i.instance_id, i.execution_id, i.parent_instance_id, i.parent_schedule_event_id, i.metadata, i. sticky_until
277
289
FROM instances i
278
290
INNER JOIN pending_events pe ON i.instance_id = pe.instance_id
279
291
WHERE
@@ -293,8 +305,9 @@ func (b *mysqlBackend) GetWorkflowTask(ctx context.Context) (*task.Workflow, err
293
305
var instanceID , executionID string
294
306
var parentInstanceID * string
295
307
var parentEventID * int64
308
+ var metadataJson sql.NullString
296
309
var stickyUntil * time.Time
297
- if err := row .Scan (& id , & instanceID , & executionID , & parentInstanceID , & parentEventID , & stickyUntil ); err != nil {
310
+ if err := row .Scan (& id , & instanceID , & executionID , & parentInstanceID , & parentEventID , & metadataJson , & stickyUntil ); err != nil {
298
311
if err == sql .ErrNoRows {
299
312
return nil , nil
300
313
}
@@ -329,9 +342,17 @@ func (b *mysqlBackend) GetWorkflowTask(ctx context.Context) (*task.Workflow, err
329
342
wfi = core .NewWorkflowInstance (instanceID , executionID )
330
343
}
331
344
345
+ var metadata * core.WorkflowMetadata
346
+ if metadataJson .Valid {
347
+ if err := json .Unmarshal ([]byte (metadataJson .String ), & metadata ); err != nil {
348
+ return nil , fmt .Errorf ("parsing workflow metadata: %w" , err )
349
+ }
350
+ }
351
+
332
352
t := & task.Workflow {
333
353
ID : wfi .InstanceID ,
334
354
WorkflowInstance : wfi ,
355
+ Metadata : metadata ,
335
356
NewEvents : []history.Event {},
336
357
}
337
358
@@ -489,20 +510,18 @@ func (b *mysqlBackend) CompleteWorkflowTask(
489
510
}
490
511
491
512
// Insert new workflow events
492
- groupedEvents := make (map [* workflow.Instance ][]history.Event )
493
- for _ , m := range workflowEvents {
494
- if _ , ok := groupedEvents [m .WorkflowInstance ]; ! ok {
495
- groupedEvents [m .WorkflowInstance ] = []history.Event {}
496
- }
497
-
498
- groupedEvents [m .WorkflowInstance ] = append (groupedEvents [m .WorkflowInstance ], m .HistoryEvent )
499
- }
513
+ groupedEvents := history .EventsByWorkflowInstance (workflowEvents )
500
514
501
515
for targetInstance , events := range groupedEvents {
502
- if targetInstance .InstanceID != instance .InstanceID {
503
- // Create new instance
504
- if err := createInstance (ctx , tx , targetInstance , true ); err != nil {
505
- return err
516
+ for _ , event := range events {
517
+ if event .Type == history .EventType_WorkflowExecutionStarted {
518
+ a := event .Attributes .(* history.ExecutionStartedAttributes )
519
+ // Create new instance
520
+ if err := createInstance (ctx , tx , targetInstance , a .Metadata , true ); err != nil {
521
+ return err
522
+ }
523
+
524
+ break
506
525
}
507
526
}
508
527
@@ -561,9 +580,11 @@ func (b *mysqlBackend) GetActivityTask(ctx context.Context) (*task.Activity, err
561
580
now := time .Now ()
562
581
res := tx .QueryRowContext (
563
582
ctx ,
564
- `SELECT id, activity_id, instance_id, execution_id, event_type, timestamp, schedule_event_id, attributes, visible_at
583
+ `SELECT activities.id, activity_id, activities.instance_id, activities.execution_id,
584
+ instances.metadata, event_type, timestamp, schedule_event_id, attributes, visible_at
565
585
FROM activities
566
- WHERE locked_until IS NULL OR locked_until < ?
586
+ INNER JOIN instances ON activities.instance_id = instances.instance_id
587
+ WHERE activities.locked_until IS NULL OR activities.locked_until < ?
567
588
LIMIT 1
568
589
FOR UPDATE SKIP LOCKED` ,
569
590
now ,
@@ -572,16 +593,24 @@ func (b *mysqlBackend) GetActivityTask(ctx context.Context) (*task.Activity, err
572
593
var id int64
573
594
var instanceID , executionID string
574
595
var attributes []byte
596
+ var metadataJson sql.NullString
575
597
event := history.Event {}
576
598
577
- if err := res .Scan (& id , & event .ID , & instanceID , & executionID , & event .Type , & event .Timestamp , & event .ScheduleEventID , & attributes , & event .VisibleAt ); err != nil {
599
+ if err := res .Scan (
600
+ & id , & event .ID , & instanceID , & executionID , & metadataJson , & event .Type ,
601
+ & event .Timestamp , & event .ScheduleEventID , & attributes , & event .VisibleAt ); err != nil {
578
602
if err == sql .ErrNoRows {
579
603
return nil , nil
580
604
}
581
605
582
606
return nil , fmt .Errorf ("finding activity task to lock: %w" , err )
583
607
}
584
608
609
+ var metadata * workflow.Metadata
610
+ if err := json .Unmarshal ([]byte (metadataJson .String ), & metadata ); err != nil {
611
+ return nil , fmt .Errorf ("unmarshaling metadata: %w" , err )
612
+ }
613
+
585
614
a , err := history .DeserializeAttributes (event .Type , attributes )
586
615
if err != nil {
587
616
return nil , fmt .Errorf ("deserializing attributes: %w" , err )
@@ -602,6 +631,7 @@ func (b *mysqlBackend) GetActivityTask(ctx context.Context) (*task.Activity, err
602
631
t := & task.Activity {
603
632
ID : event .ID ,
604
633
WorkflowInstance : core .NewWorkflowInstance (instanceID , executionID ),
634
+ Metadata : metadata ,
605
635
Event : event ,
606
636
}
607
637
0 commit comments