@@ -11,6 +11,7 @@ import (
11
11
"time"
12
12
13
13
"github.com/cschleiden/go-workflows/backend"
14
+ "github.com/cschleiden/go-workflows/internal/contextpropagation"
14
15
"github.com/cschleiden/go-workflows/internal/converter"
15
16
"github.com/cschleiden/go-workflows/internal/core"
16
17
"github.com/cschleiden/go-workflows/internal/history"
@@ -78,6 +79,10 @@ func (b *mysqlBackend) Converter() converter.Converter {
78
79
return b .options .Converter
79
80
}
80
81
82
+ func (b * mysqlBackend ) ContextPropagators () []contextpropagation.ContextPropagator {
83
+ return b .options .ContextPropagators
84
+ }
85
+
81
86
func (b * mysqlBackend ) CreateWorkflowInstance (ctx context.Context , instance * workflow.Instance , event * history.Event ) error {
82
87
tx , err := b .db .BeginTx (ctx , & sql.TxOptions {
83
88
Isolation : sql .LevelReadCommitted ,
@@ -626,9 +631,8 @@ func (b *mysqlBackend) GetActivityTask(ctx context.Context) (*task.Activity, err
626
631
res := tx .QueryRowContext (
627
632
ctx ,
628
633
`SELECT activities.id, activity_id, activities.instance_id,
629
- instances.metadata, event_type, timestamp, schedule_event_id, attributes, visible_at
634
+ event_type, timestamp, schedule_event_id, attributes, visible_at
630
635
FROM activities
631
- INNER JOIN instances ON activities.instance_id = instances.instance_id
632
636
WHERE activities.locked_until IS NULL OR activities.locked_until < ?
633
637
LIMIT 1
634
638
FOR UPDATE SKIP LOCKED` ,
@@ -638,11 +642,10 @@ func (b *mysqlBackend) GetActivityTask(ctx context.Context) (*task.Activity, err
638
642
var id int64
639
643
var instanceID string
640
644
var attributes []byte
641
- var metadataJson sql.NullString
642
645
event := & history.Event {}
643
646
644
647
if err := res .Scan (
645
- & id , & event .ID , & instanceID , & metadataJson , & event .Type ,
648
+ & id , & event .ID , & instanceID , & event .Type ,
646
649
& event .Timestamp , & event .ScheduleEventID , & attributes , & event .VisibleAt ); err != nil {
647
650
if err == sql .ErrNoRows {
648
651
return nil , nil
@@ -651,11 +654,6 @@ func (b *mysqlBackend) GetActivityTask(ctx context.Context) (*task.Activity, err
651
654
return nil , fmt .Errorf ("finding activity task to lock: %w" , err )
652
655
}
653
656
654
- var metadata * workflow.Metadata
655
- if err := json .Unmarshal ([]byte (metadataJson .String ), & metadata ); err != nil {
656
- return nil , fmt .Errorf ("unmarshaling metadata: %w" , err )
657
- }
658
-
659
657
a , err := history .DeserializeAttributes (event .Type , attributes )
660
658
if err != nil {
661
659
return nil , fmt .Errorf ("deserializing attributes: %w" , err )
@@ -676,7 +674,6 @@ func (b *mysqlBackend) GetActivityTask(ctx context.Context) (*task.Activity, err
676
674
t := & task.Activity {
677
675
ID : event .ID ,
678
676
WorkflowInstance : core .NewWorkflowInstance (instanceID ),
679
- Metadata : metadata ,
680
677
Event : event ,
681
678
}
682
679
0 commit comments