Skip to content

Commit 7636327

Browse files
authored
Add Data Converter to support custom serialization/deserialization (#463)
* Add data converter to support custom encode/decode * Add comments on DataConverter interface * Address comments * Address comments
1 parent bbc380f commit 7636327

28 files changed

+1229
-588
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,3 +16,4 @@ test.log
1616
/dummy
1717
.build
1818
.bins
19+
.DS_Store

encoded/encoded.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,4 +38,22 @@ type (
3838
// Get extract the encoded values into strong typed value pointers.
3939
Get(valuePtr ...interface{}) error
4040
}
41+
42+
// DataConverter is used by the framework to serialize/deserialize input and output of activity/workflow
43+
// that need to be sent over the wire.
44+
// To encode/decode workflow arguments, one should set DataConverter in two places:
45+
// 1. Workflow worker, through worker.Options
46+
// 2. Client, through client.Options
47+
// To encode/decode Activity/ChildWorkflow arguments, one should set DataConverter in two places:
48+
// 1. Inside workflow code, use workflow.WithDataConverter to create new Context,
49+
// and pass that context to ExecuteActivity/ExecuteChildWorkflow calls.
50+
// Cadence support using different DataConverters for different activity/childWorkflow in same workflow.
51+
// 2. Activity/Workflow worker that run these activity/childWorkflow, through worker.Options.
52+
DataConverter interface {
53+
// ToData implements conversion of a list of values.
54+
ToData(value ...interface{}) ([]byte, error)
55+
// FromData implements conversion of an array of values of different types.
56+
// Useful for deserializing arguments of function invocations.
57+
FromData(input []byte, valuePtr ...interface{}) error
58+
}
4159
)

internal/activity.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626

2727
"github.com/uber-go/tally"
2828
"go.uber.org/cadence/.gen/go/shared"
29+
"go.uber.org/cadence/encoded"
2930
"go.uber.org/cadence/internal/common"
3031
"go.uber.org/zap"
3132
)
@@ -139,7 +140,7 @@ func RecordActivityHeartbeat(ctx context.Context, details ...interface{}) {
139140
var err error
140141
// We would like to be a able to pass in "nil" as part of details(that is no progress to report to)
141142
if len(details) != 1 || details[0] != nil {
142-
data, err = getHostEnvironment().encodeArgs(details)
143+
data, err = encodeArgs(getDataConverterFromActivityCtx(ctx), details)
143144
if err != nil {
144145
panic(err)
145146
}
@@ -168,6 +169,7 @@ func WithActivityTask(
168169
invoker ServiceInvoker,
169170
logger *zap.Logger,
170171
scope tally.Scope,
172+
dataConverter encoded.DataConverter,
171173
) context.Context {
172174
var deadline time.Time
173175
scheduled := time.Unix(0, task.GetScheduledTimestamp())
@@ -198,6 +200,7 @@ func WithActivityTask(
198200
scheduledTimestamp: scheduled,
199201
startedTimestamp: started,
200202
taskList: taskList,
203+
dataConverter: dataConverter,
201204
})
202205
}
203206

internal/activity_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ func (s *activityTestSuite) TestActivityHeartbeat_SuppressContinousInvokes() {
162162
service3.EXPECT().RecordActivityTaskHeartbeat(gomock.Any(), gomock.Any(), callOptions...).
163163
Return(&shared.RecordActivityTaskHeartbeatResponse{}, nil).
164164
Do(func(ctx context.Context, request *shared.RecordActivityTaskHeartbeatRequest, opts ...yarpc.CallOption) {
165-
ev := EncodedValues(request.Details)
165+
ev := newEncodedValues(request.Details, nil)
166166
var progress string
167167
err := ev.Get(&progress)
168168
if err != nil {

internal/client.go

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -241,8 +241,9 @@ type (
241241

242242
// ClientOptions are optional parameters for Client creation.
243243
ClientOptions struct {
244-
MetricsScope tally.Scope
245-
Identity string
244+
MetricsScope tally.Scope
245+
Identity string
246+
DataConverter encoded.DataConverter
246247
}
247248

248249
// StartWorkflowOptions configuration parameters for starting a workflow execution.
@@ -334,11 +335,18 @@ func NewClient(service workflowserviceclient.Interface, domain string, options *
334335
metricScope = options.MetricsScope
335336
}
336337
metricScope = tagScope(metricScope, tagDomain, domain)
338+
var dataConverter encoded.DataConverter
339+
if options != nil && options.DataConverter != nil {
340+
dataConverter = options.DataConverter
341+
} else {
342+
dataConverter = newDefaultDataConverter()
343+
}
337344
return &workflowClient{
338345
workflowService: metrics.NewWorkflowServiceWrapper(service, metricScope),
339346
domain: domain,
340347
metricsScope: metrics.NewTaggedScope(metricScope),
341348
identity: identity,
349+
dataConverter: dataConverter,
342350
}
343351
}
344352

@@ -384,8 +392,7 @@ func (p WorkflowIDReusePolicy) toThriftPtr() *s.WorkflowIdReusePolicy {
384392
// var result string // This need to be same type as the one passed to RecordHeartbeat
385393
// NewValue(data).Get(&result)
386394
func NewValue(data []byte) encoded.Value {
387-
result := EncodedValue(data)
388-
return &result
395+
return newEncodedValue(data, nil)
389396
}
390397

391398
// NewValues creates a new encoded.Values which can be used to decode binary data returned by Cadence. For example:
@@ -396,6 +403,5 @@ func NewValue(data []byte) encoded.Value {
396403
// var result2 int // These need to be same type as those arguments passed to RecordHeartbeat
397404
// NewValues(data).Get(&result1, &result2)
398405
func NewValues(data []byte) encoded.Values {
399-
result := EncodedValues(data)
400-
return &result
406+
return newEncodedValues(data, nil)
401407
}

internal/error.go

Lines changed: 40 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"strings"
2727

2828
"go.uber.org/cadence/.gen/go/shared"
29+
"go.uber.org/cadence/encoded"
2930
)
3031

3132
/*
@@ -87,7 +88,7 @@ type (
8788
// CustomError returned from workflow and activity implementations with reason and optional details.
8889
CustomError struct {
8990
reason string
90-
details EncodedValues
91+
details encoded.Values
9192
}
9293

9394
// GenericError returned from workflow/workflow when the implementations return errors other than from NewCustomError() API.
@@ -98,12 +99,12 @@ type (
9899
// TimeoutError returned when activity or child workflow timed out.
99100
TimeoutError struct {
100101
timeoutType shared.TimeoutType
101-
details EncodedValues
102+
details encoded.Values
102103
}
103104

104105
// CanceledError returned when operation was canceled.
105106
CanceledError struct {
106-
details EncodedValues
107+
details encoded.Values
107108
}
108109

109110
// TerminatedError returned when workflow was terminated.
@@ -134,6 +135,9 @@ const (
134135
// ErrNoData is returned when trying to extract strong typed data while there is no data available.
135136
var ErrNoData = errors.New("no data available")
136137

138+
// ErrTooManyArg is returned when trying to extract strong typed data with more arguments than available data.
139+
var ErrTooManyArg = errors.New("too many arguments")
140+
137141
// ErrActivityResultPending is returned from activity's implementation to indicate the activity is not completed when
138142
// activity method returns. Activity needs to be completed by Client.CompleteActivity() separately. For example, if an
139143
// activity require human interaction (like approve an expense report), the activity could return activity.ErrResultPending
@@ -146,12 +150,14 @@ func NewCustomError(reason string, details ...interface{}) *CustomError {
146150
if strings.HasPrefix(reason, "cadenceInternal:") {
147151
panic("'cadenceInternal:' is reserved prefix, please use different reason")
148152
}
149-
150-
data, err := getHostEnvironment().encodeArgs(details)
151-
if err != nil {
152-
panic(err)
153+
// When return error to user, use EncodedValues as details and data is ready to be decoded by calling Get
154+
if len(details) == 1 {
155+
if d, ok := details[0].(*EncodedValues); ok {
156+
return &CustomError{reason: reason, details: d}
157+
}
153158
}
154-
return &CustomError{reason: reason, details: data}
159+
// When create error for server, use ErrorDetailsValues as details to hold values and encode later
160+
return &CustomError{reason: reason, details: ErrorDetailsValues(details)}
155161
}
156162

157163
// NewTimeoutError creates TimeoutError instance.
@@ -162,20 +168,22 @@ func NewTimeoutError(timeoutType shared.TimeoutType) *TimeoutError {
162168

163169
// NewHeartbeatTimeoutError creates TimeoutError instance
164170
func NewHeartbeatTimeoutError(details ...interface{}) *TimeoutError {
165-
data, err := getHostEnvironment().encodeArgs(details)
166-
if err != nil {
167-
panic(err)
171+
if len(details) == 1 {
172+
if d, ok := details[0].(*EncodedValues); ok {
173+
return &TimeoutError{timeoutType: shared.TimeoutTypeHeartbeat, details: d}
174+
}
168175
}
169-
return &TimeoutError{timeoutType: shared.TimeoutTypeHeartbeat, details: data}
176+
return &TimeoutError{timeoutType: shared.TimeoutTypeHeartbeat, details: ErrorDetailsValues(details)}
170177
}
171178

172179
// NewCanceledError creates CanceledError instance
173180
func NewCanceledError(details ...interface{}) *CanceledError {
174-
data, err := getHostEnvironment().encodeArgs(details)
175-
if err != nil {
176-
panic(err)
181+
if len(details) == 1 {
182+
if d, ok := details[0].(*EncodedValues); ok {
183+
return &CanceledError{details: d}
184+
}
177185
}
178-
return &CanceledError{details: data}
186+
return &CanceledError{details: ErrorDetailsValues(details)}
179187
}
180188

181189
// NewContinueAsNewError creates ContinueAsNewError instance
@@ -192,14 +200,14 @@ func NewCanceledError(details ...interface{}) *CanceledError {
192200
//
193201
func NewContinueAsNewError(ctx Context, wfn interface{}, args ...interface{}) *ContinueAsNewError {
194202
// Validate type and its arguments.
195-
workflowType, input, err := getValidatedWorkflowFunction(wfn, args)
196-
if err != nil {
197-
panic(err)
198-
}
199203
options := getWorkflowEnvOptions(ctx)
200204
if options == nil {
201205
panic("context is missing required options for continue as new")
202206
}
207+
workflowType, input, err := getValidatedWorkflowFunction(wfn, args, options.dataConverter)
208+
if err != nil {
209+
panic(err)
210+
}
203211
if options.taskListName == nil || *options.taskListName == "" {
204212
panic("invalid task list provided")
205213
}
@@ -230,11 +238,14 @@ func (e *CustomError) Reason() string {
230238

231239
// HasDetails return if this error has strong typed detail data.
232240
func (e *CustomError) HasDetails() bool {
233-
return e.details.HasValues()
241+
return e.details != nil && e.details.HasValues()
234242
}
235243

236244
// Details extracts strong typed detail data of this custom error. If there is no details, it will return ErrNoData.
237245
func (e *CustomError) Details(d ...interface{}) error {
246+
if !e.HasDetails() {
247+
return ErrNoData
248+
}
238249
return e.details.Get(d...)
239250
}
240251

@@ -255,11 +266,14 @@ func (e *TimeoutError) TimeoutType() shared.TimeoutType {
255266

256267
// HasDetails return if this error has strong typed detail data.
257268
func (e *TimeoutError) HasDetails() bool {
258-
return e.details.HasValues()
269+
return e.details != nil && e.details.HasValues()
259270
}
260271

261272
// Details extracts strong typed detail data of this error. If there is no details, it will return ErrNoData.
262273
func (e *TimeoutError) Details(d ...interface{}) error {
274+
if !e.HasDetails() {
275+
return ErrNoData
276+
}
263277
return e.details.Get(d...)
264278
}
265279

@@ -270,11 +284,14 @@ func (e *CanceledError) Error() string {
270284

271285
// HasDetails return if this error has strong typed detail data.
272286
func (e *CanceledError) HasDetails() bool {
273-
return e.details.HasValues()
287+
return e.details != nil && e.details.HasValues()
274288
}
275289

276290
// Details extracts strong typed detail data of this error.
277291
func (e *CanceledError) Details(d ...interface{}) error {
292+
if !e.HasDetails() {
293+
return ErrNoData
294+
}
278295
return e.details.Get(d...)
279296
}
280297

0 commit comments

Comments
 (0)