Skip to content

Commit 5d5d6d7

Browse files
authored
Merge pull request #163 from cschleiden/cschleiden/converter
Support passing in a custom converter for inputs/result serialization
2 parents f31dca3 + 15d16f6 commit 5d5d6d7

File tree

27 files changed

+351
-78
lines changed

27 files changed

+351
-78
lines changed

backend/backend.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"errors"
66

7+
"github.com/cschleiden/go-workflows/internal/converter"
78
core "github.com/cschleiden/go-workflows/internal/core"
89
"github.com/cschleiden/go-workflows/internal/history"
910
"github.com/cschleiden/go-workflows/internal/task"
@@ -70,4 +71,7 @@ type Backend interface {
7071

7172
// Metrics returns the configured metrics client for the backend
7273
Metrics() metrics.Client
74+
75+
// Converter returns the configured converter for the backend
76+
Converter() converter.Converter
7377
}

backend/mock_Backend.go

Lines changed: 37 additions & 7 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

backend/mysql/mysql.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"time"
1212

1313
"github.com/cschleiden/go-workflows/backend"
14+
"github.com/cschleiden/go-workflows/internal/converter"
1415
"github.com/cschleiden/go-workflows/internal/core"
1516
"github.com/cschleiden/go-workflows/internal/history"
1617
"github.com/cschleiden/go-workflows/internal/metrickeys"
@@ -100,6 +101,10 @@ func (b *mysqlBackend) Metrics() metrics.Client {
100101
return b.options.Metrics.WithTags(metrics.Tags{metrickeys.Backend: "mysql"})
101102
}
102103

104+
func (b *mysqlBackend) Converter() converter.Converter {
105+
return b.options.Converter
106+
}
107+
103108
func (b *mysqlBackend) CancelWorkflowInstance(ctx context.Context, instance *workflow.Instance, event *history.Event) error {
104109
tx, err := b.db.BeginTx(ctx, &sql.TxOptions{
105110
Isolation: sql.LevelReadCommitted,

backend/options.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package backend
33
import (
44
"time"
55

6+
"github.com/cschleiden/go-workflows/internal/converter"
67
"github.com/cschleiden/go-workflows/internal/logger"
78
mi "github.com/cschleiden/go-workflows/internal/metrics"
89
"github.com/cschleiden/go-workflows/log"
@@ -17,6 +18,10 @@ type Options struct {
1718

1819
TracerProvider trace.TracerProvider
1920

21+
// Converter is the converter to use for serializing and deserializing inputs and results. If not explicitly set
22+
// converter.DefaultConverter is used.
23+
Converter converter.Converter
24+
2025
StickyTimeout time.Duration
2126

2227
WorkflowLockTimeout time.Duration
@@ -32,6 +37,7 @@ var DefaultOptions Options = Options{
3237
Logger: logger.NewDefaultLogger(),
3338
Metrics: mi.NewNoopMetricsClient(),
3439
TracerProvider: trace.NewNoopTracerProvider(),
40+
Converter: converter.DefaultConverter,
3541
}
3642

3743
type BackendOption func(*Options)
@@ -60,6 +66,12 @@ func WithTracerProvider(tp trace.TracerProvider) BackendOption {
6066
}
6167
}
6268

69+
func WithConverter(converter converter.Converter) BackendOption {
70+
return func(o *Options) {
71+
o.Converter = converter
72+
}
73+
}
74+
6375
func ApplyOptions(opts ...BackendOption) Options {
6476
options := DefaultOptions
6577

backend/redis/redis.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"time"
77

88
"github.com/cschleiden/go-workflows/backend"
9+
"github.com/cschleiden/go-workflows/internal/converter"
910
"github.com/cschleiden/go-workflows/internal/core"
1011
"github.com/cschleiden/go-workflows/internal/history"
1112
"github.com/cschleiden/go-workflows/internal/metrickeys"
@@ -116,6 +117,10 @@ func (rb *redisBackend) Tracer() trace.Tracer {
116117
return rb.options.TracerProvider.Tracer(backend.TracerName)
117118
}
118119

120+
func (rb *redisBackend) Converter() converter.Converter {
121+
return rb.options.Converter
122+
}
123+
119124
func (rb *redisBackend) Close() error {
120125
return rb.rdb.Close()
121126
}

backend/sqlite/sqlite.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"time"
1212

1313
"github.com/cschleiden/go-workflows/backend"
14+
"github.com/cschleiden/go-workflows/internal/converter"
1415
"github.com/cschleiden/go-workflows/internal/core"
1516
"github.com/cschleiden/go-workflows/internal/history"
1617
"github.com/cschleiden/go-workflows/internal/metrickeys"
@@ -75,6 +76,10 @@ func (sb *sqliteBackend) Tracer() trace.Tracer {
7576
return sb.options.TracerProvider.Tracer(backend.TracerName)
7677
}
7778

79+
func (sb *sqliteBackend) Converter() converter.Converter {
80+
return sb.options.Converter
81+
}
82+
7883
func (sb *sqliteBackend) CreateWorkflowInstance(ctx context.Context, instance *workflow.Instance, event history.Event) error {
7984
tx, err := sb.db.BeginTx(ctx, nil)
8085
if err != nil {

client/client.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import (
1010
"github.com/cenkalti/backoff/v4"
1111
"github.com/cschleiden/go-workflows/backend"
1212
a "github.com/cschleiden/go-workflows/internal/args"
13-
"github.com/cschleiden/go-workflows/internal/converter"
1413
"github.com/cschleiden/go-workflows/internal/core"
1514
"github.com/cschleiden/go-workflows/internal/fn"
1615
"github.com/cschleiden/go-workflows/internal/history"
@@ -56,7 +55,7 @@ func New(backend backend.Backend) Client {
5655
}
5756

5857
func (c *client) CreateWorkflowInstance(ctx context.Context, options WorkflowInstanceOptions, wf workflow.Workflow, args ...interface{}) (*workflow.Instance, error) {
59-
inputs, err := a.ArgsToInputs(converter.DefaultConverter, args...)
58+
inputs, err := a.ArgsToInputs(c.backend.Converter(), args...)
6059
if err != nil {
6160
return nil, fmt.Errorf("converting arguments: %w", err)
6261
}
@@ -101,7 +100,7 @@ func (c *client) CancelWorkflowInstance(ctx context.Context, instance *workflow.
101100
}
102101

103102
func (c *client) SignalWorkflow(ctx context.Context, instanceID string, name string, arg interface{}) error {
104-
input, err := converter.DefaultConverter.To(arg)
103+
input, err := c.backend.Converter().To(arg)
105104
if err != nil {
106105
return fmt.Errorf("converting arguments: %w", err)
107106
}
@@ -184,7 +183,7 @@ func GetWorkflowResult[T any](ctx context.Context, c Client, instance *workflow.
184183
}
185184

186185
var r T
187-
if err := converter.DefaultConverter.From(a.Result, &r); err != nil {
186+
if err := b.Converter().From(a.Result, &r); err != nil {
188187
return *new(T), fmt.Errorf("converting result: %w", err)
189188
}
190189

client/client_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ func Test_Client_GetWorkflowResultSuccess(t *testing.T) {
5858
Error: "",
5959
}),
6060
}, nil)
61+
b.On("Converter").Return(converter.DefaultConverter)
6162

6263
c := &client{
6364
backend: b,
@@ -77,6 +78,7 @@ func Test_Client_SignalWorkflow(t *testing.T) {
7778

7879
b := &backend.MockBackend{}
7980
b.On("Logger").Return(logger.NewDefaultLogger())
81+
b.On("Converter").Return(converter.DefaultConverter)
8082
b.On("SignalWorkflow", ctx, instanceID, mock.MatchedBy(func(event history.Event) bool {
8183
return event.Type == history.EventType_SignalReceived &&
8284
event.Attributes.(*history.SignalReceivedAttributes).Name == "test"
@@ -104,6 +106,7 @@ func Test_Client_SignalWorkflow_WithArgs(t *testing.T) {
104106

105107
b := &backend.MockBackend{}
106108
b.On("Logger").Return(logger.NewDefaultLogger())
109+
b.On("Converter").Return(converter.DefaultConverter)
107110
b.On("SignalWorkflow", ctx, instanceID, mock.MatchedBy(func(event history.Event) bool {
108111
return event.Type == history.EventType_SignalReceived &&
109112
event.Attributes.(*history.SignalReceivedAttributes).Name == "test" &&

internal/activity/executor.go

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,18 @@ import (
1919
)
2020

2121
type Executor struct {
22-
logger log.Logger
23-
tracer trace.Tracer
24-
r *workflow.Registry
22+
logger log.Logger
23+
tracer trace.Tracer
24+
converter converter.Converter
25+
r *workflow.Registry
2526
}
2627

27-
func NewExecutor(logger log.Logger, tracer trace.Tracer, r *workflow.Registry) Executor {
28+
func NewExecutor(logger log.Logger, tracer trace.Tracer, converter converter.Converter, r *workflow.Registry) Executor {
2829
return Executor{
29-
logger: logger,
30-
tracer: tracer,
31-
r: r,
30+
logger: logger,
31+
tracer: tracer,
32+
converter: converter,
33+
r: r,
3234
}
3335
}
3436

@@ -45,7 +47,7 @@ func (e *Executor) ExecuteActivity(ctx context.Context, task *task.Activity) (pa
4547
return nil, errors.New("activity not a function")
4648
}
4749

48-
args, addContext, err := args.InputsToArgs(converter.DefaultConverter, activityFn, a.Inputs)
50+
args, addContext, err := args.InputsToArgs(e.converter, activityFn, a.Inputs)
4951
if err != nil {
5052
return nil, fmt.Errorf("converting activity inputs: %w", err)
5153
}
@@ -79,7 +81,7 @@ func (e *Executor) ExecuteActivity(ctx context.Context, task *task.Activity) (pa
7981

8082
if len(r) > 1 {
8183
var err error
82-
result, err = converter.DefaultConverter.To(r[0].Interface())
84+
result, err = e.converter.To(r[0].Interface())
8385
if err != nil {
8486
return nil, fmt.Errorf("converting activity result: %w", err)
8587
}

internal/converter/context.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package converter
2+
3+
import "github.com/cschleiden/go-workflows/internal/sync"
4+
5+
type converterKey struct{}
6+
7+
func WithConverter(ctx sync.Context, converter Converter) sync.Context {
8+
return sync.WithValue(ctx, converterKey{}, converter)
9+
}
10+
11+
func GetConverter(ctx sync.Context) Converter {
12+
return ctx.Value(converterKey{}).(Converter)
13+
}

0 commit comments

Comments
 (0)