Skip to content

Commit 3d34b15

Browse files
authored
Merge pull request #206 from cschleiden/context-propagators
Support context propagation
2 parents 6c7bcc3 + 05e28b7 commit 3d34b15

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

53 files changed

+965
-187
lines changed

README.md

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -632,6 +632,22 @@ func Workflow(ctx workflow.Context) error {
632632
span.End()
633633
```
634634
635+
### Context Propagation
636+
637+
In go programs it is common to use `context.Context` to pass around request-scoped data. This library supports context propagation between activities and workflows. When you create a workflow, you can pass a `ContextPropagator` to the backend to propagate context values. The interface is:
638+
639+
```go
640+
type ContextPropagator interface {
641+
Inject(context.Context, *Metadata) error
642+
Extract(context.Context, *Metadata) (context.Context, error)
643+
644+
InjectFromWorkflow(Context, *Metadata) error
645+
ExtractToWorkflow(Context, *Metadata) (Context, error)
646+
}
647+
```
648+
649+
The `context-propagation` sample shows an example of how to use this.
650+
635651
## Tools
636652
637653
### Analyzer

backend/backend.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"errors"
66

7+
"github.com/cschleiden/go-workflows/internal/contextpropagation"
78
"github.com/cschleiden/go-workflows/internal/converter"
89
core "github.com/cschleiden/go-workflows/internal/core"
910
"github.com/cschleiden/go-workflows/internal/history"
@@ -78,4 +79,7 @@ type Backend interface {
7879

7980
// Converter returns the configured converter for the backend
8081
Converter() converter.Converter
82+
83+
// ContextPropagators returns the configured context propagators for the backend
84+
ContextPropagators() []contextpropagation.ContextPropagator
8185
}

backend/mock_Backend.go

Lines changed: 18 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

backend/mysql/mysql.go

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"time"
1212

1313
"github.com/cschleiden/go-workflows/backend"
14+
"github.com/cschleiden/go-workflows/internal/contextpropagation"
1415
"github.com/cschleiden/go-workflows/internal/converter"
1516
"github.com/cschleiden/go-workflows/internal/core"
1617
"github.com/cschleiden/go-workflows/internal/history"
@@ -78,6 +79,10 @@ func (b *mysqlBackend) Converter() converter.Converter {
7879
return b.options.Converter
7980
}
8081

82+
func (b *mysqlBackend) ContextPropagators() []contextpropagation.ContextPropagator {
83+
return b.options.ContextPropagators
84+
}
85+
8186
func (b *mysqlBackend) CreateWorkflowInstance(ctx context.Context, instance *workflow.Instance, event *history.Event) error {
8287
tx, err := b.db.BeginTx(ctx, &sql.TxOptions{
8388
Isolation: sql.LevelReadCommitted,
@@ -626,9 +631,8 @@ func (b *mysqlBackend) GetActivityTask(ctx context.Context) (*task.Activity, err
626631
res := tx.QueryRowContext(
627632
ctx,
628633
`SELECT activities.id, activity_id, activities.instance_id,
629-
instances.metadata, event_type, timestamp, schedule_event_id, attributes, visible_at
634+
event_type, timestamp, schedule_event_id, attributes, visible_at
630635
FROM activities
631-
INNER JOIN instances ON activities.instance_id = instances.instance_id
632636
WHERE activities.locked_until IS NULL OR activities.locked_until < ?
633637
LIMIT 1
634638
FOR UPDATE SKIP LOCKED`,
@@ -638,11 +642,10 @@ func (b *mysqlBackend) GetActivityTask(ctx context.Context) (*task.Activity, err
638642
var id int64
639643
var instanceID string
640644
var attributes []byte
641-
var metadataJson sql.NullString
642645
event := &history.Event{}
643646

644647
if err := res.Scan(
645-
&id, &event.ID, &instanceID, &metadataJson, &event.Type,
648+
&id, &event.ID, &instanceID, &event.Type,
646649
&event.Timestamp, &event.ScheduleEventID, &attributes, &event.VisibleAt); err != nil {
647650
if err == sql.ErrNoRows {
648651
return nil, nil
@@ -651,11 +654,6 @@ func (b *mysqlBackend) GetActivityTask(ctx context.Context) (*task.Activity, err
651654
return nil, fmt.Errorf("finding activity task to lock: %w", err)
652655
}
653656

654-
var metadata *workflow.Metadata
655-
if err := json.Unmarshal([]byte(metadataJson.String), &metadata); err != nil {
656-
return nil, fmt.Errorf("unmarshaling metadata: %w", err)
657-
}
658-
659657
a, err := history.DeserializeAttributes(event.Type, attributes)
660658
if err != nil {
661659
return nil, fmt.Errorf("deserializing attributes: %w", err)
@@ -676,7 +674,6 @@ func (b *mysqlBackend) GetActivityTask(ctx context.Context) (*task.Activity, err
676674
t := &task.Activity{
677675
ID: event.ID,
678676
WorkflowInstance: core.NewWorkflowInstance(instanceID),
679-
Metadata: metadata,
680677
Event: event,
681678
}
682679

backend/mysql/mysql_test.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ func Test_MysqlBackend(t *testing.T) {
2626

2727
var dbName string
2828

29-
test.BackendTest(t, func() test.TestBackend {
29+
test.BackendTest(t, func(options ...backend.BackendOption) test.TestBackend {
3030
db, err := sql.Open("mysql", fmt.Sprintf("%s:%s@/?parseTime=true&interpolateParams=true", testUser, testPassword))
3131
if err != nil {
3232
panic(err)
@@ -41,7 +41,9 @@ func Test_MysqlBackend(t *testing.T) {
4141
panic(err)
4242
}
4343

44-
return NewMysqlBackend("localhost", 3306, testUser, testPassword, dbName, backend.WithStickyTimeout(0))
44+
options = append(options, backend.WithStickyTimeout(0))
45+
46+
return NewMysqlBackend("localhost", 3306, testUser, testPassword, dbName, options...)
4547
}, func(b test.TestBackend) {
4648
db, err := sql.Open("mysql", fmt.Sprintf("%s:%s@/?parseTime=true&interpolateParams=true", testUser, testPassword))
4749
if err != nil {
@@ -65,7 +67,7 @@ func TestMySqlBackendE2E(t *testing.T) {
6567

6668
var dbName string
6769

68-
test.EndToEndBackendTest(t, func() test.TestBackend {
70+
test.EndToEndBackendTest(t, func(options ...backend.BackendOption) test.TestBackend {
6971
db, err := sql.Open("mysql", fmt.Sprintf("%s:%s@/?parseTime=true&interpolateParams=true", testUser, testPassword))
7072
if err != nil {
7173
panic(err)
@@ -80,7 +82,9 @@ func TestMySqlBackendE2E(t *testing.T) {
8082
panic(err)
8183
}
8284

83-
return NewMysqlBackend("localhost", 3306, testUser, testPassword, dbName, backend.WithStickyTimeout(0))
85+
options = append(options, backend.WithStickyTimeout(0))
86+
87+
return NewMysqlBackend("localhost", 3306, testUser, testPassword, dbName, options...)
8488
}, func(b test.TestBackend) {
8589
db, err := sql.Open("mysql", fmt.Sprintf("%s:%s@/?parseTime=true&interpolateParams=true", testUser, testPassword))
8690
if err != nil {

backend/options.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,14 @@ package backend
33
import (
44
"time"
55

6+
"github.com/cschleiden/go-workflows/internal/contextpropagation"
67
"github.com/cschleiden/go-workflows/internal/converter"
78
"github.com/cschleiden/go-workflows/internal/logger"
89
mi "github.com/cschleiden/go-workflows/internal/metrics"
10+
"github.com/cschleiden/go-workflows/internal/tracing"
911
"github.com/cschleiden/go-workflows/log"
1012
"github.com/cschleiden/go-workflows/metrics"
13+
"github.com/cschleiden/go-workflows/workflow"
1114
"go.opentelemetry.io/otel/trace"
1215
)
1316

@@ -22,6 +25,9 @@ type Options struct {
2225
// converter.DefaultConverter is used.
2326
Converter converter.Converter
2427

28+
// ContextPropagators is a list of context propagators to use for passing context into workflows and activities.
29+
ContextPropagators []contextpropagation.ContextPropagator
30+
2531
StickyTimeout time.Duration
2632

2733
// WorkflowLockTimeout determines how long a workflow task can be locked for. If the workflow task is not completed
@@ -44,6 +50,8 @@ var DefaultOptions Options = Options{
4450
Metrics: mi.NewNoopMetricsClient(),
4551
TracerProvider: trace.NewNoopTracerProvider(),
4652
Converter: converter.DefaultConverter,
53+
54+
ContextPropagators: []contextpropagation.ContextPropagator{&tracing.TracingContextPropagator{}},
4755
}
4856

4957
type BackendOption func(*Options)
@@ -78,6 +86,12 @@ func WithConverter(converter converter.Converter) BackendOption {
7886
}
7987
}
8088

89+
func WithContextPropagator(prop workflow.ContextPropagator) BackendOption {
90+
return func(o *Options) {
91+
o.ContextPropagators = append(o.ContextPropagators, prop)
92+
}
93+
}
94+
8195
func ApplyOptions(opts ...BackendOption) Options {
8296
options := DefaultOptions
8397

backend/redis/activity.go

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package redis
22

33
import (
44
"context"
5-
"fmt"
65

76
"github.com/cschleiden/go-workflows/internal/core"
87
"github.com/cschleiden/go-workflows/internal/history"
@@ -19,14 +18,8 @@ func (rb *redisBackend) GetActivityTask(ctx context.Context) (*task.Activity, er
1918
return nil, nil
2019
}
2120

22-
instanceState, err := readInstance(ctx, rb.rdb, activityTask.Data.Instance.InstanceID)
23-
if err != nil {
24-
return nil, fmt.Errorf("reading workflow instance for activity task: %w", err)
25-
}
26-
2721
return &task.Activity{
2822
WorkflowInstance: activityTask.Data.Instance,
29-
Metadata: instanceState.Metadata,
3023
ID: activityTask.TaskID, // Use the queue generated ID here
3124
Event: activityTask.Data.Event,
3225
}, nil

backend/redis/redis.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"time"
77

88
"github.com/cschleiden/go-workflows/backend"
9+
"github.com/cschleiden/go-workflows/internal/contextpropagation"
910
"github.com/cschleiden/go-workflows/internal/converter"
1011
"github.com/cschleiden/go-workflows/internal/core"
1112
"github.com/cschleiden/go-workflows/internal/history"
@@ -101,6 +102,10 @@ func (rb *redisBackend) Converter() converter.Converter {
101102
return rb.options.Converter
102103
}
103104

105+
func (rb *redisBackend) ContextPropagators() []contextpropagation.ContextPropagator {
106+
return rb.options.ContextPropagators
107+
}
108+
104109
func (rb *redisBackend) Close() error {
105110
return rb.rdb.Close()
106111
}

backend/redis/redis_test.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"testing"
99
"time"
1010

11+
"github.com/cschleiden/go-workflows/backend"
1112
"github.com/cschleiden/go-workflows/backend/test"
1213
"github.com/cschleiden/go-workflows/internal/history"
1314
"github.com/redis/go-redis/v9"
@@ -52,8 +53,8 @@ func getClient() redis.UniversalClient {
5253
return client
5354
}
5455

55-
func getCreateBackend(client redis.UniversalClient, additionalOptions ...RedisBackendOption) func() test.TestBackend {
56-
return func() test.TestBackend {
56+
func getCreateBackend(client redis.UniversalClient, additionalOptions ...RedisBackendOption) func(options ...backend.BackendOption) test.TestBackend {
57+
return func(options ...backend.BackendOption) test.TestBackend {
5758
// Flush database
5859
if err := client.FlushDB(context.Background()).Err(); err != nil {
5960
panic(err)
@@ -68,13 +69,14 @@ func getCreateBackend(client redis.UniversalClient, additionalOptions ...RedisBa
6869
panic("Keys should've been empty" + strings.Join(r, ", "))
6970
}
7071

71-
options := []RedisBackendOption{
72+
redisOptions := []RedisBackendOption{
7273
WithBlockTimeout(time.Millisecond * 10),
74+
WithBackendOptions(options...),
7375
}
7476

75-
options = append(options, additionalOptions...)
77+
redisOptions = append(redisOptions, additionalOptions...)
7678

77-
b, err := NewRedisBackend(client, options...)
79+
b, err := NewRedisBackend(client, redisOptions...)
7880
if err != nil {
7981
panic(err)
8082
}

backend/redis/signal.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,13 @@ func (rb *redisBackend) SignalWorkflow(ctx context.Context, instanceID string, e
1818
return err
1919
}
2020

21-
ctx = tracing.UnmarshalSpan(ctx, instanceState.Metadata)
21+
ctx, err = (&tracing.TracingContextPropagator{}).Extract(ctx, instanceState.Metadata)
22+
if err != nil {
23+
rb.Logger().Error("extracting tracing context", log.ErrorKey, err)
24+
}
25+
2226
a := event.Attributes.(*history.SignalReceivedAttributes)
23-
_, span := rb.Tracer().Start(ctx, fmt.Sprintf("SignalWorkflow: %s", a.Name), trace.WithAttributes(
27+
ctx, span := rb.Tracer().Start(ctx, fmt.Sprintf("SignalWorkflow: %s", a.Name), trace.WithAttributes(
2428
attribute.String(log.InstanceIDKey, instanceID),
2529
attribute.String(log.SignalNameKey, event.Attributes.(*history.SignalReceivedAttributes).Name),
2630
))

0 commit comments

Comments
 (0)