Skip to content

Commit 73287b6

Browse files
Fix decode issue for local activities returnin nil result (#943) (#1011)
When local activity returns nil result, it is encoded into nil byte slice ([]byte). Later this is converted into string field ResultJSON within localActivityMarkerData structure. Such casting results in empty string, as this is its zero value in golang. Reverse conversion is not simetrical. Casting empty string back to byte slice results in empty byte slice. This later causes EOF error when trying to decode it back to the original result. The fix is to check whether it is non-empty string and only then assign the result back. This will be simetrical and will not cause decoding error later. This change also adds additional replay tests, that covers this scenario among many other local activity related cases. Co-authored-by: Bowei Xu <[email protected]>
1 parent ba95013 commit 73287b6

File tree

3 files changed

+323
-1
lines changed

3 files changed

+323
-1
lines changed

internal/internal_event_handlers.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1104,7 +1104,7 @@ func (weh *workflowExecutionEventHandlerImpl) handleLocalActivityMarker(markerDa
11041104
lar.attempt = lamd.Attempt
11051105
lar.backoff = lamd.Backoff
11061106
lar.err = constructError(lamd.ErrReason, []byte(lamd.ErrJSON), weh.GetDataConverter())
1107-
} else {
1107+
} else if len(lamd.ResultJSON) > 0 {
11081108
lar.result = []byte(lamd.ResultJSON)
11091109
}
11101110
la.callback(lar)

internal/internal_worker_test.go

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -351,6 +351,15 @@ func (s *internalWorkerTestSuite) TestReplayWorkflowHistory_LocalActivity_Activi
351351
require.Error(s.T(), err)
352352
}
353353

354+
func (s *internalWorkerTestSuite) TestReplayWorkflowHistoryFromFileLocalActivities() {
355+
logger := getLogger()
356+
replayer := NewWorkflowReplayer()
357+
wf := localActivitiesCallingOptionsWorkflow{s.T()}
358+
replayer.RegisterWorkflow(wf.Execute)
359+
err := replayer.ReplayWorkflowHistoryFromJSONFile(logger, "testdata/localActivities.json")
360+
require.NoError(s.T(), err)
361+
}
362+
354363
func (s *internalWorkerTestSuite) TestReplayWorkflowHistoryFromFileParent() {
355364
logger := getLogger()
356365
replayer := NewWorkflowReplayer()
@@ -727,6 +736,85 @@ func (s *internalWorkerTestSuite) TestRecordActivityHeartbeatByID() {
727736
require.NotNil(s.T(), heartbeatRequest)
728737
}
729738

739+
type localActivitiesCallingOptionsWorkflow struct {
740+
t *testing.T
741+
}
742+
743+
func (w localActivitiesCallingOptionsWorkflow) Execute(ctx Context, input []byte) (result []byte, err error) {
744+
ao := LocalActivityOptions{
745+
ScheduleToCloseTimeout: time.Second,
746+
}
747+
ctx = WithLocalActivityOptions(ctx, ao)
748+
749+
// By functions.
750+
err = ExecuteLocalActivity(ctx, testActivityByteArgs, input).Get(ctx, nil)
751+
require.NoError(w.t, err, err)
752+
753+
err = ExecuteLocalActivity(ctx, testActivityMultipleArgs, 2, []string{"test"}, true).Get(ctx, nil)
754+
require.NoError(w.t, err, err)
755+
756+
err = ExecuteLocalActivity(ctx, testActivityNoResult, 2, "test").Get(ctx, nil)
757+
require.NoError(w.t, err, err)
758+
759+
err = ExecuteLocalActivity(ctx, testActivityNoContextArg, 2, "test").Get(ctx, nil)
760+
require.NoError(w.t, err, err)
761+
762+
f := ExecuteLocalActivity(ctx, testActivityReturnByteArray)
763+
var r []byte
764+
err = f.Get(ctx, &r)
765+
require.NoError(w.t, err, err)
766+
require.Equal(w.t, []byte("testActivity"), r)
767+
768+
f = ExecuteLocalActivity(ctx, testActivityReturnInt)
769+
var rInt int
770+
err = f.Get(ctx, &rInt)
771+
require.NoError(w.t, err, err)
772+
require.Equal(w.t, 5, rInt)
773+
774+
f = ExecuteLocalActivity(ctx, testActivityReturnString)
775+
var rString string
776+
err = f.Get(ctx, &rString)
777+
778+
require.NoError(w.t, err, err)
779+
require.Equal(w.t, "testActivity", rString)
780+
781+
f = ExecuteLocalActivity(ctx, testActivityReturnEmptyString)
782+
var r2String string
783+
err = f.Get(ctx, &r2String)
784+
require.NoError(w.t, err, err)
785+
require.Equal(w.t, "", r2String)
786+
787+
f = ExecuteLocalActivity(ctx, testActivityReturnEmptyStruct)
788+
var r2Struct testActivityResult
789+
err = f.Get(ctx, &r2Struct)
790+
require.NoError(w.t, err, err)
791+
require.Equal(w.t, testActivityResult{}, r2Struct)
792+
793+
f = ExecuteLocalActivity(ctx, testActivityReturnNilStructPtr)
794+
var rStructPtr *testActivityResult
795+
err = f.Get(ctx, &rStructPtr)
796+
require.NoError(w.t, err, err)
797+
require.True(w.t, rStructPtr == nil)
798+
799+
f = ExecuteLocalActivity(ctx, testActivityReturnStructPtr)
800+
err = f.Get(ctx, &rStructPtr)
801+
require.NoError(w.t, err, err)
802+
require.Equal(w.t, *rStructPtr, testActivityResult{Index: 10})
803+
804+
f = ExecuteLocalActivity(ctx, testActivityReturnNilStructPtrPtr)
805+
var rStruct2Ptr **testActivityResult
806+
err = f.Get(ctx, &rStruct2Ptr)
807+
require.NoError(w.t, err, err)
808+
require.True(w.t, rStruct2Ptr == nil)
809+
810+
f = ExecuteLocalActivity(ctx, testActivityReturnStructPtrPtr)
811+
err = f.Get(ctx, &rStruct2Ptr)
812+
require.NoError(w.t, err, err)
813+
require.True(w.t, **rStruct2Ptr == testActivityResult{Index: 10})
814+
815+
return []byte("Done"), nil
816+
}
817+
730818
type activitiesCallingOptionsWorkflow struct {
731819
t *testing.T
732820
}
Lines changed: 234 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,234 @@
1+
[
2+
{
3+
"eventId": 1,
4+
"timestamp": 1595342431739033300,
5+
"eventType": "WorkflowExecutionStarted",
6+
"version": -24,
7+
"taskId": 1049501,
8+
"workflowExecutionStartedEventAttributes": {
9+
"workflowType": {
10+
"name": "go.uber.org/cadence/internal.localActivitiesCallingOptionsWorkflow.Execute"
11+
},
12+
"taskList": {
13+
"name": "tl-1"
14+
},
15+
"executionStartToCloseTimeoutSeconds": 15,
16+
"taskStartToCloseTimeoutSeconds": 1,
17+
"originalExecutionRunId": "dd67af3b-85d0-4217-83e5-a2fc66172afc",
18+
"identity": "57330@vytautas-C02X55E1JGH6@",
19+
"firstExecutionRunId": "dd67af3b-85d0-4217-83e5-a2fc66172afc",
20+
"attempt": 0,
21+
"cronSchedule": "",
22+
"firstDecisionTaskBackoffSeconds": 0,
23+
"header": {
24+
"fields": {}
25+
}
26+
}
27+
},
28+
{
29+
"eventId": 2,
30+
"timestamp": 1595342431739132600,
31+
"eventType": "DecisionTaskScheduled",
32+
"version": -24,
33+
"taskId": 1049502,
34+
"decisionTaskScheduledEventAttributes": {
35+
"taskList": {
36+
"name": "tl-1"
37+
},
38+
"startToCloseTimeoutSeconds": 1,
39+
"attempt": 0
40+
}
41+
},
42+
{
43+
"eventId": 3,
44+
"timestamp": 1595342431763649100,
45+
"eventType": "DecisionTaskStarted",
46+
"version": -24,
47+
"taskId": 1049510,
48+
"decisionTaskStartedEventAttributes": {
49+
"scheduledEventId": 2,
50+
"identity": "57330@vytautas-C02X55E1JGH6@tl-1",
51+
"requestId": "bf3ec0d2-f940-460b-82e2-7da51fc73c79"
52+
}
53+
},
54+
{
55+
"eventId": 4,
56+
"timestamp": 1595342431780575200,
57+
"eventType": "DecisionTaskCompleted",
58+
"version": -24,
59+
"taskId": 1049513,
60+
"decisionTaskCompletedEventAttributes": {
61+
"scheduledEventId": 2,
62+
"startedEventId": 3,
63+
"identity": "57330@vytautas-C02X55E1JGH6@tl-1",
64+
"binaryChecksum": "dc561051f5fc11e373587ce8e2a8dd93"
65+
}
66+
},
67+
{
68+
"eventId": 5,
69+
"timestamp": 1595342431780709600,
70+
"eventType": "MarkerRecorded",
71+
"version": -24,
72+
"taskId": 1049514,
73+
"markerRecordedEventAttributes": {
74+
"markerName": "LocalActivity",
75+
"details": "eyJhY3Rpdml0eUlkIjoiMCIsImFjdGl2aXR5VHlwZSI6ImdvLnViZXIub3JnL2NhZGVuY2UvdGVzdC50ZXN0QWN0aXZpdHlCeXRlQXJncyIsInJlcGxheVRpbWUiOiIyMDIwLTA3LTIxVDE3OjQwOjMxLjc2Mzg1MDM2NSswMzowMCJ9Cg==",
76+
"decisionTaskCompletedEventId": 4
77+
}
78+
},
79+
{
80+
"eventId": 6,
81+
"timestamp": 1595342431780729200,
82+
"eventType": "MarkerRecorded",
83+
"version": -24,
84+
"taskId": 1049515,
85+
"markerRecordedEventAttributes": {
86+
"markerName": "LocalActivity",
87+
"details": "eyJhY3Rpdml0eUlkIjoiMSIsImFjdGl2aXR5VHlwZSI6ImdvLnViZXIub3JnL2NhZGVuY2UvdGVzdC50ZXN0QWN0aXZpdHlNdWx0aXBsZUFyZ3MiLCJyZXBsYXlUaW1lIjoiMjAyMC0wNy0yMVQxNzo0MDozMS43NjM5Mjc4OTErMDM6MDAifQo=",
88+
"decisionTaskCompletedEventId": 4
89+
}
90+
},
91+
{
92+
"eventId": 7,
93+
"timestamp": 1595342431780748400,
94+
"eventType": "MarkerRecorded",
95+
"version": -24,
96+
"taskId": 1049516,
97+
"markerRecordedEventAttributes": {
98+
"markerName": "LocalActivity",
99+
"details": "eyJhY3Rpdml0eUlkIjoiMiIsImFjdGl2aXR5VHlwZSI6ImdvLnViZXIub3JnL2NhZGVuY2UvdGVzdC50ZXN0QWN0aXZpdHlOb1Jlc3VsdCIsInJlcGxheVRpbWUiOiIyMDIwLTA3LTIxVDE3OjQwOjMxLjc2Mzk5NjQ2MyswMzowMCJ9Cg==",
100+
"decisionTaskCompletedEventId": 4
101+
}
102+
},
103+
{
104+
"eventId": 8,
105+
"timestamp": 1595342431780770400,
106+
"eventType": "MarkerRecorded",
107+
"version": -24,
108+
"taskId": 1049517,
109+
"markerRecordedEventAttributes": {
110+
"markerName": "LocalActivity",
111+
"details": "eyJhY3Rpdml0eUlkIjoiMyIsImFjdGl2aXR5VHlwZSI6ImdvLnViZXIub3JnL2NhZGVuY2UvdGVzdC50ZXN0QWN0aXZpdHlOb0NvbnRleHRBcmciLCJyZXBsYXlUaW1lIjoiMjAyMC0wNy0yMVQxNzo0MDozMS43NjQwNzk4NDYrMDM6MDAifQo=",
112+
"decisionTaskCompletedEventId": 4
113+
}
114+
},
115+
{
116+
"eventId": 9,
117+
"timestamp": 1595342431780787500,
118+
"eventType": "MarkerRecorded",
119+
"version": -24,
120+
"taskId": 1049518,
121+
"markerRecordedEventAttributes": {
122+
"markerName": "LocalActivity",
123+
"details": "eyJhY3Rpdml0eUlkIjoiNCIsImFjdGl2aXR5VHlwZSI6ImdvLnViZXIub3JnL2NhZGVuY2UvdGVzdC50ZXN0QWN0aXZpdHlSZXR1cm5CeXRlQXJyYXkiLCJyZXN1bHRKc29uIjoidGVzdEFjdGl2aXR5IiwicmVwbGF5VGltZSI6IjIwMjAtMDctMjFUMTc6NDA6MzEuNzY0MTM0NzEyKzAzOjAwIn0K",
124+
"decisionTaskCompletedEventId": 4
125+
}
126+
},
127+
{
128+
"eventId": 10,
129+
"timestamp": 1595342431780807500,
130+
"eventType": "MarkerRecorded",
131+
"version": -24,
132+
"taskId": 1049519,
133+
"markerRecordedEventAttributes": {
134+
"markerName": "LocalActivity",
135+
"details": "eyJhY3Rpdml0eUlkIjoiNSIsImFjdGl2aXR5VHlwZSI6ImdvLnViZXIub3JnL2NhZGVuY2UvdGVzdC50ZXN0QWN0aXZpdHlSZXR1cm5JbnQiLCJyZXN1bHRKc29uIjoiNVxuIiwicmVwbGF5VGltZSI6IjIwMjAtMDctMjFUMTc6NDA6MzEuNzY0MjAyODU3KzAzOjAwIn0K",
136+
"decisionTaskCompletedEventId": 4
137+
}
138+
},
139+
{
140+
"eventId": 11,
141+
"timestamp": 1595342431780827000,
142+
"eventType": "MarkerRecorded",
143+
"version": -24,
144+
"taskId": 1049520,
145+
"markerRecordedEventAttributes": {
146+
"markerName": "LocalActivity",
147+
"details": "eyJhY3Rpdml0eUlkIjoiNiIsImFjdGl2aXR5VHlwZSI6ImdvLnViZXIub3JnL2NhZGVuY2UvdGVzdC50ZXN0QWN0aXZpdHlSZXR1cm5TdHJpbmciLCJyZXN1bHRKc29uIjoiXCJ0ZXN0QWN0aXZpdHlcIlxuIiwicmVwbGF5VGltZSI6IjIwMjAtMDctMjFUMTc6NDA6MzEuNzY0MjUxMTI2KzAzOjAwIn0K",
148+
"decisionTaskCompletedEventId": 4
149+
}
150+
},
151+
{
152+
"eventId": 12,
153+
"timestamp": 1595342431780845100,
154+
"eventType": "MarkerRecorded",
155+
"version": -24,
156+
"taskId": 1049521,
157+
"markerRecordedEventAttributes": {
158+
"markerName": "LocalActivity",
159+
"details": "eyJhY3Rpdml0eUlkIjoiNyIsImFjdGl2aXR5VHlwZSI6ImdvLnViZXIub3JnL2NhZGVuY2UvdGVzdC50ZXN0QWN0aXZpdHlSZXR1cm5FbXB0eVN0cmluZyIsInJlc3VsdEpzb24iOiJcIlwiXG4iLCJyZXBsYXlUaW1lIjoiMjAyMC0wNy0yMVQxNzo0MDozMS43NjQyOTc4ODcrMDM6MDAifQo=",
160+
"decisionTaskCompletedEventId": 4
161+
}
162+
},
163+
{
164+
"eventId": 13,
165+
"timestamp": 1595342431780861100,
166+
"eventType": "MarkerRecorded",
167+
"version": -24,
168+
"taskId": 1049522,
169+
"markerRecordedEventAttributes": {
170+
"markerName": "LocalActivity",
171+
"details": "eyJhY3Rpdml0eUlkIjoiOCIsImFjdGl2aXR5VHlwZSI6ImdvLnViZXIub3JnL2NhZGVuY2UvdGVzdC50ZXN0QWN0aXZpdHlSZXR1cm5FbXB0eVN0cnVjdCIsInJlc3VsdEpzb24iOiJ7XCJJbmRleFwiOjB9XG4iLCJyZXBsYXlUaW1lIjoiMjAyMC0wNy0yMVQxNzo0MDozMS43NjQzNzE5MzIrMDM6MDAifQo=",
172+
"decisionTaskCompletedEventId": 4
173+
}
174+
},
175+
{
176+
"eventId": 14,
177+
"timestamp": 1595342431780881000,
178+
"eventType": "MarkerRecorded",
179+
"version": -24,
180+
"taskId": 1049523,
181+
"markerRecordedEventAttributes": {
182+
"markerName": "LocalActivity",
183+
"details": "eyJhY3Rpdml0eUlkIjoiOSIsImFjdGl2aXR5VHlwZSI6ImdvLnViZXIub3JnL2NhZGVuY2UvdGVzdC50ZXN0QWN0aXZpdHlSZXR1cm5OaWxTdHJ1Y3RQdHIiLCJyZXBsYXlUaW1lIjoiMjAyMC0wNy0yMVQxNzo0MDozMS43NjQ0NDIzNDMrMDM6MDAifQo=",
184+
"decisionTaskCompletedEventId": 4
185+
}
186+
},
187+
{
188+
"eventId": 15,
189+
"timestamp": 1595342431780901700,
190+
"eventType": "MarkerRecorded",
191+
"version": -24,
192+
"taskId": 1049524,
193+
"markerRecordedEventAttributes": {
194+
"markerName": "LocalActivity",
195+
"details": "eyJhY3Rpdml0eUlkIjoiMTAiLCJhY3Rpdml0eVR5cGUiOiJnby51YmVyLm9yZy9jYWRlbmNlL3Rlc3QudGVzdEFjdGl2aXR5UmV0dXJuU3RydWN0UHRyIiwicmVzdWx0SnNvbiI6IntcIkluZGV4XCI6MTB9XG4iLCJyZXBsYXlUaW1lIjoiMjAyMC0wNy0yMVQxNzo0MDozMS43NjQ0ODYzOSswMzowMCJ9Cg==",
196+
"decisionTaskCompletedEventId": 4
197+
}
198+
},
199+
{
200+
"eventId": 16,
201+
"timestamp": 1595342431780918200,
202+
"eventType": "MarkerRecorded",
203+
"version": -24,
204+
"taskId": 1049525,
205+
"markerRecordedEventAttributes": {
206+
"markerName": "LocalActivity",
207+
"details": "eyJhY3Rpdml0eUlkIjoiMTEiLCJhY3Rpdml0eVR5cGUiOiJnby51YmVyLm9yZy9jYWRlbmNlL3Rlc3QudGVzdEFjdGl2aXR5UmV0dXJuTmlsU3RydWN0UHRyUHRyIiwicmVwbGF5VGltZSI6IjIwMjAtMDctMjFUMTc6NDA6MzEuNzY0NTM2MTIzKzAzOjAwIn0K",
208+
"decisionTaskCompletedEventId": 4
209+
}
210+
},
211+
{
212+
"eventId": 17,
213+
"timestamp": 1595342431780930300,
214+
"eventType": "MarkerRecorded",
215+
"version": -24,
216+
"taskId": 1049526,
217+
"markerRecordedEventAttributes": {
218+
"markerName": "LocalActivity",
219+
"details": "eyJhY3Rpdml0eUlkIjoiMTIiLCJhY3Rpdml0eVR5cGUiOiJnby51YmVyLm9yZy9jYWRlbmNlL3Rlc3QudGVzdEFjdGl2aXR5UmV0dXJuU3RydWN0UHRyUHRyIiwicmVzdWx0SnNvbiI6IntcIkluZGV4XCI6MTB9XG4iLCJyZXBsYXlUaW1lIjoiMjAyMC0wNy0yMVQxNzo0MDozMS43NjQ2MzY3NjgrMDM6MDAifQo=",
220+
"decisionTaskCompletedEventId": 4
221+
}
222+
},
223+
{
224+
"eventId": 18,
225+
"timestamp": 1595342431780949100,
226+
"eventType": "WorkflowExecutionCompleted",
227+
"version": -24,
228+
"taskId": 1049527,
229+
"workflowExecutionCompletedEventAttributes": {
230+
"result": "RG9uZQ==",
231+
"decisionTaskCompletedEventId": 4
232+
}
233+
}
234+
]

0 commit comments

Comments
 (0)