Skip to content

Commit d9f51e8

Browse files
authored
Add Decoder API in client (#435)
* fix some typo * Add Decoder API in client * remove unused god encoding * use NewValue NewValues instead of new API * remove unused code * make fmt * add example to comments
1 parent a63991a commit d9f51e8

File tree

9 files changed

+87
-75
lines changed

9 files changed

+87
-75
lines changed

activity/activity.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ func GetMetricsScope(ctx context.Context) tally.Scope {
9393
// TODO: we don't have a way to distinguish between the two cases when context is cancelled because
9494
// context doesn't support overriding value of ctx.Error.
9595
// TODO: Implement automatic heartbeating with cancellation through ctx.
96-
// details - the details that you provided here can be seen in the worflow when it receives TimeoutError, you
96+
// details - the details that you provided here can be seen in the workflow when it receives TimeoutError, you
9797
// can check error TimeOutType()/Details().
9898
func RecordHeartbeat(ctx context.Context, details ...interface{}) {
9999
internal.RecordActivityHeartbeat(ctx, details...)

client/client.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -308,3 +308,24 @@ var _ Client = internal.Client(nil)
308308
var _ internal.Client = Client(nil)
309309
var _ DomainClient = internal.DomainClient(nil)
310310
var _ internal.DomainClient = DomainClient(nil)
311+
312+
// NewValue creates a new encoded.Value which can be used to decode binary data returned by Cadence. For example:
313+
// User had Activity.RecordHeartbeat(ctx, "my-heartbeat") and then got response from calling Client.DescribeWorkflowExecution.
314+
// The response contains binary field PendingActivityInfo.HeartbeatDetails,
315+
// which can be decoded by using:
316+
// var result string // This need to be same type as the one passed to RecordHeartbeat
317+
// NewValue(data).Get(&result)
318+
func NewValue(data []byte) encoded.Value {
319+
return internal.NewValue(data)
320+
}
321+
322+
// NewValues creates a new encoded.Values which can be used to decode binary data returned by Cadence. For example:
323+
// User had Activity.RecordHeartbeat(ctx, "my-heartbeat", 123) and then got response from calling Client.DescribeWorkflowExecution.
324+
// The response contains binary field PendingActivityInfo.HeartbeatDetails,
325+
// which can be decoded by using:
326+
// var result1 string
327+
// var result2 int // These need to be same type as those arguments passed to RecordHeartbeat
328+
// NewValues(data).Get(&result1, &result2)
329+
func NewValues(data []byte) encoded.Values {
330+
return internal.NewValues(data)
331+
}

internal/client.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -377,3 +377,26 @@ func (p WorkflowIDReusePolicy) toThriftPtr() *s.WorkflowIdReusePolicy {
377377
}
378378
return &policy
379379
}
380+
381+
// NewValue creates a new encoded.Value which can be used to decode binary data returned by Cadence. For example:
382+
// User had Activity.RecordHeartbeat(ctx, "my-heartbeat") and then got response from calling Client.DescribeWorkflowExecution.
383+
// The response contains binary field PendingActivityInfo.HeartbeatDetails,
384+
// which can be decoded by using:
385+
// var result string // This need to be same type as the one passed to RecordHeartbeat
386+
// NewValue(data).Get(&result)
387+
func NewValue(data []byte) encoded.Value {
388+
result := EncodedValue(data)
389+
return &result
390+
}
391+
392+
// NewValues creates a new encoded.Values which can be used to decode binary data returned by Cadence. For example:
393+
// User had Activity.RecordHeartbeat(ctx, "my-heartbeat", 123) and then got response from calling Client.DescribeWorkflowExecution.
394+
// The response contains binary field PendingActivityInfo.HeartbeatDetails,
395+
// which can be decoded by using:
396+
// var result1 string
397+
// var result2 int // These need to be same type as those arguments passed to RecordHeartbeat
398+
// NewValues(data).Get(&result1, &result2)
399+
func NewValues(data []byte) encoded.Values {
400+
result := EncodedValues(data)
401+
return &result
402+
}

internal/context.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ import (
2929

3030
// Context is a clone of context.Context with Done() returning Channel instead
3131
// of native channel.
32-
// A Context carries a deadline, a cancelation signal, and other values across
32+
// A Context carries a deadline, a cancellation signal, and other values across
3333
// API boundaries.
3434
//
3535
// Context's methods may be called by multiple goroutines simultaneously.
@@ -69,7 +69,7 @@ type Context interface {
6969
// }
7070
//
7171
// See http://blog.golang.org/pipelines for more examples of how to use
72-
// a Done channel for cancelation.
72+
// a Done channel for cancellation.
7373
Done() Channel
7474

7575
// Err returns a non-nil error value after Done is closed. Err returns

internal/internal_utils_test.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,3 +34,41 @@ func TestChannelBuilderOptions(t *testing.T) {
3434

3535
require.Equal(t, time.Minute, builder.Timeout)
3636
}
37+
38+
type testDecodeStruct struct {
39+
Name string
40+
Age int
41+
}
42+
43+
func TestNewValues(t *testing.T) {
44+
var details []interface{}
45+
heartbeatDetail := "status-report-to-workflow"
46+
heartbeatDetail2 := 1
47+
heartbeatDetail3 := testDecodeStruct{
48+
Name: heartbeatDetail,
49+
Age: heartbeatDetail2,
50+
}
51+
details = append(details, heartbeatDetail, heartbeatDetail2, heartbeatDetail3)
52+
data, err := getHostEnvironment().encodeArgs(details)
53+
if err != nil {
54+
panic(err)
55+
}
56+
var res string
57+
var res2 int
58+
var res3 testDecodeStruct
59+
NewValues(data).Get(&res, &res2, &res3)
60+
require.Equal(t, heartbeatDetail, res)
61+
require.Equal(t, heartbeatDetail2, res2)
62+
require.Equal(t, heartbeatDetail3, res3)
63+
}
64+
65+
func TestNewValue(t *testing.T) {
66+
heartbeatDetail := "status-report-to-workflow"
67+
data, err := getHostEnvironment().encodeArg(heartbeatDetail)
68+
if err != nil {
69+
panic(err)
70+
}
71+
var res string
72+
NewValue(data).Get(&res)
73+
require.Equal(t, res, heartbeatDetail)
74+
}

internal/internal_worker.go

Lines changed: 0 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ package internal
2525
import (
2626
"bytes"
2727
"context"
28-
"encoding/gob"
2928
"encoding/json"
3029
"errors"
3130
"fmt"
@@ -1140,43 +1139,6 @@ type encoding interface {
11401139
Unmarshal([]byte, []interface{}) error
11411140
}
11421141

1143-
// gobEncoding encapsulates gob encoding and decoding
1144-
type gobEncoding struct {
1145-
}
1146-
1147-
// Register implements the encoding interface
1148-
func (g gobEncoding) Register(obj interface{}) error {
1149-
gob.Register(obj)
1150-
return nil
1151-
}
1152-
1153-
// Marshal encodes an array of object into bytes
1154-
func (g gobEncoding) Marshal(objs []interface{}) ([]byte, error) {
1155-
var buf bytes.Buffer
1156-
enc := gob.NewEncoder(&buf)
1157-
for i, obj := range objs {
1158-
if err := enc.Encode(obj); err != nil {
1159-
return nil, fmt.Errorf(
1160-
"unable to encode argument: %d, %v, with gob error: %v", i, reflect.TypeOf(obj), err)
1161-
}
1162-
}
1163-
return buf.Bytes(), nil
1164-
}
1165-
1166-
// Unmarshal decodes a byte array into the passed in objects
1167-
// TODO: To deal with different number of arguments, may be encode number of arguments as a first value as well.
1168-
// so we can decode if a ssubset of them are asked.
1169-
func (g gobEncoding) Unmarshal(data []byte, objs []interface{}) error {
1170-
dec := gob.NewDecoder(bytes.NewBuffer(data))
1171-
for i, obj := range objs {
1172-
if err := dec.Decode(obj); err != nil {
1173-
return fmt.Errorf(
1174-
"unable to decode argument: %d, %v, with gob error: %v", i, reflect.TypeOf(obj), err)
1175-
}
1176-
}
1177-
return nil
1178-
}
1179-
11801142
// jsonEncoding encapsulates json encoding and decoding
11811143
type jsonEncoding struct {
11821144
}

internal/internal_worker_test.go

Lines changed: 0 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -841,14 +841,6 @@ type encodingTest struct {
841841
input []interface{}
842842
}
843843

844-
var encodingTests = []encodingTest{
845-
{&gobEncoding{}, []interface{}{"test"}},
846-
{&gobEncoding{}, []interface{}{"test1", "test2"}},
847-
{&gobEncoding{}, []interface{}{"test1", 1, false}},
848-
{&gobEncoding{}, []interface{}{"test1", testWorkflowResult{V: 10}, false}},
849-
{&gobEncoding{}, []interface{}{"test2", &testWorkflowResult{V: 20}, 4}},
850-
}
851-
852844
// duplicate of GetHostEnvironment().registerType()
853845
func testRegisterType(enc encoding, v interface{}) error {
854846
t := reflect.Indirect(reflect.ValueOf(v)).Type()
@@ -859,30 +851,6 @@ func testRegisterType(enc encoding, v interface{}) error {
859851
return enc.Register(arg)
860852
}
861853

862-
func TestGobEncoding(t *testing.T) {
863-
for _, et := range encodingTests {
864-
for _, obj := range et.input {
865-
err := testRegisterType(et.encoding, obj)
866-
require.NoError(t, err, err)
867-
}
868-
data, err := et.encoding.Marshal(et.input)
869-
require.NoError(t, err, err)
870-
871-
var result []interface{}
872-
for _, v := range et.input {
873-
arg := reflect.New(reflect.TypeOf(v)).Interface()
874-
result = append(result, arg)
875-
}
876-
err = et.encoding.Unmarshal(data, result)
877-
require.NoError(t, err, err)
878-
879-
for i := 0; i < len(et.input); i++ {
880-
vat := reflect.ValueOf(result[i]).Elem().Interface()
881-
require.Equal(t, et.input[i], vat)
882-
}
883-
}
884-
}
885-
886854
func Test_ActivityNilArgs(t *testing.T) {
887855
nilErr := errors.New("nils")
888856
activityFn := func(name string, idx int, strptr *string) error {

workflow/context.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import (
2626

2727
// Context is a clone of context.Context with Done() returning Channel instead
2828
// of native channel.
29-
// A Context carries a deadline, a cancelation signal, and other values across
29+
// A Context carries a deadline, a cancellation signal, and other values across
3030
// API boundaries.
3131
//
3232
// Context's methods may be called by multiple goroutines simultaneously.

workflow/workflow.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ func ExecuteLocalActivity(ctx Context, activity interface{}, args ...interface{}
161161
// ExecuteChildWorkflow requests child workflow execution in the context of a workflow.
162162
// Context can be used to pass the settings for the child workflow.
163163
// For example: task list that this child workflow should be routed, timeouts that need to be configured.
164-
// Use ChildWorkflowOptions to pass down th`e options.
164+
// Use ChildWorkflowOptions to pass down the options.
165165
// cwo := ChildWorkflowOptions{
166166
// ExecutionStartToCloseTimeout: 10 * time.Minute,
167167
// TaskStartToCloseTimeout: time.Minute,

0 commit comments

Comments
 (0)