Skip to content

Commit eb10667

Browse files
samarabbasvancexu
authored andcommitted
Fix invalid state transition on attempt to cancel (#817)
* Fix invalid state transition on attempt to cancel Client channel implementation of close call registered cancel handlers by iterating over all blocked receivers. In the case of Cancel this results in blocked receiver collection to be modified during iteration which causes some handlers to be called multiple times. This fix clones the blocked receivers and uses the cloned copy for iteration. This guarantees that all callbacks are called once irrespective of any modifications to the list of blocked receivers during the iteration. Also added new API to allow partial replay of history from json file upto any specificed event ID. * move ActivityCancelRepro test workflow to the end
1 parent 1e3023d commit eb10667

File tree

6 files changed

+117
-4
lines changed

6 files changed

+117
-4
lines changed

internal/internal_workflow.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -715,7 +715,9 @@ func (c *channelImpl) sendAsyncImpl(v interface{}, pair *sendCallback) (ok bool)
715715

716716
func (c *channelImpl) Close() {
717717
c.closed = true
718-
for _, callback := range c.blockedReceives {
718+
// Use a copy of blockedReceives for iteration as invoking callback could result in modification
719+
copy := append(c.blockedReceives[:0:0], c.blockedReceives...)
720+
for _, callback := range copy {
719721
callback.fn(nil, false)
720722
}
721723
// All blocked sends are going to panic

internal/worker.go

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -300,7 +300,16 @@ func ReplayWorkflowHistory(logger *zap.Logger, history *shared.History) error {
300300
// The logger is an optional parameter. Defaults to the noop logger.
301301
func ReplayWorkflowHistoryFromJSONFile(logger *zap.Logger, jsonfileName string) error {
302302

303-
history, err := extractHistoryFromFile(jsonfileName)
303+
return ReplayPartialWorkflowHistoryFromJSONFile(logger, jsonfileName, 0)
304+
}
305+
306+
// ReplayWorkflowHistoryFromJSONFile executes a single decision task for the given json history file upto provided
307+
// lastEventID(inclusive).
308+
// Use for testing the backwards compatibility of code changes and troubleshooting workflows in a debugger.
309+
// The logger is an optional parameter. Defaults to the noop logger.
310+
func ReplayPartialWorkflowHistoryFromJSONFile(logger *zap.Logger, jsonfileName string, lastEventID int64) error {
311+
312+
history, err := extractHistoryFromFile(jsonfileName, lastEventID)
304313

305314
if err != nil {
306315
return err
@@ -409,7 +418,7 @@ func replayWorkflowHistory(logger *zap.Logger, service workflowserviceclient.Int
409418
return err
410419
}
411420

412-
func extractHistoryFromFile(jsonfileName string) (*shared.History, error) {
421+
func extractHistoryFromFile(jsonfileName string, lastEventID int64) (*shared.History, error) {
413422
raw, err := ioutil.ReadFile(jsonfileName)
414423
if err != nil {
415424
return nil, err
@@ -421,7 +430,21 @@ func extractHistoryFromFile(jsonfileName string) (*shared.History, error) {
421430
if err != nil {
422431
return nil, err
423432
}
424-
history := &shared.History{Events: deserializedEvents}
433+
434+
if lastEventID <= 0 {
435+
return &shared.History{Events: deserializedEvents}, nil
436+
}
437+
438+
// Caller is potentially asking for subset of history instead of all history events
439+
events := []*shared.HistoryEvent{}
440+
for _, event := range deserializedEvents {
441+
events = append(events, event)
442+
if event.GetEventId() == lastEventID {
443+
// Copy history upto last event (inclusive)
444+
break
445+
}
446+
}
447+
history := &shared.History{Events: events}
425448

426449
return history, nil
427450
}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
[{"eventId":1,"timestamp":1563844217060613000,"eventType":"WorkflowExecutionStarted","version":-24,"workflowExecutionStartedEventAttributes":{"workflowType":{"name":"go.uber.org/cadence/test.(*Workflows).ActivityCancelRepro-fm"},"taskList":{"name":"tl-1"},"executionStartToCloseTimeoutSeconds":10,"taskStartToCloseTimeoutSeconds":1,"identity":"97228@samar-C02XG22GJGH6@"}},{"eventId":2,"timestamp":1563844217060620000,"eventType":"DecisionTaskScheduled","version":-24,"decisionTaskScheduledEventAttributes":{"taskList":{"name":"tl-1"},"startToCloseTimeoutSeconds":1,"attempt":0}},{"eventId":3,"timestamp":1563844217066914000,"eventType":"DecisionTaskStarted","version":-24,"decisionTaskStartedEventAttributes":{"scheduledEventId":2,"identity":"97228@samar-C02XG22GJGH6@tl-1","requestId":"9c612c81-6cd9-402d-866f-e5652e9c4823"}},{"eventId":4,"timestamp":1563844217073526000,"eventType":"DecisionTaskCompleted","version":-24,"decisionTaskCompletedEventAttributes":{"scheduledEventId":2,"startedEventId":3,"identity":"97228@samar-C02XG22GJGH6@tl-1"}},{"eventId":5,"timestamp":1563844217073598000,"eventType":"TimerStarted","version":-24,"timerStartedEventAttributes":{"timerId":"0","startToFireTimeoutSeconds":10,"decisionTaskCompletedEventId":4}},{"eventId":6,"timestamp":1563844217073620000,"eventType":"ActivityTaskScheduled","version":-24,"activityTaskScheduledEventAttributes":{"activityId":"1","activityType":{"name":"toUpperWithDelay"},"taskList":{"name":"tl-1"},"input":"ImhlbGxvIgo1MDAwMDAwMDAwCg==","scheduleToCloseTimeoutSeconds":10,"scheduleToStartTimeoutSeconds":10,"startToCloseTimeoutSeconds":9,"heartbeatTimeoutSeconds":0,"decisionTaskCompletedEventId":4}},{"eventId":7,"timestamp":1563844217073670000,"eventType":"ActivityTaskScheduled","version":-24,"activityTaskScheduledEventAttributes":{"activityId":"2","activityType":{"name":"toUpper"},"taskList":{"name":"bad_tl"},"input":"ImhlbGxvIgo=","scheduleToCloseTimeoutSeconds":10,"scheduleToStartTimeoutSeconds":10,"startToCloseTimeoutSeconds":9,"heartbeatTimeoutSeconds":0,"decisionTaskCompletedEventId":4}},{"eventId":8,"timestamp":1563844217073679000,"eventType":"ActivityTaskScheduled","version":-24,"activityTaskScheduledEventAttributes":{"activityId":"3","activityType":{"name":"toUpper"},"taskList":{"name":"bad_tl"},"input":"ImhlbGxvIgo=","scheduleToCloseTimeoutSeconds":10,"scheduleToStartTimeoutSeconds":10,"startToCloseTimeoutSeconds":9,"heartbeatTimeoutSeconds":0,"decisionTaskCompletedEventId":4}},{"eventId":9,"timestamp":1563844217080804000,"eventType":"ActivityTaskStarted","version":-24,"activityTaskStartedEventAttributes":{"scheduledEventId":6,"identity":"97228@samar-C02XG22GJGH6@tl-1","requestId":"8b1ab5fd-5f15-4867-af33-97a7b00da341","attempt":0}},{"eventId":10,"timestamp":1563844222089088000,"eventType":"ActivityTaskCompleted","version":-24,"activityTaskCompletedEventAttributes":{"result":"IkhFTExPIgo=","scheduledEventId":6,"startedEventId":9,"identity":"97228@samar-C02XG22GJGH6@tl-1"}},{"eventId":11,"timestamp":1563844222089104000,"eventType":"DecisionTaskScheduled","version":-24,"decisionTaskScheduledEventAttributes":{"taskList":{"name":"tl-1"},"startToCloseTimeoutSeconds":1,"attempt":0}},{"eventId":12,"timestamp":1563844222096052000,"eventType":"DecisionTaskStarted","version":-24,"decisionTaskStartedEventAttributes":{"scheduledEventId":11,"identity":"97228@samar-C02XG22GJGH6@tl-1","requestId":"89f09b7a-2f34-497f-b3c4-99ede5efaf30"}},{"eventId":13,"timestamp":1563844222102892000,"eventType":"DecisionTaskFailed","version":-24,"decisionTaskFailedEventAttributes":{"scheduledEventId":11,"startedEventId":12,"cause":"WORKFLOW_WORKER_UNHANDLED_FAILURE","details":"aW52YWxpZCBzdGF0ZSB0cmFuc2l0aW9uOiBhdHRlbXB0IHRvIGNhbmNlbCwgRGVjaXNpb25UeXBlOiBBY3Rpdml0eSwgSUQ6IDMsIHN0YXRlPUNhbmNlbGVkQWZ0ZXJJbml0aWF0ZWQsIGlzRG9uZSgpPWZhbHNlLCBoaXN0b3J5PVtDcmVhdGVkIGhhbmRsZURlY2lzaW9uU2VudCBEZWNpc2lvblNlbnQgaGFuZGxlSW5pdGlhdGVkRXZlbnQgSW5pdGlhdGVkIGNhbmNlbCBDYW5jZWxlZEFmdGVySW5pdGlhdGVkXQ==","identity":"97228@samar-C02XG22GJGH6@tl-1"}},{"eventId":14,"timestamp":1563844227061245000,"eventType":"WorkflowExecutionTimedOut","version":-24,"workflowExecutionTimedOutEventAttributes":{"timeoutType":"START_TO_CLOSE"}}]

test/integration_test.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -274,6 +274,19 @@ func (ts *IntegrationTestSuite) TestChildWFWithMemoAndSearchAttributes() {
274274
ts.Equal("memoVal, searchAttrVal", result)
275275
}
276276

277+
func (ts *IntegrationTestSuite) TestActivityCancelUsingReplay() {
278+
logger, err := zap.NewDevelopment()
279+
err = worker.ReplayPartialWorkflowHistoryFromJSONFile(logger, "fixtures/activity.cancel.sm.repro.json", 12)
280+
ts.Nil(err)
281+
}
282+
283+
func (ts *IntegrationTestSuite) TestActivityCancelRepro() {
284+
var expected []string
285+
err := ts.executeWorkflow("test-activity-cancel-sm", ts.workflows.ActivityCancelRepro, &expected)
286+
ts.Nil(err)
287+
ts.EqualValues(expected, ts.activities.invoked())
288+
}
289+
277290
func (ts *IntegrationTestSuite) registerDomain() {
278291
client := client.NewDomainClient(ts.rpcClient.Interface, &client.Options{})
279292
ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout)

test/workflow_test.go

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -293,6 +293,69 @@ func (w *Workflows) ChildWorkflowSuccess(ctx workflow.Context) (result string, e
293293
return
294294
}
295295

296+
func (w *Workflows) ActivityCancelRepro(ctx workflow.Context) ([]string, error) {
297+
ctx, cancelFunc := workflow.WithCancel(ctx)
298+
299+
// First go-routine which triggers cancellation on completion of first activity
300+
workflow.Go(ctx, func(ctx1 workflow.Context) {
301+
activityCtx := workflow.WithActivityOptions(ctx1, workflow.ActivityOptions{
302+
ScheduleToStartTimeout: 10 * time.Second,
303+
ScheduleToCloseTimeout: 10 * time.Second,
304+
StartToCloseTimeout: 9 * time.Second,
305+
})
306+
307+
activityF := workflow.ExecuteActivity(activityCtx, "toUpperWithDelay", "hello", 1 * time.Second)
308+
var ans string
309+
err := activityF.Get(activityCtx, &ans)
310+
if err != nil {
311+
workflow.GetLogger(activityCtx).Sugar().Infof("Activity Failed: Err: %v", err)
312+
return
313+
}
314+
315+
// Trigger cancellation of root context
316+
cancelFunc()
317+
})
318+
319+
// Second go-routine which get blocked on ActivitySchedule and not started
320+
workflow.Go(ctx, func(ctx1 workflow.Context) {
321+
activityCtx := workflow.WithActivityOptions(ctx1, workflow.ActivityOptions{
322+
ScheduleToStartTimeout: 10 * time.Second,
323+
ScheduleToCloseTimeout: 10 * time.Second,
324+
StartToCloseTimeout: 1 * time.Second,
325+
TaskList: "bad_tl",
326+
})
327+
328+
activityF := workflow.ExecuteActivity(activityCtx, "toUpper", "hello")
329+
var ans string
330+
err := activityF.Get(activityCtx, &ans)
331+
if err != nil {
332+
workflow.GetLogger(activityCtx).Sugar().Infof("Activity Failed: Err: %v", err)
333+
}
334+
})
335+
336+
// Third go-routine which get blocked on ActivitySchedule and not started
337+
workflow.Go(ctx, func(ctx1 workflow.Context) {
338+
activityCtx := workflow.WithActivityOptions(ctx1, workflow.ActivityOptions{
339+
ScheduleToStartTimeout: 10 * time.Second,
340+
ScheduleToCloseTimeout: 10 * time.Second,
341+
StartToCloseTimeout: 1 * time.Second,
342+
TaskList: "bad_tl",
343+
})
344+
345+
activityF := workflow.ExecuteActivity(activityCtx, "toUpper", "hello")
346+
var ans string
347+
err := activityF.Get(activityCtx, &ans)
348+
if err != nil {
349+
workflow.GetLogger(activityCtx).Sugar().Infof("Activity Failed: Err: %v", err)
350+
}
351+
})
352+
353+
// Cause the workflow to block on sleep
354+
workflow.Sleep(ctx, 10 * time.Second)
355+
356+
return []string{"toUpperWithDelay"}, nil
357+
}
358+
296359
func (w *Workflows) child(ctx workflow.Context, arg string, mustFail bool) (string, error) {
297360
var result string
298361
ctx = workflow.WithActivityOptions(ctx, w.defaultActivityOptions())
@@ -339,6 +402,7 @@ func (w *Workflows) register() {
339402
workflow.Register(w.sleep)
340403
workflow.Register(w.child)
341404
workflow.Register(w.childForMemoAndSearchAttr)
405+
workflow.Register(w.ActivityCancelRepro)
342406
}
343407

344408
func (w *Workflows) defaultActivityOptions() workflow.ActivityOptions {

worker/worker.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,16 @@ func ReplayWorkflowHistoryFromJSONFile(logger *zap.Logger, jsonfileName string)
9797
return internal.ReplayWorkflowHistoryFromJSONFile(logger, jsonfileName)
9898
}
9999

100+
// ReplayPartialWorkflowHistoryFromJSONFile executes a single decision task for the json history file upto provided
101+
//// lastEventID(inclusive), downloaded from the cli.
102+
// To download the history file: cadence workflow showid <workflow_id> -of <output_filename>
103+
// See https://github.com/uber/cadence/blob/master/tools/cli/README.md for full documentation
104+
// Use for testing the backwards compatibility of code changes and troubleshooting workflows in a debugger.
105+
// The logger is an optional parameter. Defaults to the noop logger.
106+
func ReplayPartialWorkflowHistoryFromJSONFile(logger *zap.Logger, jsonfileName string, lastEventID int64) error {
107+
return internal.ReplayPartialWorkflowHistoryFromJSONFile(logger, jsonfileName, lastEventID)
108+
}
109+
100110
// ReplayWorkflowExecution loads a workflow execution history from the Cadence service and executes a single decision task for it.
101111
// Use for testing the backwards compatibility of code changes and troubleshooting workflows in a debugger.
102112
// The logger is the only optional parameter. Defaults to the noop logger.

0 commit comments

Comments
 (0)