Skip to content

Commit e939630

Browse files
Fix local activity marker handler (#1002)
After introduction of short activity names, event handler fails to playback previously recorded markers with long names, as activity type no longer match. To fix it, we will only compare the last part of activity type.
1 parent f882a29 commit e939630

File tree

2 files changed

+10
-13
lines changed

2 files changed

+10
-13
lines changed

internal/internal_event_handlers.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1091,7 +1091,7 @@ func (weh *workflowExecutionEventHandlerImpl) handleLocalActivityMarker(markerDa
10911091
}
10921092

10931093
if la, ok := weh.pendingLaTasks[lamd.ActivityID]; ok {
1094-
if len(lamd.ActivityType) > 0 && lamd.ActivityType != la.params.ActivityType {
1094+
if len(lamd.ActivityType) > 0 && lastPartOfName(lamd.ActivityType) != lastPartOfName(la.params.ActivityType) {
10951095
// history marker mismatch to the current code.
10961096
panicMsg := fmt.Sprintf("code execute local activity %v, but history event found %v, markerData: %v", la.params.ActivityType, lamd.ActivityType, string(markerData))
10971097
panicIllegalState(panicMsg)

internal/internal_worker_test.go

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -132,9 +132,10 @@ func (s *internalWorkerTestSuite) TearDownTest() {
132132
s.mockCtrl.Finish() // assert mock’s expectations
133133
}
134134

135-
func (s *internalWorkerTestSuite) createLocalActivityMarkerDataForTest(activityID string) []byte {
135+
func (s *internalWorkerTestSuite) createLocalActivityMarkerDataForTest(activityID, activityType string) []byte {
136136
lamd := localActivityMarkerData{
137137
ActivityID: activityID,
138+
ActivityType: activityType,
138139
ReplayTime: time.Now(),
139140
}
140141

@@ -271,7 +272,7 @@ func (s *internalWorkerTestSuite) TestReplayWorkflowHistory_LocalActivity() {
271272

272273
createTestEventLocalActivity(5, &shared.MarkerRecordedEventAttributes{
273274
MarkerName: common.StringPtr(localActivityMarkerName),
274-
Details: s.createLocalActivityMarkerDataForTest("0"),
275+
Details: s.createLocalActivityMarkerDataForTest("0", "go.uber.org/cadence/internal.testActivity"),
275276
DecisionTaskCompletedEventId: common.Int64Ptr(4),
276277
}),
277278

@@ -282,8 +283,7 @@ func (s *internalWorkerTestSuite) TestReplayWorkflowHistory_LocalActivity() {
282283

283284
history := &shared.History{Events: testEvents}
284285
logger := getLogger()
285-
replayer := NewWorkflowReplayer()
286-
replayer.RegisterWorkflow(testReplayWorkflowLocalActivity)
286+
replayer := &WorkflowReplayer{registry: s.registry}
287287
err := replayer.ReplayWorkflowHistory(logger, history)
288288
require.NoError(s.T(), err)
289289
}
@@ -302,7 +302,7 @@ func (s *internalWorkerTestSuite) TestReplayWorkflowHistory_LocalActivity_Result
302302

303303
createTestEventLocalActivity(5, &shared.MarkerRecordedEventAttributes{
304304
MarkerName: common.StringPtr(localActivityMarkerName),
305-
Details: s.createLocalActivityMarkerDataForTest("0"),
305+
Details: s.createLocalActivityMarkerDataForTest("0", ""),
306306
DecisionTaskCompletedEventId: common.Int64Ptr(4),
307307
}),
308308

@@ -314,8 +314,7 @@ func (s *internalWorkerTestSuite) TestReplayWorkflowHistory_LocalActivity_Result
314314

315315
history := &shared.History{Events: testEvents}
316316
logger := getLogger()
317-
replayer := NewWorkflowReplayer()
318-
replayer.RegisterWorkflow(testReplayWorkflow)
317+
replayer := &WorkflowReplayer{registry: s.registry}
319318
err := replayer.ReplayWorkflowHistory(logger, history)
320319
require.Error(s.T(), err)
321320
}
@@ -324,7 +323,7 @@ func (s *internalWorkerTestSuite) TestReplayWorkflowHistory_LocalActivity_Activi
324323
taskList := "taskList1"
325324
testEvents := []*shared.HistoryEvent{
326325
createTestEventWorkflowExecutionStarted(1, &shared.WorkflowExecutionStartedEventAttributes{
327-
WorkflowType: &shared.WorkflowType{Name: common.StringPtr("go.uber.org/cadence/internal.testReplayWorkflow")},
326+
WorkflowType: &shared.WorkflowType{Name: common.StringPtr("go.uber.org/cadence/internal.testReplayWorkflowLocalActivity")},
328327
TaskList: &shared.TaskList{Name: common.StringPtr(taskList)},
329328
Input: testEncodeFunctionArgs(getDefaultDataConverter()),
330329
}),
@@ -334,20 +333,18 @@ func (s *internalWorkerTestSuite) TestReplayWorkflowHistory_LocalActivity_Activi
334333

335334
createTestEventLocalActivity(5, &shared.MarkerRecordedEventAttributes{
336335
MarkerName: common.StringPtr(localActivityMarkerName),
337-
Details: s.createLocalActivityMarkerDataForTest("0"),
336+
Details: s.createLocalActivityMarkerDataForTest("0", "different-activity-type"),
338337
DecisionTaskCompletedEventId: common.Int64Ptr(4),
339338
}),
340339

341340
createTestEventWorkflowExecutionCompleted(6, &shared.WorkflowExecutionCompletedEventAttributes{
342-
Result: []byte("some-incorrect-result"),
343341
DecisionTaskCompletedEventId: common.Int64Ptr(4),
344342
}),
345343
}
346344

347345
history := &shared.History{Events: testEvents}
348346
logger := getLogger()
349-
replayer := NewWorkflowReplayer()
350-
replayer.RegisterWorkflow(testReplayWorkflow)
347+
replayer := &WorkflowReplayer{registry: s.registry}
351348
err := replayer.ReplayWorkflowHistory(logger, history)
352349
require.Error(s.T(), err)
353350
}

0 commit comments

Comments
 (0)