Skip to content

Commit 0b2d411

Browse files
committed
chore: support custom payload conveters
Signed-off-by: Valery Piashchynski <[email protected]>
1 parent 003a81f commit 0b2d411

File tree

2 files changed

+22
-6
lines changed

2 files changed

+22
-6
lines changed

internal.go

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,19 @@ func (p *Plugin) initPool() error {
3838
return err
3939
}
4040

41-
dc := dataconverter.NewDataConverter(converter.GetDefaultDataConverter())
42-
codec := proto.NewCodec(p.log, dc)
41+
dc := make([]converter.PayloadConverter, 0, 5)
42+
// standard payload converters
43+
dc = append(dc, converter.NewNilPayloadConverter())
44+
dc = append(dc, converter.NewByteSlicePayloadConverter())
45+
dc = append(dc, converter.NewProtoJSONPayloadConverter())
46+
dc = append(dc, converter.NewProtoPayloadConverter())
47+
48+
if p.temporal.customDataConverter != nil {
49+
dc = append(dc, p.temporal.customDataConverter)
50+
}
51+
52+
rrdc := dataconverter.NewDataConverter(converter.NewCompositeDataConverter(dc...))
53+
codec := proto.NewCodec(p.log, rrdc)
4354

4455
// LA + A definitions
4556
actDef := aggregatedpool.NewActivityDefinition(codec, ap, p.log, p.config.DisableActivityWorkers)
@@ -85,7 +96,7 @@ func (p *Plugin) initPool() error {
8596
return errors.Str("worker info should contain at least 1 worker")
8697
}
8798

88-
err = p.initTemporalClient(wi[0].PhpSdkVersion, wi[0].Flags, dc)
99+
err = p.initTemporalClient(wi[0].PhpSdkVersion, wi[0].Flags, rrdc)
89100
if err != nil {
90101
return err
91102
}

plugin.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"github.com/temporalio/roadrunner-temporal/v5/internal"
2121
"github.com/temporalio/roadrunner-temporal/v5/internal/codec/proto"
2222
tclient "go.temporal.io/sdk/client"
23+
"go.temporal.io/sdk/converter"
2324
"go.temporal.io/sdk/worker"
2425
"go.uber.org/zap"
2526

@@ -61,7 +62,8 @@ type temporal struct {
6162
client tclient.Client
6263
workers []worker.Worker
6364

64-
interceptors map[string]api.Interceptor
65+
interceptors map[string]api.Interceptor
66+
customDataConverter converter.PayloadConverter
6567
}
6668

6769
type Plugin struct {
@@ -285,7 +287,7 @@ func (p *Plugin) Workers() []*process.State {
285287
for i := range wfPw {
286288
st, err := process.WorkerProcessState(wfPw[i])
287289
if err != nil {
288-
// log error and continue
290+
// log the error and continue
289291
p.log.Error("worker process state error", zap.Error(err))
290292
continue
291293
}
@@ -296,7 +298,7 @@ func (p *Plugin) Workers() []*process.State {
296298
for i := range actPw {
297299
st, err := process.WorkerProcessState(actPw[i])
298300
if err != nil {
299-
// log error and continue
301+
// log the error and continue
300302
p.log.Error("worker process state error", zap.Error(err))
301303
continue
302304
}
@@ -410,6 +412,9 @@ func (p *Plugin) Collects() []*dep.In {
410412
p.temporal.interceptors[mdw.Name()] = mdw
411413
p.mu.Unlock()
412414
}, (*api.Interceptor)(nil)),
415+
dep.Fits(func(pp any) {
416+
p.temporal.customDataConverter = pp.(converter.PayloadConverter)
417+
}, (*converter.PayloadConverter)(nil)),
413418
}
414419
}
415420

0 commit comments

Comments
 (0)